aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorrinpatch <rinpatch@sdf.org>2020-09-22 21:23:13 +0300
committerrinpatch <rinpatch@sdf.org>2020-09-22 21:23:13 +0300
commit5fdfd9f2ee9afac24360b1a8d65e90f8dc1691cb (patch)
tree20f880e8e10220d19fefa1e3f0a18417c95ca82c /lib
parent656741787e1a383a4cf6efc5fe4da57bac8c501a (diff)
downloadpleroma-5fdfd9f2ee9afac24360b1a8d65e90f8dc1691cb.tar.gz
tmp
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/web/activity_pub/publisher.ex3
-rw-r--r--lib/pleroma/web/fed_sockets/adapter.ex49
-rw-r--r--lib/pleroma/web/fed_sockets/adapter/cowboy.ex31
-rw-r--r--lib/pleroma/web/fed_sockets/adapter/gun.ex115
4 files changed, 100 insertions, 98 deletions
diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex
index 4243f115d..e3b94c5cc 100644
--- a/lib/pleroma/web/activity_pub/publisher.ex
+++ b/lib/pleroma/web/activity_pub/publisher.ex
@@ -56,7 +56,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
Logger.debug("Published via FedSocket - #{inspect(inbox)}")
:ok
- _ ->
+ e ->
+ Logger.debug("Shit broke - #{inspect(e)}")
Logger.debug("publishing via http - #{inspect(inbox)}")
http_publish(inbox, actor, json, params)
end
diff --git a/lib/pleroma/web/fed_sockets/adapter.ex b/lib/pleroma/web/fed_sockets/adapter.ex
index 5bd9caec9..00ab2d056 100644
--- a/lib/pleroma/web/fed_sockets/adapter.ex
+++ b/lib/pleroma/web/fed_sockets/adapter.ex
@@ -1,7 +1,7 @@
defmodule Pleroma.Web.FedSockets.Adapter do
@moduledoc """
A behavior both types of sockets (server and client) should implement
- and a collection of helper functions useful to both.
+ and a collection of helper functions useful to both
"""
@type adapter_state :: map()
@@ -20,23 +20,30 @@ defmodule Pleroma.Web.FedSockets.Adapter do
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.FedSockets.IngesterWorker
- @typedoc """
- Should be "fetch" or "publish"
- """
- @type common_action :: String.t()
@type origin :: String.t()
- @doc "Process non adapter-specific messages."
- @spec process_message(map(), origin()) ::
- {:reply, term()} | :noreply | {:error, :unknown_action}
- def process_message(%{"action" => "publish", "data" => data}, origin) do
+ @type fetch_id :: integer()
+ @type waiting_fetches :: %{required(fetch_id()) => pid()}
+ @doc "Processes incoming messages. Returns {:reply, websocket_frame, waiting_fetches} or `{:noreply, waiting_fetches}`"
+ @spec process_message(binary() | map(), origin(), waiting_fetches()) ::
+ {:reply, term(), waiting_fetches()} | {:noreply, waiting_fetches()}
+ def process_message(message, origin, waiting_fetches) when is_binary(message) do
+ case Jason.decode(message) do
+ {:ok, message} -> process_message(message, origin, waiting_fetches)
+ # 1003 indicates that an endpoint is terminating the connection
+ # because it has received a type of data it cannot accept.
+ {:error, decode_error} -> {:reply, {:close, 1003, Exception.message(decode_error)}}
+ end
+ end
+
+ def process_message(%{"action" => "publish", "data" => data}, origin, waiting_fetches) do
if Containment.contain_origin(origin, data) do
IngesterWorker.enqueue("ingest", %{"object" => data})
end
- :noreply
+ {:noreply, waiting_fetches}
end
- def process_message(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _) do
+ def process_message(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _, _) do
data = %{
"action" => "fetch_reply",
"status" => "processed",
@@ -44,11 +51,25 @@ defmodule Pleroma.Web.FedSockets.Adapter do
"data" => represent_item(ap_id)
}
- {:reply, data}
+ {:reply, {:text, Jason.encode!(data)}}
+ end
+
+ def process_message(
+ %{"action" => "fetch_reply", "uuid" => uuid, "data" => data},
+ _,
+ waiting_fetches
+ ) do
+ with {pid, waiting_fetches} when is_pid(pid) <- Map.pop(waiting_fetches, uuid) do
+ send(pid, {:fetch_reply, uuid, data})
+ {:noreply, waiting_fetches}
+ else
+ _ ->
+ {:noreply, waiting_fetches}
+ end
end
- def process_message(_, _) do
- {:error, :unknown_action}
+ def process_message(_, _, waiting_fetches) do
+ {:reply, {:close, 1003, "Unknown message type."}, waiting_fetches}
end
defp represent_item(ap_id) do
diff --git a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex
index a732df159..880af929c 100644
--- a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex
+++ b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex
@@ -33,6 +33,7 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do
def publish(pid, _, data) do
message = %{action: :publish, data: data}
send(pid, {:send, Jason.encode!(message)})
+ :ok
end
@impl true
@@ -104,28 +105,16 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do
@impl true
def websocket_handle(:ping, socket_info), do: {:ok, socket_info}
- def websocket_handle({:text, raw_message}, %{origin: origin} = state) do
- case Jason.decode(raw_message) do
- {:ok, message} ->
- case message do
- %{"action" => "fetch_reply", "uuid" => uuid, "data" => data} ->
- with {pid, waiting_fetches} when is_pid(pid) <- Map.pop(state.waiting_fetches, uuid) do
- send(pid, {:fetch_reply, uuid, data})
- {:ok, %{state | waiting_fetches: waiting_fetches}}
- else
- _ ->
- {:ok, state}
- end
-
- message ->
- case Adapter.process_message(message, origin) do
- :noreply -> {:ok, state}
- {:reply, data} -> {:reply, {:text, Jason.encode!(data)}, state}
- end
- end
+ def websocket_handle(
+ {:text, raw_message},
+ %{origin: origin, waiting_fetches: waiting_fetches} = state
+ ) do
+ case Adapter.process_message(raw_message, origin, waiting_fetches) do
+ {:reply, frame, waiting_fetches} ->
+ {:reply, frame, %{state | waiting_fetches: waiting_fetches}}
- {:error, decode_error} ->
- exit({:malformed_message, decode_error})
+ {:noreply, waiting_fetches} ->
+ {:ok, %{state | waiting_fetches: waiting_fetches}}
end
end
diff --git a/lib/pleroma/web/fed_sockets/adapter/gun.ex b/lib/pleroma/web/fed_sockets/adapter/gun.ex
index 51c4e6be9..dc8ae6db7 100644
--- a/lib/pleroma/web/fed_sockets/adapter/gun.ex
+++ b/lib/pleroma/web/fed_sockets/adapter/gun.ex
@@ -18,20 +18,29 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
@impl true
def fetch(pid, state, id, timeout) do
- # TODO: refactor with atomics and encoding on the client
- with {:ok, _} <- await_connected(pid, state) do
- GenServer.call(pid, {:fetch, id}, timeout)
+ with {:ok, conn_pid, last_fetch_id_ref} <- await_connected(pid, state) do
+ fetch_id = :atomics.add_get(last_fetch_id_ref, 1, 1)
+ message = %{action: :fetch, data: id, uuid: fetch_id}
+ send(pid, {:register_fetch, fetch_id, self()})
+ send_json(conn_pid, message)
+
+ receive do
+ {:fetch_reply, ^fetch_id, data} -> {:ok, data}
+ after
+ timeout -> {:error, :timeout}
+ end
end
end
@impl true
def publish(pid, state, data) do
- with {:ok, conn_pid} <- await_connected(pid, state) do
+ with {:ok, conn_pid, _} <- await_connected(pid, state) do
send_json(conn_pid, %{action: :publish, data: data})
end
end
- defp await_connected(_pid, %{conn_pid: conn_pid}), do: {:ok, conn_pid}
+ defp await_connected(_pid, %{conn_pid: conn_pid, last_fetch_id_ref: last_fetch_id_ref}),
+ do: {:ok, conn_pid, last_fetch_id_ref}
defp await_connected(pid, _) do
monitor = Process.monitor(pid)
@@ -40,13 +49,19 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
receive do
{:DOWN, ^monitor, _, _, {:shutdown, reason}} -> reason
{:DOWN, ^monitor, _, _, reason} -> {:error, reason}
- {:await_connected, ^pid, conn_pid} -> {:ok, conn_pid}
+ {:await_connected, ^pid, conn_pid, last_fetch_id_ref} -> {:ok, conn_pid, last_fetch_id_ref}
end
end
def start_link([key | _] = opts) do
- GenServer.start_link(__MODULE__, opts,
- name: {:via, Registry, {@registry, key, %Value{adapter: __MODULE__, adapter_state: %{}}}}
+ last_fetch_id_ref = :atomics.new(1, [])
+ :ok = :atomics.put(last_fetch_id_ref, 1, 0)
+
+ GenServer.start_link(__MODULE__, [last_fetch_id_ref | opts],
+ name:
+ {:via, Registry,
+ {@registry, key,
+ %Value{adapter: __MODULE__, adapter_state: %{last_fetch_id_ref: last_fetch_id_ref}}}}
)
end
@@ -56,15 +71,21 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
end
@impl true
- def handle_continue({:connect, [key, uri] = opts}, _) do
+ def handle_continue({:connect, [last_fetch_id_ref, key, uri] = opts}, _) do
case initiate_connection(uri) do
{:ok, conn_pid} ->
Registry.update_value(@registry, key, fn value ->
- %{value | adapter_state: %{conn_pid: conn_pid}}
+ %{value | adapter_state: Map.put(value.adapter_state, :conn_pid, conn_pid)}
end)
{:noreply,
- %{conn_pid: conn_pid, waiting_fetches: %{}, last_fetch_id: 0, origin: uri, key: key}}
+ %{
+ conn_pid: conn_pid,
+ waiting_fetches: %{},
+ last_fetch_id_ref: last_fetch_id_ref,
+ origin: uri,
+ key: key
+ }}
{:error, reason} = e ->
Logger.debug("Outgoing connection failed - #{inspect(reason)}")
@@ -73,32 +94,12 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
end
@impl true
- def handle_cast({:await_connected, pid}, %{conn_pid: conn_pid} = state) do
- send(pid, {:await_connected, self(), conn_pid})
- {:noreply, state}
- end
-
- @impl true
- def handle_call(
- {:fetch, data},
- from,
- %{
- last_fetch_id: last_fetch_id,
- conn_pid: conn_pid,
- waiting_fetches: waiting_fetches
- } = state
+ def handle_cast(
+ {:await_connected, pid},
+ %{conn_pid: conn_pid, last_fetch_id_ref: last_fetch_id_ref} = state
) do
- last_fetch_id = last_fetch_id + 1
- request = %{action: :fetch, data: data, uuid: last_fetch_id}
- :ok = send_json(conn_pid, request)
- waiting_fetches = Map.put(waiting_fetches, last_fetch_id, from)
-
- {:noreply,
- %{
- state
- | waiting_fetches: waiting_fetches,
- last_fetch_id: last_fetch_id
- }}
+ send(pid, {:await_connected, self(), conn_pid, last_fetch_id_ref})
+ {:noreply, state}
end
defp send_json(conn_pid, data) do
@@ -106,37 +107,27 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
end
@impl true
+ def handle_info({:register_fetch, fetch_id, pid}, %{waiting_fetches: waiting_fetches} = state) do
+ waiting_fetches = Map.put(waiting_fetches, fetch_id, pid)
+ {:noreply, %{state | waiting_fetches: waiting_fetches}}
+ end
+
+ @impl true
def handle_info(
{:gun_ws, _conn_pid, _ref, {:text, raw_message}},
- %{conn_pid: conn_pid, origin: origin} = state
+ %{conn_pid: conn_pid, origin: origin, waiting_fetches: waiting_fetches} = state
) do
- state =
- case Jason.decode(raw_message) do
- {:ok, message} ->
- case message do
- %{"action" => "fetch_reply", "uuid" => uuid, "data" => data} ->
- with {{_, _} = client, waiting_fetches} <- Map.pop(state.waiting_fetches, uuid) do
- GenServer.reply(client, {:ok, data})
- %{state | waiting_fetches: waiting_fetches}
- else
- _ ->
- state
- end
-
- message ->
- case Adapter.process_message(message, origin) do
- :noreply -> :noop
- {:reply, data} -> send_json(conn_pid, data)
- end
-
- state
- end
-
- {:error, decode_error} ->
- exit({:malformed_message, decode_error})
+ waiting_fetches =
+ case Adapter.process_message(raw_message, origin, waiting_fetches) do
+ {:reply, frame, waiting_fetches} ->
+ :gun.ws_send(conn_pid, frame)
+ waiting_fetches
+
+ {:noreply, waiting_fetches} ->
+ waiting_fetches
end
- {:noreply, state}
+ {:noreply, %{state | waiting_fetches: waiting_fetches}}
end
@impl true