diff options
-rw-r--r-- | config/config.exs | 2 | ||||
-rw-r--r-- | lib/pleroma/object/fetcher.ex | 7 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/publisher.ex | 8 | ||||
-rw-r--r-- | lib/pleroma/web/fed_sockets/adapter/cowboy.ex | 11 | ||||
-rw-r--r-- | lib/pleroma/web/fed_sockets/fed_registry.ex | 185 | ||||
-rw-r--r-- | lib/pleroma/web/fed_sockets/fed_socket.ex | 97 | ||||
-rw-r--r-- | lib/pleroma/web/fed_sockets/fed_sockets.ex | 94 | ||||
-rw-r--r-- | lib/pleroma/web/fed_sockets/fetch_registry.ex | 105 |
8 files changed, 19 insertions, 490 deletions
diff --git a/config/config.exs b/config/config.exs index 00624bf00..7e3a3c848 100644 --- a/config/config.exs +++ b/config/config.exs @@ -130,7 +130,7 @@ config :pleroma, Pleroma.Web.Endpoint, dispatch: [ {:_, [ - {"/api/fedsocket/v1", Pleroma.Web.FedSockets.IncomingHandler, []}, + {"/api/fedsocket/v1", Pleroma.Web.FedSockets.Adapter.Cowboy, []}, {"/api/v1/streaming", Pleroma.Web.MastodonAPI.WebsocketHandler, []}, {"/websocket", Phoenix.Endpoint.CowboyWebSocket, {Phoenix.Transports.WebSocket, diff --git a/lib/pleroma/object/fetcher.ex b/lib/pleroma/object/fetcher.ex index eb9a4c478..9468087cb 100644 --- a/lib/pleroma/object/fetcher.ex +++ b/lib/pleroma/object/fetcher.ex @@ -192,8 +192,7 @@ defmodule Pleroma.Object.Fetcher do Logger.debug("Fetching object #{id} via AP") with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")}, - {:ok, body} <- get_object(id, opts), - {:ok, data} <- safe_json_decode(body), + {:ok, data} <- get_object(id, opts), :ok <- Containment.contain_origin_from_id(id, data) do {:ok, data} else @@ -214,7 +213,7 @@ defmodule Pleroma.Object.Fetcher do defp get_object(id, opts) do with false <- Keyword.get(opts, :force_http, false) do Logger.debug("fetching via fedsocket - #{inspect(id)}") - FedSockets.Registry.fetch(id) + FedSockets.fetch(id) else _other -> Logger.debug("fetching via http - #{inspect(id)}") @@ -232,7 +231,7 @@ defmodule Pleroma.Object.Fetcher do case HTTP.get(id, headers) do {:ok, %{body: body, status: code}} when code in 200..299 -> - {:ok, body} + safe_json_decode(body) {:ok, %{status: code}} when code in [404, 410] -> {:error, "Object has been deleted"} diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index 9c3956683..4243f115d 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -51,10 +51,10 @@ defmodule Pleroma.Web.ActivityPub.Publisher do def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do Logger.debug("Federating #{id} to #{inbox}") - case FedSockets.get_or_create_fed_socket(inbox) do - {:ok, fedsocket} -> - Logger.debug("publishing via fedsockets - #{inspect(inbox)}") - FedSockets.publish(fedsocket, json) + case FedSockets.publish(inbox, json) do + :ok -> + Logger.debug("Published via FedSocket - #{inspect(inbox)}") + :ok _ -> Logger.debug("publishing via http - #{inspect(inbox)}") diff --git a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex index b4f785c57..a732df159 100644 --- a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex +++ b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex @@ -30,7 +30,10 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do end @impl true - def publish(_socket, _data), do: {:error, :not_implemented} + def publish(pid, _, data) do + message = %{action: :publish, data: data} + send(pid, {:send, Jason.encode!(message)}) + end @impl true def init(req, state) do @@ -53,7 +56,9 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do else {:has_request_target, headers} -> Logger.debug(fn -> - "#{__MODULE__}: Wrong or no \"(request-target)\" header. Rejecting websocket switch. Headers:\n#{inspect(headers)}" + "#{__MODULE__}: Wrong or no \"(request-target)\" header. Rejecting websocket switch. Headers:\n#{ + inspect(headers) + }" end) :cowboy_req.reply(400, req) @@ -115,7 +120,7 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do message -> case Adapter.process_message(message, origin) do :noreply -> {:ok, state} - {:reply, data} -> {:reply, Jason.encode!(data), state} + {:reply, data} -> {:reply, {:text, Jason.encode!(data)}, state} end end diff --git a/lib/pleroma/web/fed_sockets/fed_registry.ex b/lib/pleroma/web/fed_sockets/fed_registry.ex deleted file mode 100644 index e00ea69c0..000000000 --- a/lib/pleroma/web/fed_sockets/fed_registry.ex +++ /dev/null @@ -1,185 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.FedRegistry do - @moduledoc """ - The FedRegistry stores the active FedSockets for quick retrieval. - - The storage and retrieval portion of the FedRegistry is done in process through - elixir's `Registry` module for speed and its ability to monitor for terminated processes. - - Dropped connections will be caught by `Registry` and deleted. Since the next - message will initiate a new connection there is no reason to try and reconnect at that point. - - Normally outside modules should have no need to call or use the FedRegistry themselves. - """ - - alias Pleroma.Web.FedSockets.FedSocket - alias Pleroma.Web.FedSockets.SocketInfo - - require Logger - - @default_rejection_duration 15 * 60 * 1000 - @rejections :fed_socket_rejections - - @doc """ - Retrieves a FedSocket from the Registry given it's origin. - - The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080" - - Will return: - * {:ok, fed_socket} for working FedSockets - * {:error, :rejected} for origins that have been tried and refused within the rejection duration interval - * {:error, some_reason} usually :missing for unknown origins - """ - def get_fed_socket(origin) do - case get_registry_data(origin) do - {:error, reason} -> - {:error, reason} - - {:ok, %{state: :connected} = socket_info} -> - {:ok, socket_info} - end - end - - @doc """ - Adds a connected FedSocket to the Registry. - - Always returns {:ok, fed_socket} - """ - def add_fed_socket(origin, pid \\ nil) do - origin - |> SocketInfo.build(pid) - |> SocketInfo.connect() - |> add_socket_info - end - - defp add_socket_info(%{origin: origin, state: :connected} = socket_info) do - case Registry.register(FedSockets.Registry, origin, socket_info) do - {:ok, _owner} -> - clear_prior_rejection(origin) - Logger.debug("fedsocket added: #{inspect(origin)}") - - {:ok, socket_info} - - {:error, {:already_registered, _pid}} -> - FedSocket.close(socket_info) - existing_socket_info = Registry.lookup(FedSockets.Registry, origin) - - {:ok, existing_socket_info} - - _ -> - {:error, :error_adding_socket} - end - end - - @doc """ - Mark this origin as having rejected a connection attempt. - This will keep it from getting additional connection attempts - for a period of time specified in the config. - - Always returns {:ok, new_reg_data} - """ - def set_host_rejected(uri) do - new_reg_data = - uri - |> SocketInfo.origin() - |> get_or_create_registry_data() - |> set_to_rejected() - |> save_registry_data() - - {:ok, new_reg_data} - end - - @doc """ - Retrieves the FedRegistryData from the Registry given it's origin. - - The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080" - - Will return: - * {:ok, fed_registry_data} for known origins - * {:error, :missing} for uniknown origins - * {:error, :cache_error} indicating some low level runtime issues - """ - def get_registry_data(origin) do - case Registry.lookup(FedSockets.Registry, origin) do - [] -> - if is_rejected?(origin) do - Logger.debug("previously rejected fedsocket requested") - {:error, :rejected} - else - {:error, :missing} - end - - [{_pid, %{state: :connected} = socket_info}] -> - {:ok, socket_info} - - _ -> - {:error, :cache_error} - end - end - - @doc """ - Retrieves a map of all sockets from the Registry. The keys are the origins and the values are the corresponding SocketInfo - """ - def list_all do - (list_all_connected() ++ list_all_rejected()) - |> Enum.into(%{}) - end - - defp list_all_connected do - FedSockets.Registry - |> Registry.select([{{:"$1", :_, :"$3"}, [], [{{:"$1", :"$3"}}]}]) - end - - defp list_all_rejected do - {:ok, keys} = Cachex.keys(@rejections) - - {:ok, registry_data} = - Cachex.execute(@rejections, fn worker -> - Enum.map(keys, fn k -> {k, Cachex.get!(worker, k)} end) - end) - - registry_data - end - - defp clear_prior_rejection(origin), - do: Cachex.del(@rejections, origin) - - defp is_rejected?(origin) do - case Cachex.get(@rejections, origin) do - {:ok, nil} -> - false - - {:ok, _} -> - true - end - end - - defp get_or_create_registry_data(origin) do - case get_registry_data(origin) do - {:error, :missing} -> - %SocketInfo{origin: origin} - - {:ok, socket_info} -> - socket_info - end - end - - defp save_registry_data(%SocketInfo{origin: origin, state: :connected} = socket_info) do - {:ok, true} = Registry.update_value(FedSockets.Registry, origin, fn _ -> socket_info end) - socket_info - end - - defp save_registry_data(%SocketInfo{origin: origin, state: :rejected} = socket_info) do - rejection_expiration = - Pleroma.Config.get([:fed_sockets, :rejection_duration], @default_rejection_duration) - - {:ok, true} = Cachex.put(@rejections, origin, socket_info, ttl: rejection_expiration) - socket_info - end - - defp set_to_rejected(%SocketInfo{} = socket_info), - do: %SocketInfo{socket_info | state: :rejected} -end diff --git a/lib/pleroma/web/fed_sockets/fed_socket.ex b/lib/pleroma/web/fed_sockets/fed_socket.ex index fa03b78cc..6231a102c 100644 --- a/lib/pleroma/web/fed_sockets/fed_socket.ex +++ b/lib/pleroma/web/fed_sockets/fed_socket.ex @@ -3,106 +3,9 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.FedSockets.FedSocket do - @moduledoc """ - The FedSocket module abstracts the actions to be taken taken on connections regardless of - whether the connection started as inbound or outbound. - - - Normally outside modules will have no need to call the FedSocket module directly. - """ - - alias Pleroma.Object - alias Pleroma.Object.Containment - alias Pleroma.User - alias Pleroma.Web.ActivityPub.ObjectView - alias Pleroma.Web.ActivityPub.UserView - alias Pleroma.Web.ActivityPub.Visibility - alias Pleroma.Web.FedSockets.IngesterWorker - alias Pleroma.Web.FedSockets.OutgoingHandler - alias Pleroma.Web.FedSockets.SocketInfo - require Logger @shake "61dd18f7-f1e6-49a4-939a-a749fcdc1103" - def connect_to_host(uri) do - case OutgoingHandler.start_link(uri) do - {:ok, pid} -> - {:ok, pid} - - error -> - {:error, error} - end - end - - def close(%SocketInfo{pid: socket_pid}), - do: Process.send(socket_pid, :close, []) - - def publish(%SocketInfo{pid: socket_pid}, json) do - %{action: :publish, data: json} - |> Jason.encode!() - |> send_packet(socket_pid) - end - - def fetch(%SocketInfo{pid: socket_pid}, data) do - _timeout = Pleroma.Config.get([:fed_sockets, :fetch_timeout], 12_000) - OutgoingHandler.fetch(socket_pid, data) - end - - def receive_package(%SocketInfo{} = fed_socket, json) do - json - |> Jason.decode!() - |> process_package(fed_socket) - end - - defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do - if Containment.contain_origin(origin, data) do - IngesterWorker.enqueue("ingest", %{"object" => data}) - end - - {:reply, %{"action" => "publish_reply", "status" => "processed"}} - end - - defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do - {:ok, data} = render_fetched_data(ap_id, uuid) - {:reply, data} - end - - defp process_package(other, _fed_socket) do - Logger.warn("unknown json packages received #{inspect(other)}") - {:noreply, nil} - end - - defp render_fetched_data(ap_id, uuid) do - {:ok, - %{ - "action" => "fetch_reply", - "status" => "processed", - "uuid" => uuid, - "data" => represent_item(ap_id) - }} - end - - defp represent_item(ap_id) do - case User.get_by_ap_id(ap_id) do - nil -> - object = Object.get_cached_by_ap_id(ap_id) - - if Visibility.is_public?(object) do - Phoenix.View.render_to_string(ObjectView, "object.json", object: object) - else - nil - end - - user -> - Phoenix.View.render_to_string(UserView, "user.json", user: user) - end - end - - @spec send_packet(binary(), pid()) :: :ok - def send_packet(data, socket_pid) do - Process.send(socket_pid, {:send, data}, []) - end - def shake, do: @shake end diff --git a/lib/pleroma/web/fed_sockets/fed_sockets.ex b/lib/pleroma/web/fed_sockets/fed_sockets.ex index 1fd5899c8..868a4c4cd 100644 --- a/lib/pleroma/web/fed_sockets/fed_sockets.ex +++ b/lib/pleroma/web/fed_sockets/fed_sockets.ex @@ -80,9 +80,7 @@ defmodule Pleroma.Web.FedSockets do """ require Logger - alias Pleroma.Web.FedSockets.FedRegistry - alias Pleroma.Web.FedSockets.FedSocket - alias Pleroma.Web.FedSockets.SocketInfo + alias Pleroma.Web.FedSockets.Registry @doc """ returns a FedSocket for the given origin. Will reuse an existing one or create a new one. @@ -93,93 +91,7 @@ defmodule Pleroma.Web.FedSockets do It can and usually does include additional path parameters, but these are ignored as the FedSockets are organized by host and port info alone. """ - def get_or_create_fed_socket(address) do - with {:cache, {:error, :missing}} <- {:cache, get_fed_socket(address)}, - {:connect, {:ok, _pid}} <- {:connect, FedSocket.connect_to_host(address)}, - {:cache, {:ok, fed_socket}} <- {:cache, get_fed_socket(address)} do - Logger.debug("fedsocket created for - #{inspect(address)}") - {:ok, fed_socket} - else - {:cache, {:ok, socket}} -> - Logger.debug("fedsocket found in cache - #{inspect(address)}") - {:ok, socket} - - {:cache, {:error, :rejected} = e} -> - e - - {:connect, {:error, _host}} -> - Logger.debug("set host rejected for - #{inspect(address)}") - FedRegistry.set_host_rejected(address) - {:error, :rejected} - - {_, {:error, :disabled}} -> - {:error, :disabled} - - {_, {:error, reason}} -> - Logger.warn("get_or_create_fed_socket error - #{inspect(reason)}") - {:error, reason} - end - end - - @doc """ - returns a FedSocket for the given origin. Will not create a new FedSocket if one does not exist. - - address is expected to be a fully formed URL such as: - "http://www.example.com" or "http://www.example.com:8080" - """ - def get_fed_socket(address) do - origin = SocketInfo.origin(address) - - with {:config, true} <- {:config, Pleroma.Config.get([:fed_sockets, :enabled], false)}, - {:ok, socket} <- FedRegistry.get_fed_socket(origin) do - {:ok, socket} - else - {:config, _} -> - {:error, :disabled} - - {:error, :rejected} -> - Logger.debug("FedSocket previously rejected - #{inspect(origin)}") - {:error, :rejected} - - {:error, reason} -> - {:error, reason} - end - end - - @doc """ - Sends the supplied data via the publish protocol. - It will not block waiting for a reply. - Returns :ok but this is not an indication of a successful transfer. - the data is expected to be JSON encoded binary data. - """ - def publish(%SocketInfo{} = fed_socket, json) do - FedSocket.publish(fed_socket, json) - end - - @doc """ - Sends the supplied data via the fetch protocol. - It will block waiting for a reply or timeout. - - Returns {:ok, object} where object is the requested object (or nil) - {:error, :timeout} in the event the message was not responded to - - the id is expected to be the URI of an ActivityPub object. - """ - def fetch(%SocketInfo{} = fed_socket, id) do - FedSocket.fetch(fed_socket, id) - end - - @doc """ - Disconnect all and restart FedSockets. - This is mainly used in development and testing but could be useful in production. - """ - def reset do - FedRegistry - |> Process.whereis() - |> Process.exit(:testing) - end - - def uri_for_origin(origin), - do: "ws://#{origin}/api/fedsocket/v1" + defdelegate fetch(id), to: Registry + defdelegate publish(inbox, data), to: Registry end diff --git a/lib/pleroma/web/fed_sockets/fetch_registry.ex b/lib/pleroma/web/fed_sockets/fetch_registry.ex deleted file mode 100644 index b6cc01252..000000000 --- a/lib/pleroma/web/fed_sockets/fetch_registry.ex +++ /dev/null @@ -1,105 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.FetchRegistry do - @moduledoc """ - The FetchRegistry acts as a broker for fetch requests and return values. - This allows calling processes to block while waiting for a reply. - It doesn't impose it's own process instead using `Cachex` to handle fetches in process, allowing - multi threaded processes to avoid bottlenecking. - - Normally outside modules will have no need to call or use the FetchRegistry themselves. - - The `Cachex` parameters can be controlled from the config. Since exact timeout intervals - aren't necessary the following settings are used by default: - - config :pleroma, :fed_sockets, - fed_socket_fetches: [ - default: 12_000, - interval: 3_000, - lazy: false - ] - - """ - - alias Ecto.UUID - alias Pleroma.Web.FedSockets.FedSocket - - require Logger - - @fetches :fed_socket_fetches - - @type fetch_id :: Ecto.UUID.t() - - @doc "Synchronous version of `fetch_async/2`" - @spec fetch(any(), pid(), pos_integer()) :: {:ok, any()} | {:error, :timeout} - def fetch(socket_pid, data, timeout) do - fetch_id = fetch_async(socket_pid, data) - - receive do - {:fetch, ^fetch_id, response} -> {:ok, response} - after - timeout -> - cancel(fetch_id) - {:error, :timeout} - end - end - - @doc """ - Starts a fetch and returns it's id. - Once a reply to a fetch is received, the following message is sent - to the caller: - `{:fetch, fetch_id, reply}` - """ - @spec fetch_async(any(), pid()) :: fetch_id() - def fetch_async(socket_pid, data) do - send_to = self() - uuid = UUID.generate() - - # Set up a sentinel process to cancel the fetch if the caller exits - # before finishing the fetch (i.e the fetch was requested while processing - # an http request, but the caller got killed because the client closed the - # connection) - sentinel = - spawn(fn -> - ref = Process.monitor(send_to) - - receive do - {:DOWN, ^ref, _, _, _} -> cancel(uuid) - end - end) - - {:ok, true} = Cachex.put(@fetches, uuid, {send_to, sentinel}) - - %{action: :fetch, data: data, uuid: uuid} - |> Jason.encode!() - |> FedSocket.send_packet(socket_pid) - - uuid - end - - @doc "Removes the fetch from the registry. Any responses to it will be ignored afterwards." - @spec cancel(fetch_id()) :: :ok - def cancel(id) do - {:ok, true} = Cachex.del(@fetches, id) - end - - @doc "This is called to register a fetch has returned." - @spec receive_callback(fetch_id(), any()) :: :ok - def receive_callback(uuid, data) do - case Cachex.get(@fetches, uuid) do - {:ok, {caller, sentinel}} -> - :ok = cancel(uuid) - Process.exit(sentinel, :normal) - send(caller, {:fetch, uuid, data}) - - {:ok, nil} -> - Logger.debug(fn -> - "#{__MODULE__}: Got a reply to #{uuid}, but no such fetch is registered. This is probably a timeout." - end) - end - - :ok - end -end |