aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/web/fed_sockets/fed_socket.ex31
-rw-r--r--lib/pleroma/web/fed_sockets/fetch_registry.ex160
-rw-r--r--lib/pleroma/web/fed_sockets/outgoing_handler.ex14
3 files changed, 72 insertions, 133 deletions
diff --git a/lib/pleroma/web/fed_sockets/fed_socket.ex b/lib/pleroma/web/fed_sockets/fed_socket.ex
index 98d64e65a..57b17174a 100644
--- a/lib/pleroma/web/fed_sockets/fed_socket.ex
+++ b/lib/pleroma/web/fed_sockets/fed_socket.ex
@@ -45,14 +45,9 @@ defmodule Pleroma.Web.FedSockets.FedSocket do
|> send_packet(socket_pid)
end
- def fetch(%SocketInfo{pid: socket_pid}, id) do
- fetch_uuid = FetchRegistry.register_fetch(id)
-
- %{action: :fetch, data: id, uuid: fetch_uuid}
- |> Jason.encode!()
- |> send_packet(socket_pid)
-
- wait_for_fetch_to_return(fetch_uuid, 0)
+ def fetch(%SocketInfo{pid: socket_pid}, data) do
+ timeout = Pleroma.Config.get([:fed_sockets, :fetch_timeout], 12_000)
+ FetchRegistry.fetch(socket_pid, data, timeout)
end
def receive_package(%SocketInfo{} = fed_socket, json) do
@@ -61,21 +56,6 @@ defmodule Pleroma.Web.FedSockets.FedSocket do
|> process_package(fed_socket)
end
- defp wait_for_fetch_to_return(uuid, cntr) do
- case FetchRegistry.check_fetch(uuid) do
- {:error, :waiting} ->
- Process.sleep(:math.pow(cntr, 3) |> Kernel.trunc())
- wait_for_fetch_to_return(uuid, cntr + 1)
-
- {:error, :missing} ->
- Logger.error("FedSocket fetch timed out - #{inspect(uuid)}")
- {:error, :timeout}
-
- {:ok, _fr} ->
- FetchRegistry.pop_fetch(uuid)
- end
- end
-
defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do
if Containment.contain_origin(origin, data) do
IngesterWorker.enqueue("ingest", %{"object" => data})
@@ -85,7 +65,7 @@ defmodule Pleroma.Web.FedSockets.FedSocket do
end
defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do
- FetchRegistry.register_fetch_received(uuid, data)
+ FetchRegistry.receive_callback(uuid, data)
{:noreply, nil}
end
@@ -129,7 +109,8 @@ defmodule Pleroma.Web.FedSockets.FedSocket do
end
end
- defp send_packet(data, socket_pid) do
+ @spec send_packet(binary(), pid()) :: :ok
+ def send_packet(data, socket_pid) do
Process.send(socket_pid, {:send, data}, [])
end
diff --git a/lib/pleroma/web/fed_sockets/fetch_registry.ex b/lib/pleroma/web/fed_sockets/fetch_registry.ex
index 7897f0fc6..b6cc01252 100644
--- a/lib/pleroma/web/fed_sockets/fetch_registry.ex
+++ b/lib/pleroma/web/fed_sockets/fetch_registry.ex
@@ -23,129 +23,83 @@ defmodule Pleroma.Web.FedSockets.FetchRegistry do
"""
- defmodule FetchRegistryData do
- defstruct uuid: nil,
- sent_json: nil,
- received_json: nil,
- sent_at: nil,
- received_at: nil
- end
-
alias Ecto.UUID
+ alias Pleroma.Web.FedSockets.FedSocket
require Logger
@fetches :fed_socket_fetches
- @doc """
- Registers a json request wth the FetchRegistry and returns the identifying UUID.
- """
- def register_fetch(json) do
- %FetchRegistryData{uuid: uuid} =
- json
- |> new_registry_data
- |> save_registry_data
+ @type fetch_id :: Ecto.UUID.t()
- uuid
- end
+ @doc "Synchronous version of `fetch_async/2`"
+ @spec fetch(any(), pid(), pos_integer()) :: {:ok, any()} | {:error, :timeout}
+ def fetch(socket_pid, data, timeout) do
+ fetch_id = fetch_async(socket_pid, data)
- @doc """
- Reports on the status of a Fetch given the identifying UUID.
-
- Will return
- * {:ok, fetched_object} if a fetch has completed
- * {:error, :waiting} if a fetch is still pending
- * {:error, other_error} usually :missing to indicate a fetch that has timed out
- """
- def check_fetch(uuid) do
- case get_registry_data(uuid) do
- {:ok, %FetchRegistryData{received_at: nil}} ->
- {:error, :waiting}
-
- {:ok, %FetchRegistryData{} = reg_data} ->
- {:ok, reg_data}
-
- e ->
- e
+ receive do
+ {:fetch, ^fetch_id, response} -> {:ok, response}
+ after
+ timeout ->
+ cancel(fetch_id)
+ {:error, :timeout}
end
end
@doc """
- Retrieves the response to a fetch given the identifying UUID.
- The completed fetch will be deleted from the FetchRegistry
-
- Will return
- * {:ok, fetched_object} if a fetch has completed
- * {:error, :waiting} if a fetch is still pending
- * {:error, other_error} usually :missing to indicate a fetch that has timed out
+ Starts a fetch and returns it's id.
+ Once a reply to a fetch is received, the following message is sent
+ to the caller:
+ `{:fetch, fetch_id, reply}`
"""
- def pop_fetch(uuid) do
- case check_fetch(uuid) do
- {:ok, %FetchRegistryData{received_json: received_json}} ->
- delete_registry_data(uuid)
- {:ok, received_json}
-
- e ->
- e
- end
- end
-
- @doc """
- This is called to register a fetch has returned.
- It expects the result data along with the UUID that was sent in the request
+ @spec fetch_async(any(), pid()) :: fetch_id()
+ def fetch_async(socket_pid, data) do
+ send_to = self()
+ uuid = UUID.generate()
+
+ # Set up a sentinel process to cancel the fetch if the caller exits
+ # before finishing the fetch (i.e the fetch was requested while processing
+ # an http request, but the caller got killed because the client closed the
+ # connection)
+ sentinel =
+ spawn(fn ->
+ ref = Process.monitor(send_to)
+
+ receive do
+ {:DOWN, ^ref, _, _, _} -> cancel(uuid)
+ end
+ end)
+
+ {:ok, true} = Cachex.put(@fetches, uuid, {send_to, sentinel})
+
+ %{action: :fetch, data: data, uuid: uuid}
+ |> Jason.encode!()
+ |> FedSocket.send_packet(socket_pid)
- Will return the fetched object or :error
- """
- def register_fetch_received(uuid, data) do
- case get_registry_data(uuid) do
- {:ok, %FetchRegistryData{received_at: nil} = reg_data} ->
- reg_data
- |> set_fetch_received(data)
- |> save_registry_data()
-
- {:ok, %FetchRegistryData{} = reg_data} ->
- Logger.warn("tried to add fetched data twice - #{uuid}")
- reg_data
-
- {:error, _} ->
- Logger.warn("Error adding fetch to registry - #{uuid}")
- :error
- end
+ uuid
end
- defp new_registry_data(json) do
- %FetchRegistryData{
- uuid: UUID.generate(),
- sent_json: json,
- sent_at: :erlang.monotonic_time(:millisecond)
- }
+ @doc "Removes the fetch from the registry. Any responses to it will be ignored afterwards."
+ @spec cancel(fetch_id()) :: :ok
+ def cancel(id) do
+ {:ok, true} = Cachex.del(@fetches, id)
end
- defp get_registry_data(origin) do
- case Cachex.get(@fetches, origin) do
- {:ok, nil} ->
- {:error, :missing}
-
- {:ok, reg_data} ->
- {:ok, reg_data}
+ @doc "This is called to register a fetch has returned."
+ @spec receive_callback(fetch_id(), any()) :: :ok
+ def receive_callback(uuid, data) do
+ case Cachex.get(@fetches, uuid) do
+ {:ok, {caller, sentinel}} ->
+ :ok = cancel(uuid)
+ Process.exit(sentinel, :normal)
+ send(caller, {:fetch, uuid, data})
- _ ->
- {:error, :cache_error}
+ {:ok, nil} ->
+ Logger.debug(fn ->
+ "#{__MODULE__}: Got a reply to #{uuid}, but no such fetch is registered. This is probably a timeout."
+ end)
end
- end
-
- defp set_fetch_received(%FetchRegistryData{} = reg_data, data),
- do: %FetchRegistryData{
- reg_data
- | received_at: :erlang.monotonic_time(:millisecond),
- received_json: data
- }
- defp save_registry_data(%FetchRegistryData{uuid: uuid} = reg_data) do
- {:ok, true} = Cachex.put(@fetches, uuid, reg_data)
- reg_data
+ :ok
end
-
- defp delete_registry_data(origin),
- do: {:ok, true} = Cachex.del(@fetches, origin)
end
diff --git a/lib/pleroma/web/fed_sockets/outgoing_handler.ex b/lib/pleroma/web/fed_sockets/outgoing_handler.ex
index 6ddef17fe..b340fad3a 100644
--- a/lib/pleroma/web/fed_sockets/outgoing_handler.ex
+++ b/lib/pleroma/web/fed_sockets/outgoing_handler.ex
@@ -85,15 +85,19 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do
%{host: host, port: port, path: path} = URI.parse(ws_uri)
- with {:ok, conn_pid} <- :gun.open(to_charlist(host), port),
+ with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http]}),
{:ok, _} <- :gun.await_up(conn_pid),
reference <- :gun.get(conn_pid, to_charlist(path)),
- {:response, :fin, 204, _} <- :gun.await(conn_pid, reference),
+ {:response, :fin, 204, _} <- :gun.await(conn_pid, reference) |> IO.inspect(),
+ :ok <- :gun.flush(conn_pid),
headers <- build_headers(uri),
ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do
receive do
{:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} ->
{:ok, ws_uri, conn_pid}
+
+ mes ->
+ IO.inspect(mes)
after
15_000 ->
Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}")
@@ -118,7 +122,7 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do
shake_size = byte_size(shake)
signature_opts = %{
- "(request-target)": shake,
+ # "(request-target)": shake,
"content-length": to_charlist("#{shake_size}"),
date: date,
digest: digest,
@@ -131,8 +135,8 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do
{'signature', to_charlist(signature)},
{'date', date},
{'digest', to_charlist(digest)},
- {'content-length', to_charlist("#{shake_size}")},
- {to_charlist("(request-target)"), to_charlist(shake)}
+ {'content-length', to_charlist("#{shake_size}")}
+ # {to_charlist("(request-target)"), to_charlist(shake)}
]
end