aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--config/config.exs2
-rw-r--r--lib/pleroma/object/fetcher.ex7
-rw-r--r--lib/pleroma/web/activity_pub/publisher.ex8
-rw-r--r--lib/pleroma/web/fed_sockets/adapter/cowboy.ex11
-rw-r--r--lib/pleroma/web/fed_sockets/fed_registry.ex185
-rw-r--r--lib/pleroma/web/fed_sockets/fed_socket.ex97
-rw-r--r--lib/pleroma/web/fed_sockets/fed_sockets.ex94
-rw-r--r--lib/pleroma/web/fed_sockets/fetch_registry.ex105
8 files changed, 19 insertions, 490 deletions
diff --git a/config/config.exs b/config/config.exs
index 00624bf00..7e3a3c848 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -130,7 +130,7 @@ config :pleroma, Pleroma.Web.Endpoint,
dispatch: [
{:_,
[
- {"/api/fedsocket/v1", Pleroma.Web.FedSockets.IncomingHandler, []},
+ {"/api/fedsocket/v1", Pleroma.Web.FedSockets.Adapter.Cowboy, []},
{"/api/v1/streaming", Pleroma.Web.MastodonAPI.WebsocketHandler, []},
{"/websocket", Phoenix.Endpoint.CowboyWebSocket,
{Phoenix.Transports.WebSocket,
diff --git a/lib/pleroma/object/fetcher.ex b/lib/pleroma/object/fetcher.ex
index eb9a4c478..9468087cb 100644
--- a/lib/pleroma/object/fetcher.ex
+++ b/lib/pleroma/object/fetcher.ex
@@ -192,8 +192,7 @@ defmodule Pleroma.Object.Fetcher do
Logger.debug("Fetching object #{id} via AP")
with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")},
- {:ok, body} <- get_object(id, opts),
- {:ok, data} <- safe_json_decode(body),
+ {:ok, data} <- get_object(id, opts),
:ok <- Containment.contain_origin_from_id(id, data) do
{:ok, data}
else
@@ -214,7 +213,7 @@ defmodule Pleroma.Object.Fetcher do
defp get_object(id, opts) do
with false <- Keyword.get(opts, :force_http, false) do
Logger.debug("fetching via fedsocket - #{inspect(id)}")
- FedSockets.Registry.fetch(id)
+ FedSockets.fetch(id)
else
_other ->
Logger.debug("fetching via http - #{inspect(id)}")
@@ -232,7 +231,7 @@ defmodule Pleroma.Object.Fetcher do
case HTTP.get(id, headers) do
{:ok, %{body: body, status: code}} when code in 200..299 ->
- {:ok, body}
+ safe_json_decode(body)
{:ok, %{status: code}} when code in [404, 410] ->
{:error, "Object has been deleted"}
diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex
index 9c3956683..4243f115d 100644
--- a/lib/pleroma/web/activity_pub/publisher.ex
+++ b/lib/pleroma/web/activity_pub/publisher.ex
@@ -51,10 +51,10 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do
Logger.debug("Federating #{id} to #{inbox}")
- case FedSockets.get_or_create_fed_socket(inbox) do
- {:ok, fedsocket} ->
- Logger.debug("publishing via fedsockets - #{inspect(inbox)}")
- FedSockets.publish(fedsocket, json)
+ case FedSockets.publish(inbox, json) do
+ :ok ->
+ Logger.debug("Published via FedSocket - #{inspect(inbox)}")
+ :ok
_ ->
Logger.debug("publishing via http - #{inspect(inbox)}")
diff --git a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex
index b4f785c57..a732df159 100644
--- a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex
+++ b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex
@@ -30,7 +30,10 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do
end
@impl true
- def publish(_socket, _data), do: {:error, :not_implemented}
+ def publish(pid, _, data) do
+ message = %{action: :publish, data: data}
+ send(pid, {:send, Jason.encode!(message)})
+ end
@impl true
def init(req, state) do
@@ -53,7 +56,9 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do
else
{:has_request_target, headers} ->
Logger.debug(fn ->
- "#{__MODULE__}: Wrong or no \"(request-target)\" header. Rejecting websocket switch. Headers:\n#{inspect(headers)}"
+ "#{__MODULE__}: Wrong or no \"(request-target)\" header. Rejecting websocket switch. Headers:\n#{
+ inspect(headers)
+ }"
end)
:cowboy_req.reply(400, req)
@@ -115,7 +120,7 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do
message ->
case Adapter.process_message(message, origin) do
:noreply -> {:ok, state}
- {:reply, data} -> {:reply, Jason.encode!(data), state}
+ {:reply, data} -> {:reply, {:text, Jason.encode!(data)}, state}
end
end
diff --git a/lib/pleroma/web/fed_sockets/fed_registry.ex b/lib/pleroma/web/fed_sockets/fed_registry.ex
deleted file mode 100644
index e00ea69c0..000000000
--- a/lib/pleroma/web/fed_sockets/fed_registry.ex
+++ /dev/null
@@ -1,185 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.FedSockets.FedRegistry do
- @moduledoc """
- The FedRegistry stores the active FedSockets for quick retrieval.
-
- The storage and retrieval portion of the FedRegistry is done in process through
- elixir's `Registry` module for speed and its ability to monitor for terminated processes.
-
- Dropped connections will be caught by `Registry` and deleted. Since the next
- message will initiate a new connection there is no reason to try and reconnect at that point.
-
- Normally outside modules should have no need to call or use the FedRegistry themselves.
- """
-
- alias Pleroma.Web.FedSockets.FedSocket
- alias Pleroma.Web.FedSockets.SocketInfo
-
- require Logger
-
- @default_rejection_duration 15 * 60 * 1000
- @rejections :fed_socket_rejections
-
- @doc """
- Retrieves a FedSocket from the Registry given it's origin.
-
- The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"
-
- Will return:
- * {:ok, fed_socket} for working FedSockets
- * {:error, :rejected} for origins that have been tried and refused within the rejection duration interval
- * {:error, some_reason} usually :missing for unknown origins
- """
- def get_fed_socket(origin) do
- case get_registry_data(origin) do
- {:error, reason} ->
- {:error, reason}
-
- {:ok, %{state: :connected} = socket_info} ->
- {:ok, socket_info}
- end
- end
-
- @doc """
- Adds a connected FedSocket to the Registry.
-
- Always returns {:ok, fed_socket}
- """
- def add_fed_socket(origin, pid \\ nil) do
- origin
- |> SocketInfo.build(pid)
- |> SocketInfo.connect()
- |> add_socket_info
- end
-
- defp add_socket_info(%{origin: origin, state: :connected} = socket_info) do
- case Registry.register(FedSockets.Registry, origin, socket_info) do
- {:ok, _owner} ->
- clear_prior_rejection(origin)
- Logger.debug("fedsocket added: #{inspect(origin)}")
-
- {:ok, socket_info}
-
- {:error, {:already_registered, _pid}} ->
- FedSocket.close(socket_info)
- existing_socket_info = Registry.lookup(FedSockets.Registry, origin)
-
- {:ok, existing_socket_info}
-
- _ ->
- {:error, :error_adding_socket}
- end
- end
-
- @doc """
- Mark this origin as having rejected a connection attempt.
- This will keep it from getting additional connection attempts
- for a period of time specified in the config.
-
- Always returns {:ok, new_reg_data}
- """
- def set_host_rejected(uri) do
- new_reg_data =
- uri
- |> SocketInfo.origin()
- |> get_or_create_registry_data()
- |> set_to_rejected()
- |> save_registry_data()
-
- {:ok, new_reg_data}
- end
-
- @doc """
- Retrieves the FedRegistryData from the Registry given it's origin.
-
- The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"
-
- Will return:
- * {:ok, fed_registry_data} for known origins
- * {:error, :missing} for uniknown origins
- * {:error, :cache_error} indicating some low level runtime issues
- """
- def get_registry_data(origin) do
- case Registry.lookup(FedSockets.Registry, origin) do
- [] ->
- if is_rejected?(origin) do
- Logger.debug("previously rejected fedsocket requested")
- {:error, :rejected}
- else
- {:error, :missing}
- end
-
- [{_pid, %{state: :connected} = socket_info}] ->
- {:ok, socket_info}
-
- _ ->
- {:error, :cache_error}
- end
- end
-
- @doc """
- Retrieves a map of all sockets from the Registry. The keys are the origins and the values are the corresponding SocketInfo
- """
- def list_all do
- (list_all_connected() ++ list_all_rejected())
- |> Enum.into(%{})
- end
-
- defp list_all_connected do
- FedSockets.Registry
- |> Registry.select([{{:"$1", :_, :"$3"}, [], [{{:"$1", :"$3"}}]}])
- end
-
- defp list_all_rejected do
- {:ok, keys} = Cachex.keys(@rejections)
-
- {:ok, registry_data} =
- Cachex.execute(@rejections, fn worker ->
- Enum.map(keys, fn k -> {k, Cachex.get!(worker, k)} end)
- end)
-
- registry_data
- end
-
- defp clear_prior_rejection(origin),
- do: Cachex.del(@rejections, origin)
-
- defp is_rejected?(origin) do
- case Cachex.get(@rejections, origin) do
- {:ok, nil} ->
- false
-
- {:ok, _} ->
- true
- end
- end
-
- defp get_or_create_registry_data(origin) do
- case get_registry_data(origin) do
- {:error, :missing} ->
- %SocketInfo{origin: origin}
-
- {:ok, socket_info} ->
- socket_info
- end
- end
-
- defp save_registry_data(%SocketInfo{origin: origin, state: :connected} = socket_info) do
- {:ok, true} = Registry.update_value(FedSockets.Registry, origin, fn _ -> socket_info end)
- socket_info
- end
-
- defp save_registry_data(%SocketInfo{origin: origin, state: :rejected} = socket_info) do
- rejection_expiration =
- Pleroma.Config.get([:fed_sockets, :rejection_duration], @default_rejection_duration)
-
- {:ok, true} = Cachex.put(@rejections, origin, socket_info, ttl: rejection_expiration)
- socket_info
- end
-
- defp set_to_rejected(%SocketInfo{} = socket_info),
- do: %SocketInfo{socket_info | state: :rejected}
-end
diff --git a/lib/pleroma/web/fed_sockets/fed_socket.ex b/lib/pleroma/web/fed_sockets/fed_socket.ex
index fa03b78cc..6231a102c 100644
--- a/lib/pleroma/web/fed_sockets/fed_socket.ex
+++ b/lib/pleroma/web/fed_sockets/fed_socket.ex
@@ -3,106 +3,9 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.FedSockets.FedSocket do
- @moduledoc """
- The FedSocket module abstracts the actions to be taken taken on connections regardless of
- whether the connection started as inbound or outbound.
-
-
- Normally outside modules will have no need to call the FedSocket module directly.
- """
-
- alias Pleroma.Object
- alias Pleroma.Object.Containment
- alias Pleroma.User
- alias Pleroma.Web.ActivityPub.ObjectView
- alias Pleroma.Web.ActivityPub.UserView
- alias Pleroma.Web.ActivityPub.Visibility
- alias Pleroma.Web.FedSockets.IngesterWorker
- alias Pleroma.Web.FedSockets.OutgoingHandler
- alias Pleroma.Web.FedSockets.SocketInfo
-
require Logger
@shake "61dd18f7-f1e6-49a4-939a-a749fcdc1103"
- def connect_to_host(uri) do
- case OutgoingHandler.start_link(uri) do
- {:ok, pid} ->
- {:ok, pid}
-
- error ->
- {:error, error}
- end
- end
-
- def close(%SocketInfo{pid: socket_pid}),
- do: Process.send(socket_pid, :close, [])
-
- def publish(%SocketInfo{pid: socket_pid}, json) do
- %{action: :publish, data: json}
- |> Jason.encode!()
- |> send_packet(socket_pid)
- end
-
- def fetch(%SocketInfo{pid: socket_pid}, data) do
- _timeout = Pleroma.Config.get([:fed_sockets, :fetch_timeout], 12_000)
- OutgoingHandler.fetch(socket_pid, data)
- end
-
- def receive_package(%SocketInfo{} = fed_socket, json) do
- json
- |> Jason.decode!()
- |> process_package(fed_socket)
- end
-
- defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do
- if Containment.contain_origin(origin, data) do
- IngesterWorker.enqueue("ingest", %{"object" => data})
- end
-
- {:reply, %{"action" => "publish_reply", "status" => "processed"}}
- end
-
- defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do
- {:ok, data} = render_fetched_data(ap_id, uuid)
- {:reply, data}
- end
-
- defp process_package(other, _fed_socket) do
- Logger.warn("unknown json packages received #{inspect(other)}")
- {:noreply, nil}
- end
-
- defp render_fetched_data(ap_id, uuid) do
- {:ok,
- %{
- "action" => "fetch_reply",
- "status" => "processed",
- "uuid" => uuid,
- "data" => represent_item(ap_id)
- }}
- end
-
- defp represent_item(ap_id) do
- case User.get_by_ap_id(ap_id) do
- nil ->
- object = Object.get_cached_by_ap_id(ap_id)
-
- if Visibility.is_public?(object) do
- Phoenix.View.render_to_string(ObjectView, "object.json", object: object)
- else
- nil
- end
-
- user ->
- Phoenix.View.render_to_string(UserView, "user.json", user: user)
- end
- end
-
- @spec send_packet(binary(), pid()) :: :ok
- def send_packet(data, socket_pid) do
- Process.send(socket_pid, {:send, data}, [])
- end
-
def shake, do: @shake
end
diff --git a/lib/pleroma/web/fed_sockets/fed_sockets.ex b/lib/pleroma/web/fed_sockets/fed_sockets.ex
index 1fd5899c8..868a4c4cd 100644
--- a/lib/pleroma/web/fed_sockets/fed_sockets.ex
+++ b/lib/pleroma/web/fed_sockets/fed_sockets.ex
@@ -80,9 +80,7 @@ defmodule Pleroma.Web.FedSockets do
"""
require Logger
- alias Pleroma.Web.FedSockets.FedRegistry
- alias Pleroma.Web.FedSockets.FedSocket
- alias Pleroma.Web.FedSockets.SocketInfo
+ alias Pleroma.Web.FedSockets.Registry
@doc """
returns a FedSocket for the given origin. Will reuse an existing one or create a new one.
@@ -93,93 +91,7 @@ defmodule Pleroma.Web.FedSockets do
It can and usually does include additional path parameters,
but these are ignored as the FedSockets are organized by host and port info alone.
"""
- def get_or_create_fed_socket(address) do
- with {:cache, {:error, :missing}} <- {:cache, get_fed_socket(address)},
- {:connect, {:ok, _pid}} <- {:connect, FedSocket.connect_to_host(address)},
- {:cache, {:ok, fed_socket}} <- {:cache, get_fed_socket(address)} do
- Logger.debug("fedsocket created for - #{inspect(address)}")
- {:ok, fed_socket}
- else
- {:cache, {:ok, socket}} ->
- Logger.debug("fedsocket found in cache - #{inspect(address)}")
- {:ok, socket}
-
- {:cache, {:error, :rejected} = e} ->
- e
-
- {:connect, {:error, _host}} ->
- Logger.debug("set host rejected for - #{inspect(address)}")
- FedRegistry.set_host_rejected(address)
- {:error, :rejected}
-
- {_, {:error, :disabled}} ->
- {:error, :disabled}
-
- {_, {:error, reason}} ->
- Logger.warn("get_or_create_fed_socket error - #{inspect(reason)}")
- {:error, reason}
- end
- end
-
- @doc """
- returns a FedSocket for the given origin. Will not create a new FedSocket if one does not exist.
-
- address is expected to be a fully formed URL such as:
- "http://www.example.com" or "http://www.example.com:8080"
- """
- def get_fed_socket(address) do
- origin = SocketInfo.origin(address)
-
- with {:config, true} <- {:config, Pleroma.Config.get([:fed_sockets, :enabled], false)},
- {:ok, socket} <- FedRegistry.get_fed_socket(origin) do
- {:ok, socket}
- else
- {:config, _} ->
- {:error, :disabled}
-
- {:error, :rejected} ->
- Logger.debug("FedSocket previously rejected - #{inspect(origin)}")
- {:error, :rejected}
-
- {:error, reason} ->
- {:error, reason}
- end
- end
-
- @doc """
- Sends the supplied data via the publish protocol.
- It will not block waiting for a reply.
- Returns :ok but this is not an indication of a successful transfer.
- the data is expected to be JSON encoded binary data.
- """
- def publish(%SocketInfo{} = fed_socket, json) do
- FedSocket.publish(fed_socket, json)
- end
-
- @doc """
- Sends the supplied data via the fetch protocol.
- It will block waiting for a reply or timeout.
-
- Returns {:ok, object} where object is the requested object (or nil)
- {:error, :timeout} in the event the message was not responded to
-
- the id is expected to be the URI of an ActivityPub object.
- """
- def fetch(%SocketInfo{} = fed_socket, id) do
- FedSocket.fetch(fed_socket, id)
- end
-
- @doc """
- Disconnect all and restart FedSockets.
- This is mainly used in development and testing but could be useful in production.
- """
- def reset do
- FedRegistry
- |> Process.whereis()
- |> Process.exit(:testing)
- end
-
- def uri_for_origin(origin),
- do: "ws://#{origin}/api/fedsocket/v1"
+ defdelegate fetch(id), to: Registry
+ defdelegate publish(inbox, data), to: Registry
end
diff --git a/lib/pleroma/web/fed_sockets/fetch_registry.ex b/lib/pleroma/web/fed_sockets/fetch_registry.ex
deleted file mode 100644
index b6cc01252..000000000
--- a/lib/pleroma/web/fed_sockets/fetch_registry.ex
+++ /dev/null
@@ -1,105 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.FedSockets.FetchRegistry do
- @moduledoc """
- The FetchRegistry acts as a broker for fetch requests and return values.
- This allows calling processes to block while waiting for a reply.
- It doesn't impose it's own process instead using `Cachex` to handle fetches in process, allowing
- multi threaded processes to avoid bottlenecking.
-
- Normally outside modules will have no need to call or use the FetchRegistry themselves.
-
- The `Cachex` parameters can be controlled from the config. Since exact timeout intervals
- aren't necessary the following settings are used by default:
-
- config :pleroma, :fed_sockets,
- fed_socket_fetches: [
- default: 12_000,
- interval: 3_000,
- lazy: false
- ]
-
- """
-
- alias Ecto.UUID
- alias Pleroma.Web.FedSockets.FedSocket
-
- require Logger
-
- @fetches :fed_socket_fetches
-
- @type fetch_id :: Ecto.UUID.t()
-
- @doc "Synchronous version of `fetch_async/2`"
- @spec fetch(any(), pid(), pos_integer()) :: {:ok, any()} | {:error, :timeout}
- def fetch(socket_pid, data, timeout) do
- fetch_id = fetch_async(socket_pid, data)
-
- receive do
- {:fetch, ^fetch_id, response} -> {:ok, response}
- after
- timeout ->
- cancel(fetch_id)
- {:error, :timeout}
- end
- end
-
- @doc """
- Starts a fetch and returns it's id.
- Once a reply to a fetch is received, the following message is sent
- to the caller:
- `{:fetch, fetch_id, reply}`
- """
- @spec fetch_async(any(), pid()) :: fetch_id()
- def fetch_async(socket_pid, data) do
- send_to = self()
- uuid = UUID.generate()
-
- # Set up a sentinel process to cancel the fetch if the caller exits
- # before finishing the fetch (i.e the fetch was requested while processing
- # an http request, but the caller got killed because the client closed the
- # connection)
- sentinel =
- spawn(fn ->
- ref = Process.monitor(send_to)
-
- receive do
- {:DOWN, ^ref, _, _, _} -> cancel(uuid)
- end
- end)
-
- {:ok, true} = Cachex.put(@fetches, uuid, {send_to, sentinel})
-
- %{action: :fetch, data: data, uuid: uuid}
- |> Jason.encode!()
- |> FedSocket.send_packet(socket_pid)
-
- uuid
- end
-
- @doc "Removes the fetch from the registry. Any responses to it will be ignored afterwards."
- @spec cancel(fetch_id()) :: :ok
- def cancel(id) do
- {:ok, true} = Cachex.del(@fetches, id)
- end
-
- @doc "This is called to register a fetch has returned."
- @spec receive_callback(fetch_id(), any()) :: :ok
- def receive_callback(uuid, data) do
- case Cachex.get(@fetches, uuid) do
- {:ok, {caller, sentinel}} ->
- :ok = cancel(uuid)
- Process.exit(sentinel, :normal)
- send(caller, {:fetch, uuid, data})
-
- {:ok, nil} ->
- Logger.debug(fn ->
- "#{__MODULE__}: Got a reply to #{uuid}, but no such fetch is registered. This is probably a timeout."
- end)
- end
-
- :ok
- end
-end