diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pleroma/web/fed_sockets/fed_socket.ex | 31 | ||||
-rw-r--r-- | lib/pleroma/web/fed_sockets/fetch_registry.ex | 160 | ||||
-rw-r--r-- | lib/pleroma/web/fed_sockets/outgoing_handler.ex | 14 |
3 files changed, 72 insertions, 133 deletions
diff --git a/lib/pleroma/web/fed_sockets/fed_socket.ex b/lib/pleroma/web/fed_sockets/fed_socket.ex index 98d64e65a..57b17174a 100644 --- a/lib/pleroma/web/fed_sockets/fed_socket.ex +++ b/lib/pleroma/web/fed_sockets/fed_socket.ex @@ -45,14 +45,9 @@ defmodule Pleroma.Web.FedSockets.FedSocket do |> send_packet(socket_pid) end - def fetch(%SocketInfo{pid: socket_pid}, id) do - fetch_uuid = FetchRegistry.register_fetch(id) - - %{action: :fetch, data: id, uuid: fetch_uuid} - |> Jason.encode!() - |> send_packet(socket_pid) - - wait_for_fetch_to_return(fetch_uuid, 0) + def fetch(%SocketInfo{pid: socket_pid}, data) do + timeout = Pleroma.Config.get([:fed_sockets, :fetch_timeout], 12_000) + FetchRegistry.fetch(socket_pid, data, timeout) end def receive_package(%SocketInfo{} = fed_socket, json) do @@ -61,21 +56,6 @@ defmodule Pleroma.Web.FedSockets.FedSocket do |> process_package(fed_socket) end - defp wait_for_fetch_to_return(uuid, cntr) do - case FetchRegistry.check_fetch(uuid) do - {:error, :waiting} -> - Process.sleep(:math.pow(cntr, 3) |> Kernel.trunc()) - wait_for_fetch_to_return(uuid, cntr + 1) - - {:error, :missing} -> - Logger.error("FedSocket fetch timed out - #{inspect(uuid)}") - {:error, :timeout} - - {:ok, _fr} -> - FetchRegistry.pop_fetch(uuid) - end - end - defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do if Containment.contain_origin(origin, data) do IngesterWorker.enqueue("ingest", %{"object" => data}) @@ -85,7 +65,7 @@ defmodule Pleroma.Web.FedSockets.FedSocket do end defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do - FetchRegistry.register_fetch_received(uuid, data) + FetchRegistry.receive_callback(uuid, data) {:noreply, nil} end @@ -129,7 +109,8 @@ defmodule Pleroma.Web.FedSockets.FedSocket do end end - defp send_packet(data, socket_pid) do + @spec send_packet(binary(), pid()) :: :ok + def send_packet(data, socket_pid) do Process.send(socket_pid, {:send, data}, []) end diff --git a/lib/pleroma/web/fed_sockets/fetch_registry.ex b/lib/pleroma/web/fed_sockets/fetch_registry.ex index 7897f0fc6..b6cc01252 100644 --- a/lib/pleroma/web/fed_sockets/fetch_registry.ex +++ b/lib/pleroma/web/fed_sockets/fetch_registry.ex @@ -23,129 +23,83 @@ defmodule Pleroma.Web.FedSockets.FetchRegistry do """ - defmodule FetchRegistryData do - defstruct uuid: nil, - sent_json: nil, - received_json: nil, - sent_at: nil, - received_at: nil - end - alias Ecto.UUID + alias Pleroma.Web.FedSockets.FedSocket require Logger @fetches :fed_socket_fetches - @doc """ - Registers a json request wth the FetchRegistry and returns the identifying UUID. - """ - def register_fetch(json) do - %FetchRegistryData{uuid: uuid} = - json - |> new_registry_data - |> save_registry_data + @type fetch_id :: Ecto.UUID.t() - uuid - end + @doc "Synchronous version of `fetch_async/2`" + @spec fetch(any(), pid(), pos_integer()) :: {:ok, any()} | {:error, :timeout} + def fetch(socket_pid, data, timeout) do + fetch_id = fetch_async(socket_pid, data) - @doc """ - Reports on the status of a Fetch given the identifying UUID. - - Will return - * {:ok, fetched_object} if a fetch has completed - * {:error, :waiting} if a fetch is still pending - * {:error, other_error} usually :missing to indicate a fetch that has timed out - """ - def check_fetch(uuid) do - case get_registry_data(uuid) do - {:ok, %FetchRegistryData{received_at: nil}} -> - {:error, :waiting} - - {:ok, %FetchRegistryData{} = reg_data} -> - {:ok, reg_data} - - e -> - e + receive do + {:fetch, ^fetch_id, response} -> {:ok, response} + after + timeout -> + cancel(fetch_id) + {:error, :timeout} end end @doc """ - Retrieves the response to a fetch given the identifying UUID. - The completed fetch will be deleted from the FetchRegistry - - Will return - * {:ok, fetched_object} if a fetch has completed - * {:error, :waiting} if a fetch is still pending - * {:error, other_error} usually :missing to indicate a fetch that has timed out + Starts a fetch and returns it's id. + Once a reply to a fetch is received, the following message is sent + to the caller: + `{:fetch, fetch_id, reply}` """ - def pop_fetch(uuid) do - case check_fetch(uuid) do - {:ok, %FetchRegistryData{received_json: received_json}} -> - delete_registry_data(uuid) - {:ok, received_json} - - e -> - e - end - end - - @doc """ - This is called to register a fetch has returned. - It expects the result data along with the UUID that was sent in the request + @spec fetch_async(any(), pid()) :: fetch_id() + def fetch_async(socket_pid, data) do + send_to = self() + uuid = UUID.generate() + + # Set up a sentinel process to cancel the fetch if the caller exits + # before finishing the fetch (i.e the fetch was requested while processing + # an http request, but the caller got killed because the client closed the + # connection) + sentinel = + spawn(fn -> + ref = Process.monitor(send_to) + + receive do + {:DOWN, ^ref, _, _, _} -> cancel(uuid) + end + end) + + {:ok, true} = Cachex.put(@fetches, uuid, {send_to, sentinel}) + + %{action: :fetch, data: data, uuid: uuid} + |> Jason.encode!() + |> FedSocket.send_packet(socket_pid) - Will return the fetched object or :error - """ - def register_fetch_received(uuid, data) do - case get_registry_data(uuid) do - {:ok, %FetchRegistryData{received_at: nil} = reg_data} -> - reg_data - |> set_fetch_received(data) - |> save_registry_data() - - {:ok, %FetchRegistryData{} = reg_data} -> - Logger.warn("tried to add fetched data twice - #{uuid}") - reg_data - - {:error, _} -> - Logger.warn("Error adding fetch to registry - #{uuid}") - :error - end + uuid end - defp new_registry_data(json) do - %FetchRegistryData{ - uuid: UUID.generate(), - sent_json: json, - sent_at: :erlang.monotonic_time(:millisecond) - } + @doc "Removes the fetch from the registry. Any responses to it will be ignored afterwards." + @spec cancel(fetch_id()) :: :ok + def cancel(id) do + {:ok, true} = Cachex.del(@fetches, id) end - defp get_registry_data(origin) do - case Cachex.get(@fetches, origin) do - {:ok, nil} -> - {:error, :missing} - - {:ok, reg_data} -> - {:ok, reg_data} + @doc "This is called to register a fetch has returned." + @spec receive_callback(fetch_id(), any()) :: :ok + def receive_callback(uuid, data) do + case Cachex.get(@fetches, uuid) do + {:ok, {caller, sentinel}} -> + :ok = cancel(uuid) + Process.exit(sentinel, :normal) + send(caller, {:fetch, uuid, data}) - _ -> - {:error, :cache_error} + {:ok, nil} -> + Logger.debug(fn -> + "#{__MODULE__}: Got a reply to #{uuid}, but no such fetch is registered. This is probably a timeout." + end) end - end - - defp set_fetch_received(%FetchRegistryData{} = reg_data, data), - do: %FetchRegistryData{ - reg_data - | received_at: :erlang.monotonic_time(:millisecond), - received_json: data - } - defp save_registry_data(%FetchRegistryData{uuid: uuid} = reg_data) do - {:ok, true} = Cachex.put(@fetches, uuid, reg_data) - reg_data + :ok end - - defp delete_registry_data(origin), - do: {:ok, true} = Cachex.del(@fetches, origin) end diff --git a/lib/pleroma/web/fed_sockets/outgoing_handler.ex b/lib/pleroma/web/fed_sockets/outgoing_handler.ex index 6ddef17fe..b340fad3a 100644 --- a/lib/pleroma/web/fed_sockets/outgoing_handler.ex +++ b/lib/pleroma/web/fed_sockets/outgoing_handler.ex @@ -85,15 +85,19 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do %{host: host, port: port, path: path} = URI.parse(ws_uri) - with {:ok, conn_pid} <- :gun.open(to_charlist(host), port), + with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http]}), {:ok, _} <- :gun.await_up(conn_pid), reference <- :gun.get(conn_pid, to_charlist(path)), - {:response, :fin, 204, _} <- :gun.await(conn_pid, reference), + {:response, :fin, 204, _} <- :gun.await(conn_pid, reference) |> IO.inspect(), + :ok <- :gun.flush(conn_pid), headers <- build_headers(uri), ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do receive do {:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} -> {:ok, ws_uri, conn_pid} + + mes -> + IO.inspect(mes) after 15_000 -> Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}") @@ -118,7 +122,7 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do shake_size = byte_size(shake) signature_opts = %{ - "(request-target)": shake, + # "(request-target)": shake, "content-length": to_charlist("#{shake_size}"), date: date, digest: digest, @@ -131,8 +135,8 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do {'signature', to_charlist(signature)}, {'date', date}, {'digest', to_charlist(digest)}, - {'content-length', to_charlist("#{shake_size}")}, - {to_charlist("(request-target)"), to_charlist(shake)} + {'content-length', to_charlist("#{shake_size}")} + # {to_charlist("(request-target)"), to_charlist(shake)} ] end |