diff options
-rw-r--r-- | lib/pleroma/web/activity_pub/publisher.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/web/fed_sockets/adapter.ex | 49 | ||||
-rw-r--r-- | lib/pleroma/web/fed_sockets/adapter/cowboy.ex | 31 | ||||
-rw-r--r-- | lib/pleroma/web/fed_sockets/adapter/gun.ex | 115 |
4 files changed, 100 insertions, 98 deletions
diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index 4243f115d..e3b94c5cc 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -56,7 +56,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do Logger.debug("Published via FedSocket - #{inspect(inbox)}") :ok - _ -> + e -> + Logger.debug("Shit broke - #{inspect(e)}") Logger.debug("publishing via http - #{inspect(inbox)}") http_publish(inbox, actor, json, params) end diff --git a/lib/pleroma/web/fed_sockets/adapter.ex b/lib/pleroma/web/fed_sockets/adapter.ex index 5bd9caec9..00ab2d056 100644 --- a/lib/pleroma/web/fed_sockets/adapter.ex +++ b/lib/pleroma/web/fed_sockets/adapter.ex @@ -1,7 +1,7 @@ defmodule Pleroma.Web.FedSockets.Adapter do @moduledoc """ A behavior both types of sockets (server and client) should implement - and a collection of helper functions useful to both. + and a collection of helper functions useful to both """ @type adapter_state :: map() @@ -20,23 +20,30 @@ defmodule Pleroma.Web.FedSockets.Adapter do alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.FedSockets.IngesterWorker - @typedoc """ - Should be "fetch" or "publish" - """ - @type common_action :: String.t() @type origin :: String.t() - @doc "Process non adapter-specific messages." - @spec process_message(map(), origin()) :: - {:reply, term()} | :noreply | {:error, :unknown_action} - def process_message(%{"action" => "publish", "data" => data}, origin) do + @type fetch_id :: integer() + @type waiting_fetches :: %{required(fetch_id()) => pid()} + @doc "Processes incoming messages. Returns {:reply, websocket_frame, waiting_fetches} or `{:noreply, waiting_fetches}`" + @spec process_message(binary() | map(), origin(), waiting_fetches()) :: + {:reply, term(), waiting_fetches()} | {:noreply, waiting_fetches()} + def process_message(message, origin, waiting_fetches) when is_binary(message) do + case Jason.decode(message) do + {:ok, message} -> process_message(message, origin, waiting_fetches) + # 1003 indicates that an endpoint is terminating the connection + # because it has received a type of data it cannot accept. + {:error, decode_error} -> {:reply, {:close, 1003, Exception.message(decode_error)}} + end + end + + def process_message(%{"action" => "publish", "data" => data}, origin, waiting_fetches) do if Containment.contain_origin(origin, data) do IngesterWorker.enqueue("ingest", %{"object" => data}) end - :noreply + {:noreply, waiting_fetches} end - def process_message(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _) do + def process_message(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _, _) do data = %{ "action" => "fetch_reply", "status" => "processed", @@ -44,11 +51,25 @@ defmodule Pleroma.Web.FedSockets.Adapter do "data" => represent_item(ap_id) } - {:reply, data} + {:reply, {:text, Jason.encode!(data)}} + end + + def process_message( + %{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, + _, + waiting_fetches + ) do + with {pid, waiting_fetches} when is_pid(pid) <- Map.pop(waiting_fetches, uuid) do + send(pid, {:fetch_reply, uuid, data}) + {:noreply, waiting_fetches} + else + _ -> + {:noreply, waiting_fetches} + end end - def process_message(_, _) do - {:error, :unknown_action} + def process_message(_, _, waiting_fetches) do + {:reply, {:close, 1003, "Unknown message type."}, waiting_fetches} end defp represent_item(ap_id) do diff --git a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex index a732df159..880af929c 100644 --- a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex +++ b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex @@ -33,6 +33,7 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do def publish(pid, _, data) do message = %{action: :publish, data: data} send(pid, {:send, Jason.encode!(message)}) + :ok end @impl true @@ -104,28 +105,16 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do @impl true def websocket_handle(:ping, socket_info), do: {:ok, socket_info} - def websocket_handle({:text, raw_message}, %{origin: origin} = state) do - case Jason.decode(raw_message) do - {:ok, message} -> - case message do - %{"action" => "fetch_reply", "uuid" => uuid, "data" => data} -> - with {pid, waiting_fetches} when is_pid(pid) <- Map.pop(state.waiting_fetches, uuid) do - send(pid, {:fetch_reply, uuid, data}) - {:ok, %{state | waiting_fetches: waiting_fetches}} - else - _ -> - {:ok, state} - end - - message -> - case Adapter.process_message(message, origin) do - :noreply -> {:ok, state} - {:reply, data} -> {:reply, {:text, Jason.encode!(data)}, state} - end - end + def websocket_handle( + {:text, raw_message}, + %{origin: origin, waiting_fetches: waiting_fetches} = state + ) do + case Adapter.process_message(raw_message, origin, waiting_fetches) do + {:reply, frame, waiting_fetches} -> + {:reply, frame, %{state | waiting_fetches: waiting_fetches}} - {:error, decode_error} -> - exit({:malformed_message, decode_error}) + {:noreply, waiting_fetches} -> + {:ok, %{state | waiting_fetches: waiting_fetches}} end end diff --git a/lib/pleroma/web/fed_sockets/adapter/gun.ex b/lib/pleroma/web/fed_sockets/adapter/gun.ex index 51c4e6be9..dc8ae6db7 100644 --- a/lib/pleroma/web/fed_sockets/adapter/gun.ex +++ b/lib/pleroma/web/fed_sockets/adapter/gun.ex @@ -18,20 +18,29 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do @impl true def fetch(pid, state, id, timeout) do - # TODO: refactor with atomics and encoding on the client - with {:ok, _} <- await_connected(pid, state) do - GenServer.call(pid, {:fetch, id}, timeout) + with {:ok, conn_pid, last_fetch_id_ref} <- await_connected(pid, state) do + fetch_id = :atomics.add_get(last_fetch_id_ref, 1, 1) + message = %{action: :fetch, data: id, uuid: fetch_id} + send(pid, {:register_fetch, fetch_id, self()}) + send_json(conn_pid, message) + + receive do + {:fetch_reply, ^fetch_id, data} -> {:ok, data} + after + timeout -> {:error, :timeout} + end end end @impl true def publish(pid, state, data) do - with {:ok, conn_pid} <- await_connected(pid, state) do + with {:ok, conn_pid, _} <- await_connected(pid, state) do send_json(conn_pid, %{action: :publish, data: data}) end end - defp await_connected(_pid, %{conn_pid: conn_pid}), do: {:ok, conn_pid} + defp await_connected(_pid, %{conn_pid: conn_pid, last_fetch_id_ref: last_fetch_id_ref}), + do: {:ok, conn_pid, last_fetch_id_ref} defp await_connected(pid, _) do monitor = Process.monitor(pid) @@ -40,13 +49,19 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do receive do {:DOWN, ^monitor, _, _, {:shutdown, reason}} -> reason {:DOWN, ^monitor, _, _, reason} -> {:error, reason} - {:await_connected, ^pid, conn_pid} -> {:ok, conn_pid} + {:await_connected, ^pid, conn_pid, last_fetch_id_ref} -> {:ok, conn_pid, last_fetch_id_ref} end end def start_link([key | _] = opts) do - GenServer.start_link(__MODULE__, opts, - name: {:via, Registry, {@registry, key, %Value{adapter: __MODULE__, adapter_state: %{}}}} + last_fetch_id_ref = :atomics.new(1, []) + :ok = :atomics.put(last_fetch_id_ref, 1, 0) + + GenServer.start_link(__MODULE__, [last_fetch_id_ref | opts], + name: + {:via, Registry, + {@registry, key, + %Value{adapter: __MODULE__, adapter_state: %{last_fetch_id_ref: last_fetch_id_ref}}}} ) end @@ -56,15 +71,21 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do end @impl true - def handle_continue({:connect, [key, uri] = opts}, _) do + def handle_continue({:connect, [last_fetch_id_ref, key, uri] = opts}, _) do case initiate_connection(uri) do {:ok, conn_pid} -> Registry.update_value(@registry, key, fn value -> - %{value | adapter_state: %{conn_pid: conn_pid}} + %{value | adapter_state: Map.put(value.adapter_state, :conn_pid, conn_pid)} end) {:noreply, - %{conn_pid: conn_pid, waiting_fetches: %{}, last_fetch_id: 0, origin: uri, key: key}} + %{ + conn_pid: conn_pid, + waiting_fetches: %{}, + last_fetch_id_ref: last_fetch_id_ref, + origin: uri, + key: key + }} {:error, reason} = e -> Logger.debug("Outgoing connection failed - #{inspect(reason)}") @@ -73,32 +94,12 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do end @impl true - def handle_cast({:await_connected, pid}, %{conn_pid: conn_pid} = state) do - send(pid, {:await_connected, self(), conn_pid}) - {:noreply, state} - end - - @impl true - def handle_call( - {:fetch, data}, - from, - %{ - last_fetch_id: last_fetch_id, - conn_pid: conn_pid, - waiting_fetches: waiting_fetches - } = state + def handle_cast( + {:await_connected, pid}, + %{conn_pid: conn_pid, last_fetch_id_ref: last_fetch_id_ref} = state ) do - last_fetch_id = last_fetch_id + 1 - request = %{action: :fetch, data: data, uuid: last_fetch_id} - :ok = send_json(conn_pid, request) - waiting_fetches = Map.put(waiting_fetches, last_fetch_id, from) - - {:noreply, - %{ - state - | waiting_fetches: waiting_fetches, - last_fetch_id: last_fetch_id - }} + send(pid, {:await_connected, self(), conn_pid, last_fetch_id_ref}) + {:noreply, state} end defp send_json(conn_pid, data) do @@ -106,37 +107,27 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do end @impl true + def handle_info({:register_fetch, fetch_id, pid}, %{waiting_fetches: waiting_fetches} = state) do + waiting_fetches = Map.put(waiting_fetches, fetch_id, pid) + {:noreply, %{state | waiting_fetches: waiting_fetches}} + end + + @impl true def handle_info( {:gun_ws, _conn_pid, _ref, {:text, raw_message}}, - %{conn_pid: conn_pid, origin: origin} = state + %{conn_pid: conn_pid, origin: origin, waiting_fetches: waiting_fetches} = state ) do - state = - case Jason.decode(raw_message) do - {:ok, message} -> - case message do - %{"action" => "fetch_reply", "uuid" => uuid, "data" => data} -> - with {{_, _} = client, waiting_fetches} <- Map.pop(state.waiting_fetches, uuid) do - GenServer.reply(client, {:ok, data}) - %{state | waiting_fetches: waiting_fetches} - else - _ -> - state - end - - message -> - case Adapter.process_message(message, origin) do - :noreply -> :noop - {:reply, data} -> send_json(conn_pid, data) - end - - state - end - - {:error, decode_error} -> - exit({:malformed_message, decode_error}) + waiting_fetches = + case Adapter.process_message(raw_message, origin, waiting_fetches) do + {:reply, frame, waiting_fetches} -> + :gun.ws_send(conn_pid, frame) + waiting_fetches + + {:noreply, waiting_fetches} -> + waiting_fetches end - {:noreply, state} + {:noreply, %{state | waiting_fetches: waiting_fetches}} end @impl true |