aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/pleroma/web/fed_sockets/adapter.ex76
-rw-r--r--lib/pleroma/web/fed_sockets/adapter/cowboy.ex45
-rw-r--r--lib/pleroma/web/fed_sockets/adapter/gun.ex73
-rw-r--r--lib/pleroma/web/fed_sockets/registry.ex6
4 files changed, 109 insertions, 91 deletions
diff --git a/lib/pleroma/web/fed_sockets/adapter.ex b/lib/pleroma/web/fed_sockets/adapter.ex
index 05e74fa78..3e260cc49 100644
--- a/lib/pleroma/web/fed_sockets/adapter.ex
+++ b/lib/pleroma/web/fed_sockets/adapter.ex
@@ -6,11 +6,12 @@ defmodule Pleroma.Web.FedSockets.Adapter do
@type adapter_state :: map()
- @doc "A synchronous fetch."
- @callback fetch(pid(), adapter_state(), term(), timeout()) :: {:ok, term()} | {:error, term()}
+ @doc """
+ Send a message through the socket and wait for answer.
+ Accepts a non-encoded message payload, except the uuid, it will be added automatically.
+ """
- @doc "An asynchronous publish."
- @callback publish(pid(), adapter_state(), term()) :: :ok | {:error, term()}
+ @callback request(pid(), adapter_state(), map(), timeout()) :: {:ok, term()} | {:error, term()}
alias Pleroma.Object
alias Pleroma.Object.Containment
@@ -20,63 +21,86 @@ defmodule Pleroma.Web.FedSockets.Adapter do
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.Federator
+ @spec fetch(pid(), module(), adapter_state(), term(), timeout()) ::
+ {:ok, term()} | {:error, term()}
+ def fetch(pid, adapter, adapter_state, id, timeout) do
+ data = %{action: :fetch, data: id}
+ apply(adapter, :request, [pid, adapter_state, data, timeout])
+ end
+
+ @spec publish(pid(), module(), adapter_state(), term(), timeout()) ::
+ {:ok, term()} | {:error, term()}
+ def publish(pid, adapter, adapter_state, data, timeout) do
+ data = %{action: :publish, data: data}
+ apply(adapter, :request, [pid, adapter_state, data, timeout])
+ end
+
@type origin :: String.t()
@type fetch_id :: integer()
- @type waiting_fetches :: %{required(fetch_id()) => pid()}
- @doc "Processes incoming messages. Returns {:reply, websocket_frame, waiting_fetches} or `{:noreply, waiting_fetches}`"
- @spec process_message(binary() | map(), origin(), waiting_fetches()) ::
- {:reply, term(), waiting_fetches()} | {:noreply, waiting_fetches()}
- def process_message(message, origin, waiting_fetches) when is_binary(message) do
+ @type waiting_requests :: %{required(fetch_id()) => pid()}
+ @doc "Processes incoming messages. Returns {:reply, websocket_frame,waiting_requests} or `{:noreply,waiting_requests}`"
+ @spec process_message(binary() | map(), origin(), waiting_requests()) ::
+ {:reply, term(), waiting_requests()} | {:noreply, waiting_requests()}
+ def process_message(message, origin, waiting_requests) when is_binary(message) do
case Jason.decode(message) do
- {:ok, message} -> do_process_message(message, origin, waiting_fetches)
+ {:ok, message} -> do_process_message(message, origin, waiting_requests)
# 1003 indicates that an endpoint is terminating the connection
# because it has received a type of data it cannot accept.
{:error, decode_error} -> {:reply, {:close, 1003, Exception.message(decode_error)}}
end
end
- def process_message(message, origin, waiting_fetches),
- do: do_process_message(message, origin, waiting_fetches)
+ def process_message(message, origin, waiting_requests),
+ do: do_process_message(message, origin, waiting_requests)
- defp do_process_message(%{"action" => "publish", "data" => data}, origin, waiting_fetches) do
+ defp do_process_message(
+ %{"action" => "publish", "data" => data, "uuid" => uuid},
+ origin,
+ waiting_requests
+ ) do
if Containment.contain_origin(origin, data) do
Federator.incoming_ap_doc(data)
end
- {:noreply, waiting_fetches}
+ data = %{
+ "action" => "reply",
+ "uuid" => uuid,
+ "data" => "ok"
+ }
+
+ {:reply, {:text, Jason.encode!(data)}, waiting_requests}
end
defp do_process_message(
%{"action" => "fetch", "uuid" => uuid, "data" => ap_id},
_,
- waiting_fetches
+ waiting_requests
) do
data = %{
- "action" => "fetch_reply",
- "status" => "processed",
+ "action" => "reply",
"uuid" => uuid,
"data" => represent_item(ap_id)
}
- {:reply, {:text, Jason.encode!(data)}, waiting_fetches}
+ {:reply, {:text, Jason.encode!(data)}, waiting_requests}
end
defp do_process_message(
- %{"action" => "fetch_reply", "uuid" => uuid, "data" => data},
+ %{"action" => "reply", "uuid" => uuid, "data" => data},
_,
- waiting_fetches
+ waiting_requests
) do
- with {pid, waiting_fetches} when is_pid(pid) <- Map.pop(waiting_fetches, uuid) do
- send(pid, {:fetch_reply, uuid, data})
- {:noreply, waiting_fetches}
+ with {pid, waiting_requests} when is_pid(pid) <- Map.pop(waiting_requests, uuid) do
+ send(pid, {:request_reply, uuid, data})
+ {:noreply, waiting_requests}
else
_ ->
- {:noreply, waiting_fetches}
+ {:noreply, waiting_requests}
end
end
- defp do_process_message(_, _, waiting_fetches) do
- {:reply, {:close, 1003, "Unknown message type."}, waiting_fetches}
+ defp do_process_message(_, _, waiting_requests) do
+ {:reply, {:close, 1003, "Unknown message type."}, waiting_requests}
end
defp represent_item(ap_id) do
diff --git a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex
index a7b3835dd..cb721f29b 100644
--- a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex
+++ b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex
@@ -17,26 +17,19 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do
@behaviour Adapter
@impl true
- def fetch(pid, %{last_fetch_id_ref: last_fetch_id_ref}, id, timeout) do
- fetch_id = :atomics.add_get(last_fetch_id_ref, 1, 1)
- message = %{action: :fetch, data: id, uuid: fetch_id}
- send(pid, {:send_fetch, Jason.encode!(message), fetch_id, self()})
+ def request(pid, %{last_request_id_ref: last_request_id_ref}, message, timeout) do
+ request_id = :atomics.add_get(last_request_id_ref, 1, 1)
+ message = Map.put(message, :uuid, request_id)
+ send(pid, {:send_request, Jason.encode!(message), request_id, self()})
receive do
- {:fetch_reply, ^fetch_id, data} -> {:ok, data}
+ {:request_reply, ^request_id, data} -> {:ok, data}
after
timeout -> {:error, :timeout}
end
end
@impl true
- def publish(pid, _, data) do
- message = %{action: :publish, data: data}
- send(pid, {:send, Jason.encode!(message)})
- :ok
- end
-
- @impl true
def init(req, state) do
shake = FedSocket.shake()
@@ -87,15 +80,15 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do
key = Pleroma.Web.FedSockets.Registry.key_from_uri(URI.parse(origin))
# Since, unlike with gun, we don't have calls.
# We store last fetch id in an atomic counter and use casts.
- last_fetch_id_ref = :atomics.new(1, [])
- :ok = :atomics.put(last_fetch_id_ref, 1, 0)
+ last_request_id_ref = :atomics.new(1, [])
+ :ok = :atomics.put(last_request_id_ref, 1, 0)
case Registry.register(@registry, key, %Value{
adapter: __MODULE__,
- adapter_state: %{last_fetch_id_ref: last_fetch_id_ref}
+ adapter_state: %{last_request_id_ref: last_request_id_ref}
}) do
{:ok, _owner} ->
- {:ok, %{origin: origin, waiting_fetches: %{}}}
+ {:ok, %{origin: origin, waiting_requests: %{}}}
{:error, {:already_registered, _}} ->
{:stop, origin}
@@ -107,25 +100,25 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do
def websocket_handle(
{:text, raw_message},
- %{origin: origin, waiting_fetches: waiting_fetches} = state
+ %{origin: origin, waiting_requests: waiting_requests} = state
) do
- case Adapter.process_message(raw_message, origin, waiting_fetches) do
- {:reply, frame, waiting_fetches} ->
- state = %{state | waiting_fetches: waiting_fetches}
+ case Adapter.process_message(raw_message, origin, waiting_requests) do
+ {:reply, frame, waiting_requests} ->
+ state = %{state | waiting_requests: waiting_requests}
{:reply, frame, state}
- {:noreply, waiting_fetches} ->
- {:ok, %{state | waiting_fetches: waiting_fetches}}
+ {:noreply, waiting_requests} ->
+ {:ok, %{state | waiting_requests: waiting_requests}}
end
end
@impl true
def websocket_info(
- {:send_fetch, message, fetch_id, pid},
- %{waiting_fetches: waiting_fetches} = state
+ {:send_request, message, request_id, pid},
+ %{waiting_requests: waiting_requests} = state
) do
- waiting_fetches = Map.put(waiting_fetches, fetch_id, pid)
- {:reply, {:text, message}, %{state | waiting_fetches: waiting_fetches}}
+ waiting_requests = Map.put(waiting_requests, request_id, pid)
+ {:reply, {:text, message}, %{state | waiting_requests: waiting_requests}}
end
@impl true
diff --git a/lib/pleroma/web/fed_sockets/adapter/gun.ex b/lib/pleroma/web/fed_sockets/adapter/gun.ex
index 2fd0a5aa4..937c10696 100644
--- a/lib/pleroma/web/fed_sockets/adapter/gun.ex
+++ b/lib/pleroma/web/fed_sockets/adapter/gun.ex
@@ -17,50 +17,46 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
@registry Pleroma.Web.FedSockets.Registry
@impl true
- def fetch(pid, state, id, timeout) do
- with {:ok, conn_pid, last_fetch_id_ref} <- await_connected(pid, state) do
- fetch_id = :atomics.add_get(last_fetch_id_ref, 1, 1)
- message = %{action: :fetch, data: id, uuid: fetch_id}
- send(pid, {:register_fetch, fetch_id, self()})
+ def request(pid, state, message, timeout) do
+ with {:ok, conn_pid, last_request_id_ref} <- await_connected(pid, state) do
+ request_id = :atomics.add_get(last_request_id_ref, 1, 1)
+ message = Map.put(message, :uuid, request_id)
+ send(pid, {:register_request, request_id, self()})
send_json(conn_pid, message)
receive do
- {:fetch_reply, ^fetch_id, data} -> {:ok, data}
+ {:request_reply, ^request_id, data} -> {:ok, data}
after
timeout -> {:error, :timeout}
end
end
end
- @impl true
- def publish(pid, state, data) do
- with {:ok, conn_pid, _} <- await_connected(pid, state) do
- send_json(conn_pid, %{action: :publish, data: data})
- end
- end
-
- defp await_connected(_pid, %{conn_pid: conn_pid, last_fetch_id_ref: last_fetch_id_ref}),
- do: {:ok, conn_pid, last_fetch_id_ref}
+ defp await_connected(_pid, %{conn_pid: conn_pid, last_request_id_ref: last_request_id_ref}),
+ do: {:ok, conn_pid, last_request_id_ref}
defp await_connected(pid, _) do
monitor = Process.monitor(pid)
GenServer.cast(pid, {:await_connected, self()})
receive do
- {:DOWN, ^monitor, _, _, reason} -> {:error, reason}
- {:await_connected, ^pid, conn_pid, last_fetch_id_ref} -> {:ok, conn_pid, last_fetch_id_ref}
+ {:DOWN, ^monitor, _, _, reason} ->
+ {:error, reason}
+
+ {:await_connected, ^pid, conn_pid, last_request_id_ref} ->
+ {:ok, conn_pid, last_request_id_ref}
end
end
def start_link([key | _] = opts) do
- last_fetch_id_ref = :atomics.new(1, [])
- :ok = :atomics.put(last_fetch_id_ref, 1, 0)
+ last_request_id_ref = :atomics.new(1, [])
+ :ok = :atomics.put(last_request_id_ref, 1, 0)
- GenServer.start_link(__MODULE__, [last_fetch_id_ref | opts],
+ GenServer.start_link(__MODULE__, [last_request_id_ref | opts],
name:
{:via, Registry,
{@registry, key,
- %Value{adapter: __MODULE__, adapter_state: %{last_fetch_id_ref: last_fetch_id_ref}}}}
+ %Value{adapter: __MODULE__, adapter_state: %{last_request_id_ref: last_request_id_ref}}}}
)
end
@@ -70,7 +66,7 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
end
@impl true
- def handle_continue({:connect, [last_fetch_id_ref, key, uri] = opts}, _) do
+ def handle_continue({:connect, [last_request_id_ref, key, uri] = opts}, _) do
case initiate_connection(uri) do
{:ok, conn_pid} ->
Registry.update_value(@registry, key, fn value ->
@@ -80,8 +76,8 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
{:noreply,
%{
conn_pid: conn_pid,
- waiting_fetches: %{},
- last_fetch_id_ref: last_fetch_id_ref,
+ waiting_requests: %{},
+ last_request_id_ref: last_request_id_ref,
origin: uri,
key: key
}}
@@ -95,9 +91,9 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
@impl true
def handle_cast(
{:await_connected, pid},
- %{conn_pid: conn_pid, last_fetch_id_ref: last_fetch_id_ref} = state
+ %{conn_pid: conn_pid, last_request_id_ref: last_request_id_ref} = state
) do
- send(pid, {:await_connected, self(), conn_pid, last_fetch_id_ref})
+ send(pid, {:await_connected, self(), conn_pid, last_request_id_ref})
{:noreply, state}
end
@@ -106,27 +102,30 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
end
@impl true
- def handle_info({:register_fetch, fetch_id, pid}, %{waiting_fetches: waiting_fetches} = state) do
- waiting_fetches = Map.put(waiting_fetches, fetch_id, pid)
- {:noreply, %{state | waiting_fetches: waiting_fetches}}
+ def handle_info(
+ {:register_request, request_id, pid},
+ %{waiting_requests: waiting_requests} = state
+ ) do
+ waiting_requests = Map.put(waiting_requests, request_id, pid)
+ {:noreply, %{state | waiting_requests: waiting_requests}}
end
@impl true
def handle_info(
{:gun_ws, _conn_pid, _ref, {:text, raw_message}},
- %{conn_pid: conn_pid, origin: origin, waiting_fetches: waiting_fetches} = state
+ %{conn_pid: conn_pid, origin: origin, waiting_requests: waiting_requests} = state
) do
- waiting_fetches =
- case Adapter.process_message(raw_message, origin, waiting_fetches) do
- {:reply, frame, waiting_fetches} ->
+ waiting_requests =
+ case Adapter.process_message(raw_message, origin, waiting_requests) do
+ {:reply, frame, waiting_requests} ->
:gun.ws_send(conn_pid, frame)
- waiting_fetches
+ waiting_requests
- {:noreply, waiting_fetches} ->
- waiting_fetches
+ {:noreply, waiting_requests} ->
+ waiting_requests
end
- {:noreply, %{state | waiting_fetches: waiting_fetches}}
+ {:noreply, %{state | waiting_requests: waiting_requests}}
end
@impl true
diff --git a/lib/pleroma/web/fed_sockets/registry.ex b/lib/pleroma/web/fed_sockets/registry.ex
index b8e254fd6..cb59e45c7 100644
--- a/lib/pleroma/web/fed_sockets/registry.ex
+++ b/lib/pleroma/web/fed_sockets/registry.ex
@@ -3,6 +3,7 @@ defmodule Pleroma.Web.FedSockets.Registry.Value do
end
defmodule Pleroma.Web.FedSockets.Registry do
+ alias Pleroma.Web.FedSockets.Adapter
alias Pleroma.Web.FedSockets.Registry.Value
@registry __MODULE__
@@ -11,7 +12,7 @@ defmodule Pleroma.Web.FedSockets.Registry do
def fetch(object_id) do
case get_socket(object_id) do
{:ok, pid, %Value{adapter: adapter, adapter_state: adapter_state}} ->
- apply(adapter, :fetch, [pid, adapter_state, object_id, 5_000])
+ Adapter.fetch(pid, adapter, adapter_state, object_id, 5_000)
e ->
e
@@ -22,7 +23,8 @@ defmodule Pleroma.Web.FedSockets.Registry do
def publish(inbox, data) do
case get_socket(inbox) do
{:ok, pid, %Value{adapter: adapter, adapter_state: adapter_state}} ->
- apply(adapter, :publish, [pid, adapter_state, data])
+ Adapter.publish(pid, adapter, adapter_state, data, 5_000)
+ |> IO.inspect(label: "publish reply")
e ->
e