From e2fdf0e6dcf71ca43822689900aaaf9aab4f625e Mon Sep 17 00:00:00 2001 From: rinpatch Date: Wed, 23 Sep 2020 17:33:36 +0300 Subject: tmp --- lib/pleroma/web/fed_sockets/adapter.ex | 76 ++++++++++++++++++--------- lib/pleroma/web/fed_sockets/adapter/cowboy.ex | 45 +++++++--------- lib/pleroma/web/fed_sockets/adapter/gun.ex | 73 +++++++++++++------------ lib/pleroma/web/fed_sockets/registry.ex | 6 ++- 4 files changed, 109 insertions(+), 91 deletions(-) diff --git a/lib/pleroma/web/fed_sockets/adapter.ex b/lib/pleroma/web/fed_sockets/adapter.ex index 05e74fa78..3e260cc49 100644 --- a/lib/pleroma/web/fed_sockets/adapter.ex +++ b/lib/pleroma/web/fed_sockets/adapter.ex @@ -6,11 +6,12 @@ defmodule Pleroma.Web.FedSockets.Adapter do @type adapter_state :: map() - @doc "A synchronous fetch." - @callback fetch(pid(), adapter_state(), term(), timeout()) :: {:ok, term()} | {:error, term()} + @doc """ + Send a message through the socket and wait for answer. + Accepts a non-encoded message payload, except the uuid, it will be added automatically. + """ - @doc "An asynchronous publish." - @callback publish(pid(), adapter_state(), term()) :: :ok | {:error, term()} + @callback request(pid(), adapter_state(), map(), timeout()) :: {:ok, term()} | {:error, term()} alias Pleroma.Object alias Pleroma.Object.Containment @@ -20,63 +21,86 @@ defmodule Pleroma.Web.FedSockets.Adapter do alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.Federator + @spec fetch(pid(), module(), adapter_state(), term(), timeout()) :: + {:ok, term()} | {:error, term()} + def fetch(pid, adapter, adapter_state, id, timeout) do + data = %{action: :fetch, data: id} + apply(adapter, :request, [pid, adapter_state, data, timeout]) + end + + @spec publish(pid(), module(), adapter_state(), term(), timeout()) :: + {:ok, term()} | {:error, term()} + def publish(pid, adapter, adapter_state, data, timeout) do + data = %{action: :publish, data: data} + apply(adapter, :request, [pid, adapter_state, data, timeout]) + end + @type origin :: String.t() @type fetch_id :: integer() - @type waiting_fetches :: %{required(fetch_id()) => pid()} - @doc "Processes incoming messages. Returns {:reply, websocket_frame, waiting_fetches} or `{:noreply, waiting_fetches}`" - @spec process_message(binary() | map(), origin(), waiting_fetches()) :: - {:reply, term(), waiting_fetches()} | {:noreply, waiting_fetches()} - def process_message(message, origin, waiting_fetches) when is_binary(message) do + @type waiting_requests :: %{required(fetch_id()) => pid()} + @doc "Processes incoming messages. Returns {:reply, websocket_frame,waiting_requests} or `{:noreply,waiting_requests}`" + @spec process_message(binary() | map(), origin(), waiting_requests()) :: + {:reply, term(), waiting_requests()} | {:noreply, waiting_requests()} + def process_message(message, origin, waiting_requests) when is_binary(message) do case Jason.decode(message) do - {:ok, message} -> do_process_message(message, origin, waiting_fetches) + {:ok, message} -> do_process_message(message, origin, waiting_requests) # 1003 indicates that an endpoint is terminating the connection # because it has received a type of data it cannot accept. {:error, decode_error} -> {:reply, {:close, 1003, Exception.message(decode_error)}} end end - def process_message(message, origin, waiting_fetches), - do: do_process_message(message, origin, waiting_fetches) + def process_message(message, origin, waiting_requests), + do: do_process_message(message, origin, waiting_requests) - defp do_process_message(%{"action" => "publish", "data" => data}, origin, waiting_fetches) do + defp do_process_message( + %{"action" => "publish", "data" => data, "uuid" => uuid}, + origin, + waiting_requests + ) do if Containment.contain_origin(origin, data) do Federator.incoming_ap_doc(data) end - {:noreply, waiting_fetches} + data = %{ + "action" => "reply", + "uuid" => uuid, + "data" => "ok" + } + + {:reply, {:text, Jason.encode!(data)}, waiting_requests} end defp do_process_message( %{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _, - waiting_fetches + waiting_requests ) do data = %{ - "action" => "fetch_reply", - "status" => "processed", + "action" => "reply", "uuid" => uuid, "data" => represent_item(ap_id) } - {:reply, {:text, Jason.encode!(data)}, waiting_fetches} + {:reply, {:text, Jason.encode!(data)}, waiting_requests} end defp do_process_message( - %{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, + %{"action" => "reply", "uuid" => uuid, "data" => data}, _, - waiting_fetches + waiting_requests ) do - with {pid, waiting_fetches} when is_pid(pid) <- Map.pop(waiting_fetches, uuid) do - send(pid, {:fetch_reply, uuid, data}) - {:noreply, waiting_fetches} + with {pid, waiting_requests} when is_pid(pid) <- Map.pop(waiting_requests, uuid) do + send(pid, {:request_reply, uuid, data}) + {:noreply, waiting_requests} else _ -> - {:noreply, waiting_fetches} + {:noreply, waiting_requests} end end - defp do_process_message(_, _, waiting_fetches) do - {:reply, {:close, 1003, "Unknown message type."}, waiting_fetches} + defp do_process_message(_, _, waiting_requests) do + {:reply, {:close, 1003, "Unknown message type."}, waiting_requests} end defp represent_item(ap_id) do diff --git a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex index a7b3835dd..cb721f29b 100644 --- a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex +++ b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex @@ -17,25 +17,18 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do @behaviour Adapter @impl true - def fetch(pid, %{last_fetch_id_ref: last_fetch_id_ref}, id, timeout) do - fetch_id = :atomics.add_get(last_fetch_id_ref, 1, 1) - message = %{action: :fetch, data: id, uuid: fetch_id} - send(pid, {:send_fetch, Jason.encode!(message), fetch_id, self()}) + def request(pid, %{last_request_id_ref: last_request_id_ref}, message, timeout) do + request_id = :atomics.add_get(last_request_id_ref, 1, 1) + message = Map.put(message, :uuid, request_id) + send(pid, {:send_request, Jason.encode!(message), request_id, self()}) receive do - {:fetch_reply, ^fetch_id, data} -> {:ok, data} + {:request_reply, ^request_id, data} -> {:ok, data} after timeout -> {:error, :timeout} end end - @impl true - def publish(pid, _, data) do - message = %{action: :publish, data: data} - send(pid, {:send, Jason.encode!(message)}) - :ok - end - @impl true def init(req, state) do shake = FedSocket.shake() @@ -87,15 +80,15 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do key = Pleroma.Web.FedSockets.Registry.key_from_uri(URI.parse(origin)) # Since, unlike with gun, we don't have calls. # We store last fetch id in an atomic counter and use casts. - last_fetch_id_ref = :atomics.new(1, []) - :ok = :atomics.put(last_fetch_id_ref, 1, 0) + last_request_id_ref = :atomics.new(1, []) + :ok = :atomics.put(last_request_id_ref, 1, 0) case Registry.register(@registry, key, %Value{ adapter: __MODULE__, - adapter_state: %{last_fetch_id_ref: last_fetch_id_ref} + adapter_state: %{last_request_id_ref: last_request_id_ref} }) do {:ok, _owner} -> - {:ok, %{origin: origin, waiting_fetches: %{}}} + {:ok, %{origin: origin, waiting_requests: %{}}} {:error, {:already_registered, _}} -> {:stop, origin} @@ -107,25 +100,25 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do def websocket_handle( {:text, raw_message}, - %{origin: origin, waiting_fetches: waiting_fetches} = state + %{origin: origin, waiting_requests: waiting_requests} = state ) do - case Adapter.process_message(raw_message, origin, waiting_fetches) do - {:reply, frame, waiting_fetches} -> - state = %{state | waiting_fetches: waiting_fetches} + case Adapter.process_message(raw_message, origin, waiting_requests) do + {:reply, frame, waiting_requests} -> + state = %{state | waiting_requests: waiting_requests} {:reply, frame, state} - {:noreply, waiting_fetches} -> - {:ok, %{state | waiting_fetches: waiting_fetches}} + {:noreply, waiting_requests} -> + {:ok, %{state | waiting_requests: waiting_requests}} end end @impl true def websocket_info( - {:send_fetch, message, fetch_id, pid}, - %{waiting_fetches: waiting_fetches} = state + {:send_request, message, request_id, pid}, + %{waiting_requests: waiting_requests} = state ) do - waiting_fetches = Map.put(waiting_fetches, fetch_id, pid) - {:reply, {:text, message}, %{state | waiting_fetches: waiting_fetches}} + waiting_requests = Map.put(waiting_requests, request_id, pid) + {:reply, {:text, message}, %{state | waiting_requests: waiting_requests}} end @impl true diff --git a/lib/pleroma/web/fed_sockets/adapter/gun.ex b/lib/pleroma/web/fed_sockets/adapter/gun.ex index 2fd0a5aa4..937c10696 100644 --- a/lib/pleroma/web/fed_sockets/adapter/gun.ex +++ b/lib/pleroma/web/fed_sockets/adapter/gun.ex @@ -17,50 +17,46 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do @registry Pleroma.Web.FedSockets.Registry @impl true - def fetch(pid, state, id, timeout) do - with {:ok, conn_pid, last_fetch_id_ref} <- await_connected(pid, state) do - fetch_id = :atomics.add_get(last_fetch_id_ref, 1, 1) - message = %{action: :fetch, data: id, uuid: fetch_id} - send(pid, {:register_fetch, fetch_id, self()}) + def request(pid, state, message, timeout) do + with {:ok, conn_pid, last_request_id_ref} <- await_connected(pid, state) do + request_id = :atomics.add_get(last_request_id_ref, 1, 1) + message = Map.put(message, :uuid, request_id) + send(pid, {:register_request, request_id, self()}) send_json(conn_pid, message) receive do - {:fetch_reply, ^fetch_id, data} -> {:ok, data} + {:request_reply, ^request_id, data} -> {:ok, data} after timeout -> {:error, :timeout} end end end - @impl true - def publish(pid, state, data) do - with {:ok, conn_pid, _} <- await_connected(pid, state) do - send_json(conn_pid, %{action: :publish, data: data}) - end - end - - defp await_connected(_pid, %{conn_pid: conn_pid, last_fetch_id_ref: last_fetch_id_ref}), - do: {:ok, conn_pid, last_fetch_id_ref} + defp await_connected(_pid, %{conn_pid: conn_pid, last_request_id_ref: last_request_id_ref}), + do: {:ok, conn_pid, last_request_id_ref} defp await_connected(pid, _) do monitor = Process.monitor(pid) GenServer.cast(pid, {:await_connected, self()}) receive do - {:DOWN, ^monitor, _, _, reason} -> {:error, reason} - {:await_connected, ^pid, conn_pid, last_fetch_id_ref} -> {:ok, conn_pid, last_fetch_id_ref} + {:DOWN, ^monitor, _, _, reason} -> + {:error, reason} + + {:await_connected, ^pid, conn_pid, last_request_id_ref} -> + {:ok, conn_pid, last_request_id_ref} end end def start_link([key | _] = opts) do - last_fetch_id_ref = :atomics.new(1, []) - :ok = :atomics.put(last_fetch_id_ref, 1, 0) + last_request_id_ref = :atomics.new(1, []) + :ok = :atomics.put(last_request_id_ref, 1, 0) - GenServer.start_link(__MODULE__, [last_fetch_id_ref | opts], + GenServer.start_link(__MODULE__, [last_request_id_ref | opts], name: {:via, Registry, {@registry, key, - %Value{adapter: __MODULE__, adapter_state: %{last_fetch_id_ref: last_fetch_id_ref}}}} + %Value{adapter: __MODULE__, adapter_state: %{last_request_id_ref: last_request_id_ref}}}} ) end @@ -70,7 +66,7 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do end @impl true - def handle_continue({:connect, [last_fetch_id_ref, key, uri] = opts}, _) do + def handle_continue({:connect, [last_request_id_ref, key, uri] = opts}, _) do case initiate_connection(uri) do {:ok, conn_pid} -> Registry.update_value(@registry, key, fn value -> @@ -80,8 +76,8 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do {:noreply, %{ conn_pid: conn_pid, - waiting_fetches: %{}, - last_fetch_id_ref: last_fetch_id_ref, + waiting_requests: %{}, + last_request_id_ref: last_request_id_ref, origin: uri, key: key }} @@ -95,9 +91,9 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do @impl true def handle_cast( {:await_connected, pid}, - %{conn_pid: conn_pid, last_fetch_id_ref: last_fetch_id_ref} = state + %{conn_pid: conn_pid, last_request_id_ref: last_request_id_ref} = state ) do - send(pid, {:await_connected, self(), conn_pid, last_fetch_id_ref}) + send(pid, {:await_connected, self(), conn_pid, last_request_id_ref}) {:noreply, state} end @@ -106,27 +102,30 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do end @impl true - def handle_info({:register_fetch, fetch_id, pid}, %{waiting_fetches: waiting_fetches} = state) do - waiting_fetches = Map.put(waiting_fetches, fetch_id, pid) - {:noreply, %{state | waiting_fetches: waiting_fetches}} + def handle_info( + {:register_request, request_id, pid}, + %{waiting_requests: waiting_requests} = state + ) do + waiting_requests = Map.put(waiting_requests, request_id, pid) + {:noreply, %{state | waiting_requests: waiting_requests}} end @impl true def handle_info( {:gun_ws, _conn_pid, _ref, {:text, raw_message}}, - %{conn_pid: conn_pid, origin: origin, waiting_fetches: waiting_fetches} = state + %{conn_pid: conn_pid, origin: origin, waiting_requests: waiting_requests} = state ) do - waiting_fetches = - case Adapter.process_message(raw_message, origin, waiting_fetches) do - {:reply, frame, waiting_fetches} -> + waiting_requests = + case Adapter.process_message(raw_message, origin, waiting_requests) do + {:reply, frame, waiting_requests} -> :gun.ws_send(conn_pid, frame) - waiting_fetches + waiting_requests - {:noreply, waiting_fetches} -> - waiting_fetches + {:noreply, waiting_requests} -> + waiting_requests end - {:noreply, %{state | waiting_fetches: waiting_fetches}} + {:noreply, %{state | waiting_requests: waiting_requests}} end @impl true diff --git a/lib/pleroma/web/fed_sockets/registry.ex b/lib/pleroma/web/fed_sockets/registry.ex index b8e254fd6..cb59e45c7 100644 --- a/lib/pleroma/web/fed_sockets/registry.ex +++ b/lib/pleroma/web/fed_sockets/registry.ex @@ -3,6 +3,7 @@ defmodule Pleroma.Web.FedSockets.Registry.Value do end defmodule Pleroma.Web.FedSockets.Registry do + alias Pleroma.Web.FedSockets.Adapter alias Pleroma.Web.FedSockets.Registry.Value @registry __MODULE__ @@ -11,7 +12,7 @@ defmodule Pleroma.Web.FedSockets.Registry do def fetch(object_id) do case get_socket(object_id) do {:ok, pid, %Value{adapter: adapter, adapter_state: adapter_state}} -> - apply(adapter, :fetch, [pid, adapter_state, object_id, 5_000]) + Adapter.fetch(pid, adapter, adapter_state, object_id, 5_000) e -> e @@ -22,7 +23,8 @@ defmodule Pleroma.Web.FedSockets.Registry do def publish(inbox, data) do case get_socket(inbox) do {:ok, pid, %Value{adapter: adapter, adapter_state: adapter_state}} -> - apply(adapter, :publish, [pid, adapter_state, data]) + Adapter.publish(pid, adapter, adapter_state, data, 5_000) + |> IO.inspect(label: "publish reply") e -> e -- cgit v1.2.3