diff options
author | rinpatch <rinpatch@sdf.org> | 2020-09-20 14:15:34 +0300 |
---|---|---|
committer | rinpatch <rinpatch@sdf.org> | 2020-09-20 14:15:34 +0300 |
commit | cd139dbd86570ca4c4fd9784d64b0a5a4ed24dff (patch) | |
tree | 08c1260e13249c7040bd8400974a3c148d51ba51 | |
parent | 3cd9ded4744974d5ac0b4d351536815e4c0db56b (diff) | |
download | pleroma-cd139dbd86570ca4c4fd9784d64b0a5a4ed24dff.tar.gz |
tmp
-rw-r--r-- | lib/pleroma/web/fed_sockets/fed_socket.ex | 14 | ||||
-rw-r--r-- | lib/pleroma/web/fed_sockets/socket/client.ex (renamed from lib/pleroma/web/fed_sockets/outgoing_handler.ex) | 126 | ||||
-rw-r--r-- | lib/pleroma/web/fed_sockets/socket/server.ex (renamed from lib/pleroma/web/fed_sockets/incoming_handler.ex) | 49 | ||||
-rw-r--r-- | lib/pleroma/web/fed_sockets/supervisor.ex | 6 |
4 files changed, 128 insertions, 67 deletions
diff --git a/lib/pleroma/web/fed_sockets/fed_socket.ex b/lib/pleroma/web/fed_sockets/fed_socket.ex index 57b17174a..fa03b78cc 100644 --- a/lib/pleroma/web/fed_sockets/fed_socket.ex +++ b/lib/pleroma/web/fed_sockets/fed_socket.ex @@ -17,7 +17,6 @@ defmodule Pleroma.Web.FedSockets.FedSocket do alias Pleroma.Web.ActivityPub.ObjectView alias Pleroma.Web.ActivityPub.UserView alias Pleroma.Web.ActivityPub.Visibility - alias Pleroma.Web.FedSockets.FetchRegistry alias Pleroma.Web.FedSockets.IngesterWorker alias Pleroma.Web.FedSockets.OutgoingHandler alias Pleroma.Web.FedSockets.SocketInfo @@ -46,8 +45,8 @@ defmodule Pleroma.Web.FedSockets.FedSocket do end def fetch(%SocketInfo{pid: socket_pid}, data) do - timeout = Pleroma.Config.get([:fed_sockets, :fetch_timeout], 12_000) - FetchRegistry.fetch(socket_pid, data, timeout) + _timeout = Pleroma.Config.get([:fed_sockets, :fetch_timeout], 12_000) + OutgoingHandler.fetch(socket_pid, data) end def receive_package(%SocketInfo{} = fed_socket, json) do @@ -64,20 +63,11 @@ defmodule Pleroma.Web.FedSockets.FedSocket do {:reply, %{"action" => "publish_reply", "status" => "processed"}} end - defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do - FetchRegistry.receive_callback(uuid, data) - {:noreply, nil} - end - defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do {:ok, data} = render_fetched_data(ap_id, uuid) {:reply, data} end - defp process_package(%{"action" => "publish_reply"}, _fed_socket) do - {:noreply, nil} - end - defp process_package(other, _fed_socket) do Logger.warn("unknown json packages received #{inspect(other)}") {:noreply, nil} diff --git a/lib/pleroma/web/fed_sockets/outgoing_handler.ex b/lib/pleroma/web/fed_sockets/socket/client.ex index b340fad3a..480cc2274 100644 --- a/lib/pleroma/web/fed_sockets/outgoing_handler.ex +++ b/lib/pleroma/web/fed_sockets/socket/client.ex @@ -2,7 +2,7 @@ # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> # SPDX-License-Identifier: AGPL-3.0-only -defmodule Pleroma.Web.FedSockets.OutgoingHandler do +defmodule Pleroma.Web.FedSockets.Socket.Client do use GenServer require Logger @@ -11,16 +11,61 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do alias Pleroma.Web.FedSockets alias Pleroma.Web.FedSockets.FedRegistry alias Pleroma.Web.FedSockets.FedSocket + alias Pleroma.Web.FedSockets.Socket alias Pleroma.Web.FedSockets.SocketInfo + @behaviour Socket + + @impl true + def fetch(%{pid: handler_pid}, data, timeout) do + GenServer.call(handler_pid, {:fetch, data}, timeout) + end + + @impl true + def publish(socket_info, data) do + send_json(socket_info, %{action: :publish, data: data}) + end + def start_link(uri) do GenServer.start_link(__MODULE__, %{uri: uri}) end + @impl true + def handle_call( + {:fetch, data}, + from, + %{ + last_fetch_id: last_fetch_id, + socket_info: socket_info, + waiting_fetches: waiting_fetches + } = state + ) do + last_fetch_id = last_fetch_id + 1 + request = %{action: :fetch, data: data, uuid: last_fetch_id} + socket_info = send_json(socket_info, request) + waiting_fetches = Map.put(waiting_fetches, last_fetch_id, from) + + {:noreply, + %{ + state + | socket_info: socket_info, + waiting_fetches: waiting_fetches, + last_fetch_id: last_fetch_id + }} + end + + defp send_json(%{conn_pid: conn_pid} = socket_info, data) do + socket_info = SocketInfo.touch(socket_info) + :gun.ws_send(conn_pid, {:text, Jason.encode!(data)}) + socket_info + end + + @impl true def init(%{uri: uri}) do case initiate_connection(uri) do {:ok, ws_origin, conn_pid} -> - FedRegistry.add_fed_socket(ws_origin, conn_pid) + {:ok, socket_info} = FedRegistry.add_fed_socket(ws_origin, conn_pid) + {:ok, %{socket_info: socket_info, waiting_fetches: %{}, last_fetch_id: 0}} {:error, reason} -> Logger.debug("Outgoing connection failed - #{inspect(reason)}") @@ -28,47 +73,65 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do end end - def handle_info({:gun_ws, conn_pid, _ref, {:text, data}}, socket_info) do + @impl true + def handle_info( + {:gun_ws, _conn_pid, _ref, {:text, raw_message}}, + %{socket_info: socket_info} = state + ) do socket_info = SocketInfo.touch(socket_info) - case FedSocket.receive_package(socket_info, data) do - {:noreply, _} -> - {:noreply, socket_info} - - {:reply, reply} -> - :gun.ws_send(conn_pid, {:text, Jason.encode!(reply)}) - {:noreply, socket_info} + state = + case Jason.decode(raw_message) do + {:ok, message} -> + case message do + %{"action" => "fetch_reply", "uuid" => uuid, "data" => data} -> + with {{_, _} = client, waiting_fetches} <- Map.pop(state.waiting_fetches, uuid) do + GenServer.reply(client, {:ok, data}) + %{state | waiting_fetches: waiting_fetches} + else + _ -> + state + end + + message -> + case Socket.process_message(socket_info, message) do + :noreply -> :noop + {:reply, data} -> send_json(socket_info, data) + end + + state + end + + {:error, _e} -> + state + end - {:error, reason} -> - Logger.error("incoming error - receive_package: #{inspect(reason)}") - {:noreply, socket_info} - end + {:noreply, %{state | socket_info: socket_info}} end + @impl true def handle_info(:close, state) do Logger.debug("Sending close frame !!!!!!!") {:close, state} end + @impl true def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do {:stop, :normal, state} end - def handle_info({:send, data}, %{conn_pid: conn_pid} = socket_info) do - socket_info = SocketInfo.touch(socket_info) - :gun.ws_send(conn_pid, {:text, data}) - {:noreply, socket_info} - end - + @impl true def handle_info({:gun_ws, _, _, :pong}, state) do {:noreply, state, :hibernate} end + @impl true def handle_info(msg, state) do Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}") {:noreply, state} end + @impl true def terminate(reason, state) do Logger.debug( "#{__MODULE__} terminating outgoing connection for #{inspect(state)} for #{inspect(reason)}" @@ -87,17 +150,18 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http]}), {:ok, _} <- :gun.await_up(conn_pid), - reference <- :gun.get(conn_pid, to_charlist(path)), - {:response, :fin, 204, _} <- :gun.await(conn_pid, reference) |> IO.inspect(), - :ok <- :gun.flush(conn_pid), + # TODO: nodeinfo-based support detection + # reference <- :gun.get(conn_pid, to_charlist(path)), + # {:response, :fin, 204, _} <- :gun.await(conn_pid, reference) |> IO.inspect(), + # :ok <- :gun.flush(conn_pid), headers <- build_headers(uri), ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do receive do {:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} -> {:ok, ws_uri, conn_pid} - mes -> - IO.inspect(mes) + # mes -> + # IO.inspect(mes) after 15_000 -> Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}") @@ -122,7 +186,7 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do shake_size = byte_size(shake) signature_opts = %{ - # "(request-target)": shake, + "(request-target)": shake, "content-length": to_charlist("#{shake_size}"), date: date, digest: digest, @@ -132,11 +196,11 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do signature = Pleroma.Signature.sign(InternalFetchActor.get_actor(), signature_opts) [ - {'signature', to_charlist(signature)}, - {'date', date}, - {'digest', to_charlist(digest)}, - {'content-length', to_charlist("#{shake_size}")} - # {to_charlist("(request-target)"), to_charlist(shake)} + {"signature", signature}, + {"date", date}, + {"digest", digest}, + {"content-length", to_string(shake_size)}, + {"(request-target)", shake} ] end diff --git a/lib/pleroma/web/fed_sockets/incoming_handler.ex b/lib/pleroma/web/fed_sockets/socket/server.ex index 49d0d9d84..71b45ede0 100644 --- a/lib/pleroma/web/fed_sockets/incoming_handler.ex +++ b/lib/pleroma/web/fed_sockets/socket/server.ex @@ -2,17 +2,28 @@ # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> # SPDX-License-Identifier: AGPL-3.0-only -defmodule Pleroma.Web.FedSockets.IncomingHandler do +defmodule Pleroma.Web.FedSockets.Socket.Server do require Logger + alias Pleroma.Web.FedSockets.Socket alias Pleroma.Web.FedSockets.FedRegistry alias Pleroma.Web.FedSockets.FedSocket alias Pleroma.Web.FedSockets.SocketInfo import HTTPSignatures, only: [validate_conn: 1, split_signature: 1] + require Logger + @behaviour :cowboy_websocket + @behaviour Socket + + @impl true + def fetch(_socket, _data, _timeout), do: {:error, :not_implemented} + @impl true + def publish(_socket, _data), do: {:error, :not_implemented} + + @impl true def init(req, state) do shake = FedSocket.shake() @@ -30,11 +41,13 @@ defmodule Pleroma.Web.FedSockets.IncomingHandler do {:cowboy_websocket, req, %{origin: origin}, %{}} else - _ -> + e -> + Logger.debug(fn -> "#{__MODULE__}: Websocket switch failed, #{inspect(e)}" end) {:ok, req, state} end end + @impl true def websocket_init(%{origin: origin}) do case FedRegistry.add_fed_socket(origin) do {:ok, socket_info} -> @@ -46,28 +59,24 @@ defmodule Pleroma.Web.FedSockets.IncomingHandler do end end - # Use the ping to check if the connection should be expired - def websocket_handle(:ping, socket_info) do - if SocketInfo.expired?(socket_info) do - {:stop, socket_info} - else - {:ok, socket_info, :hibernate} - end - end + @impl true + def websocket_handle(:ping, socket_info), do: {:ok, socket_info} - def websocket_handle({:text, data}, socket_info) do + def websocket_handle({:text, raw_message}, socket_info) do socket_info = SocketInfo.touch(socket_info) - case FedSocket.receive_package(socket_info, data) do - {:noreply, _} -> - {:ok, socket_info} - - {:reply, reply} -> - {:reply, {:text, Jason.encode!(reply)}, socket_info} + case Jason.decode(raw_message) do + {:ok, message} -> + case message do + message -> + case Socket.process_message(socket_info, message) do + :noreply -> {:ok, socket_info} + {:reply, data} -> {:reply, Jason.encode!(data), socket_info} + end + end - {:error, reason} -> - Logger.error("incoming error - receive_package: #{inspect(reason)}") - {:ok, socket_info} + {:error, decode_error} -> + exit({:malformed_message, decode_error}) end end diff --git a/lib/pleroma/web/fed_sockets/supervisor.ex b/lib/pleroma/web/fed_sockets/supervisor.ex index a5f4bebfb..721ac39c5 100644 --- a/lib/pleroma/web/fed_sockets/supervisor.ex +++ b/lib/pleroma/web/fed_sockets/supervisor.ex @@ -12,9 +12,8 @@ defmodule Pleroma.Web.FedSockets.Supervisor do def init(args) do children = [ - build_cache(:fed_socket_fetches, args), build_cache(:fed_socket_rejections, args), - {Registry, keys: :unique, name: FedSockets.Registry, meta: [rejected: %{}]} + {Registry, keys: :unique, name: FedSockets.Registry} ] opts = [strategy: :one_for_all, name: Pleroma.Web.Streamer.Supervisor] @@ -31,8 +30,7 @@ defmodule Pleroma.Web.FedSockets.Supervisor do } end - defp get_opts(cache_name, args) - when cache_name in [:fed_socket_fetches, :fed_socket_rejections] do + defp get_opts(:fed_socket_rejections = cache_name, args) do default = get_opts_or_config(args, cache_name, :default, 15_000) interval = get_opts_or_config(args, cache_name, :interval, 3_000) lazy = get_opts_or_config(args, cache_name, :lazy, false) |