From cd139dbd86570ca4c4fd9784d64b0a5a4ed24dff Mon Sep 17 00:00:00 2001 From: rinpatch Date: Sun, 20 Sep 2020 14:15:34 +0300 Subject: tmp --- lib/pleroma/web/fed_sockets/fed_socket.ex | 14 +- lib/pleroma/web/fed_sockets/incoming_handler.ex | 88 ---------- lib/pleroma/web/fed_sockets/outgoing_handler.ex | 150 ----------------- lib/pleroma/web/fed_sockets/socket/client.ex | 214 ++++++++++++++++++++++++ lib/pleroma/web/fed_sockets/socket/server.ex | 97 +++++++++++ lib/pleroma/web/fed_sockets/supervisor.ex | 6 +- 6 files changed, 315 insertions(+), 254 deletions(-) delete mode 100644 lib/pleroma/web/fed_sockets/incoming_handler.ex delete mode 100644 lib/pleroma/web/fed_sockets/outgoing_handler.ex create mode 100644 lib/pleroma/web/fed_sockets/socket/client.ex create mode 100644 lib/pleroma/web/fed_sockets/socket/server.ex diff --git a/lib/pleroma/web/fed_sockets/fed_socket.ex b/lib/pleroma/web/fed_sockets/fed_socket.ex index 57b17174a..fa03b78cc 100644 --- a/lib/pleroma/web/fed_sockets/fed_socket.ex +++ b/lib/pleroma/web/fed_sockets/fed_socket.ex @@ -17,7 +17,6 @@ defmodule Pleroma.Web.FedSockets.FedSocket do alias Pleroma.Web.ActivityPub.ObjectView alias Pleroma.Web.ActivityPub.UserView alias Pleroma.Web.ActivityPub.Visibility - alias Pleroma.Web.FedSockets.FetchRegistry alias Pleroma.Web.FedSockets.IngesterWorker alias Pleroma.Web.FedSockets.OutgoingHandler alias Pleroma.Web.FedSockets.SocketInfo @@ -46,8 +45,8 @@ defmodule Pleroma.Web.FedSockets.FedSocket do end def fetch(%SocketInfo{pid: socket_pid}, data) do - timeout = Pleroma.Config.get([:fed_sockets, :fetch_timeout], 12_000) - FetchRegistry.fetch(socket_pid, data, timeout) + _timeout = Pleroma.Config.get([:fed_sockets, :fetch_timeout], 12_000) + OutgoingHandler.fetch(socket_pid, data) end def receive_package(%SocketInfo{} = fed_socket, json) do @@ -64,20 +63,11 @@ defmodule Pleroma.Web.FedSockets.FedSocket do {:reply, %{"action" => "publish_reply", "status" => "processed"}} end - defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do - FetchRegistry.receive_callback(uuid, data) - {:noreply, nil} - end - defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do {:ok, data} = render_fetched_data(ap_id, uuid) {:reply, data} end - defp process_package(%{"action" => "publish_reply"}, _fed_socket) do - {:noreply, nil} - end - defp process_package(other, _fed_socket) do Logger.warn("unknown json packages received #{inspect(other)}") {:noreply, nil} diff --git a/lib/pleroma/web/fed_sockets/incoming_handler.ex b/lib/pleroma/web/fed_sockets/incoming_handler.ex deleted file mode 100644 index 49d0d9d84..000000000 --- a/lib/pleroma/web/fed_sockets/incoming_handler.ex +++ /dev/null @@ -1,88 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.IncomingHandler do - require Logger - - alias Pleroma.Web.FedSockets.FedRegistry - alias Pleroma.Web.FedSockets.FedSocket - alias Pleroma.Web.FedSockets.SocketInfo - - import HTTPSignatures, only: [validate_conn: 1, split_signature: 1] - - @behaviour :cowboy_websocket - - def init(req, state) do - shake = FedSocket.shake() - - with true <- Pleroma.Config.get([:fed_sockets, :enabled]), - sec_protocol <- :cowboy_req.header("sec-websocket-protocol", req, nil), - headers = %{"(request-target)" => ^shake} <- :cowboy_req.headers(req), - true <- validate_conn(%{req_headers: headers}), - %{"keyId" => origin} <- split_signature(headers["signature"]) do - req = - if is_nil(sec_protocol) do - req - else - :cowboy_req.set_resp_header("sec-websocket-protocol", sec_protocol, req) - end - - {:cowboy_websocket, req, %{origin: origin}, %{}} - else - _ -> - {:ok, req, state} - end - end - - def websocket_init(%{origin: origin}) do - case FedRegistry.add_fed_socket(origin) do - {:ok, socket_info} -> - {:ok, socket_info} - - e -> - Logger.error("FedSocket websocket_init failed - #{inspect(e)}") - {:error, inspect(e)} - end - end - - # Use the ping to check if the connection should be expired - def websocket_handle(:ping, socket_info) do - if SocketInfo.expired?(socket_info) do - {:stop, socket_info} - else - {:ok, socket_info, :hibernate} - end - end - - def websocket_handle({:text, data}, socket_info) do - socket_info = SocketInfo.touch(socket_info) - - case FedSocket.receive_package(socket_info, data) do - {:noreply, _} -> - {:ok, socket_info} - - {:reply, reply} -> - {:reply, {:text, Jason.encode!(reply)}, socket_info} - - {:error, reason} -> - Logger.error("incoming error - receive_package: #{inspect(reason)}") - {:ok, socket_info} - end - end - - def websocket_info({:send, message}, socket_info) do - socket_info = SocketInfo.touch(socket_info) - - {:reply, {:text, message}, socket_info} - end - - def websocket_info(:close, state) do - {:stop, state} - end - - def websocket_info(message, state) do - Logger.debug("#{__MODULE__} unknown message #{inspect(message)}") - {:ok, state} - end -end diff --git a/lib/pleroma/web/fed_sockets/outgoing_handler.ex b/lib/pleroma/web/fed_sockets/outgoing_handler.ex deleted file mode 100644 index b340fad3a..000000000 --- a/lib/pleroma/web/fed_sockets/outgoing_handler.ex +++ /dev/null @@ -1,150 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.OutgoingHandler do - use GenServer - - require Logger - - alias Pleroma.Web.ActivityPub.InternalFetchActor - alias Pleroma.Web.FedSockets - alias Pleroma.Web.FedSockets.FedRegistry - alias Pleroma.Web.FedSockets.FedSocket - alias Pleroma.Web.FedSockets.SocketInfo - - def start_link(uri) do - GenServer.start_link(__MODULE__, %{uri: uri}) - end - - def init(%{uri: uri}) do - case initiate_connection(uri) do - {:ok, ws_origin, conn_pid} -> - FedRegistry.add_fed_socket(ws_origin, conn_pid) - - {:error, reason} -> - Logger.debug("Outgoing connection failed - #{inspect(reason)}") - :ignore - end - end - - def handle_info({:gun_ws, conn_pid, _ref, {:text, data}}, socket_info) do - socket_info = SocketInfo.touch(socket_info) - - case FedSocket.receive_package(socket_info, data) do - {:noreply, _} -> - {:noreply, socket_info} - - {:reply, reply} -> - :gun.ws_send(conn_pid, {:text, Jason.encode!(reply)}) - {:noreply, socket_info} - - {:error, reason} -> - Logger.error("incoming error - receive_package: #{inspect(reason)}") - {:noreply, socket_info} - end - end - - def handle_info(:close, state) do - Logger.debug("Sending close frame !!!!!!!") - {:close, state} - end - - def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do - {:stop, :normal, state} - end - - def handle_info({:send, data}, %{conn_pid: conn_pid} = socket_info) do - socket_info = SocketInfo.touch(socket_info) - :gun.ws_send(conn_pid, {:text, data}) - {:noreply, socket_info} - end - - def handle_info({:gun_ws, _, _, :pong}, state) do - {:noreply, state, :hibernate} - end - - def handle_info(msg, state) do - Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}") - {:noreply, state} - end - - def terminate(reason, state) do - Logger.debug( - "#{__MODULE__} terminating outgoing connection for #{inspect(state)} for #{inspect(reason)}" - ) - - {:ok, state} - end - - def initiate_connection(uri) do - ws_uri = - uri - |> SocketInfo.origin() - |> FedSockets.uri_for_origin() - - %{host: host, port: port, path: path} = URI.parse(ws_uri) - - with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http]}), - {:ok, _} <- :gun.await_up(conn_pid), - reference <- :gun.get(conn_pid, to_charlist(path)), - {:response, :fin, 204, _} <- :gun.await(conn_pid, reference) |> IO.inspect(), - :ok <- :gun.flush(conn_pid), - headers <- build_headers(uri), - ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do - receive do - {:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} -> - {:ok, ws_uri, conn_pid} - - mes -> - IO.inspect(mes) - after - 15_000 -> - Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}") - {:error, :timeout} - end - else - {:response, :nofin, 404, _} -> - {:error, :fedsockets_not_supported} - - e -> - Logger.debug("Fedsocket error connecting to #{inspect(uri)}") - {:error, e} - end - end - - defp build_headers(uri) do - host_for_sig = uri |> URI.parse() |> host_signature() - - shake = FedSocket.shake() - digest = "SHA-256=" <> (:crypto.hash(:sha256, shake) |> Base.encode64()) - date = Pleroma.Signature.signed_date() - shake_size = byte_size(shake) - - signature_opts = %{ - # "(request-target)": shake, - "content-length": to_charlist("#{shake_size}"), - date: date, - digest: digest, - host: host_for_sig - } - - signature = Pleroma.Signature.sign(InternalFetchActor.get_actor(), signature_opts) - - [ - {'signature', to_charlist(signature)}, - {'date', date}, - {'digest', to_charlist(digest)}, - {'content-length', to_charlist("#{shake_size}")} - # {to_charlist("(request-target)"), to_charlist(shake)} - ] - end - - defp host_signature(%{host: host, scheme: scheme, port: port}) do - if port == URI.default_port(scheme) do - host - else - "#{host}:#{port}" - end - end -end diff --git a/lib/pleroma/web/fed_sockets/socket/client.ex b/lib/pleroma/web/fed_sockets/socket/client.ex new file mode 100644 index 000000000..480cc2274 --- /dev/null +++ b/lib/pleroma/web/fed_sockets/socket/client.ex @@ -0,0 +1,214 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.FedSockets.Socket.Client do + use GenServer + + require Logger + + alias Pleroma.Web.ActivityPub.InternalFetchActor + alias Pleroma.Web.FedSockets + alias Pleroma.Web.FedSockets.FedRegistry + alias Pleroma.Web.FedSockets.FedSocket + alias Pleroma.Web.FedSockets.Socket + alias Pleroma.Web.FedSockets.SocketInfo + + @behaviour Socket + + @impl true + def fetch(%{pid: handler_pid}, data, timeout) do + GenServer.call(handler_pid, {:fetch, data}, timeout) + end + + @impl true + def publish(socket_info, data) do + send_json(socket_info, %{action: :publish, data: data}) + end + + def start_link(uri) do + GenServer.start_link(__MODULE__, %{uri: uri}) + end + + @impl true + def handle_call( + {:fetch, data}, + from, + %{ + last_fetch_id: last_fetch_id, + socket_info: socket_info, + waiting_fetches: waiting_fetches + } = state + ) do + last_fetch_id = last_fetch_id + 1 + request = %{action: :fetch, data: data, uuid: last_fetch_id} + socket_info = send_json(socket_info, request) + waiting_fetches = Map.put(waiting_fetches, last_fetch_id, from) + + {:noreply, + %{ + state + | socket_info: socket_info, + waiting_fetches: waiting_fetches, + last_fetch_id: last_fetch_id + }} + end + + defp send_json(%{conn_pid: conn_pid} = socket_info, data) do + socket_info = SocketInfo.touch(socket_info) + :gun.ws_send(conn_pid, {:text, Jason.encode!(data)}) + socket_info + end + + @impl true + def init(%{uri: uri}) do + case initiate_connection(uri) do + {:ok, ws_origin, conn_pid} -> + {:ok, socket_info} = FedRegistry.add_fed_socket(ws_origin, conn_pid) + {:ok, %{socket_info: socket_info, waiting_fetches: %{}, last_fetch_id: 0}} + + {:error, reason} -> + Logger.debug("Outgoing connection failed - #{inspect(reason)}") + :ignore + end + end + + @impl true + def handle_info( + {:gun_ws, _conn_pid, _ref, {:text, raw_message}}, + %{socket_info: socket_info} = state + ) do + socket_info = SocketInfo.touch(socket_info) + + state = + case Jason.decode(raw_message) do + {:ok, message} -> + case message do + %{"action" => "fetch_reply", "uuid" => uuid, "data" => data} -> + with {{_, _} = client, waiting_fetches} <- Map.pop(state.waiting_fetches, uuid) do + GenServer.reply(client, {:ok, data}) + %{state | waiting_fetches: waiting_fetches} + else + _ -> + state + end + + message -> + case Socket.process_message(socket_info, message) do + :noreply -> :noop + {:reply, data} -> send_json(socket_info, data) + end + + state + end + + {:error, _e} -> + state + end + + {:noreply, %{state | socket_info: socket_info}} + end + + @impl true + def handle_info(:close, state) do + Logger.debug("Sending close frame !!!!!!!") + {:close, state} + end + + @impl true + def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do + {:stop, :normal, state} + end + + @impl true + def handle_info({:gun_ws, _, _, :pong}, state) do + {:noreply, state, :hibernate} + end + + @impl true + def handle_info(msg, state) do + Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}") + {:noreply, state} + end + + @impl true + def terminate(reason, state) do + Logger.debug( + "#{__MODULE__} terminating outgoing connection for #{inspect(state)} for #{inspect(reason)}" + ) + + {:ok, state} + end + + def initiate_connection(uri) do + ws_uri = + uri + |> SocketInfo.origin() + |> FedSockets.uri_for_origin() + + %{host: host, port: port, path: path} = URI.parse(ws_uri) + + with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http]}), + {:ok, _} <- :gun.await_up(conn_pid), + # TODO: nodeinfo-based support detection + # reference <- :gun.get(conn_pid, to_charlist(path)), + # {:response, :fin, 204, _} <- :gun.await(conn_pid, reference) |> IO.inspect(), + # :ok <- :gun.flush(conn_pid), + headers <- build_headers(uri), + ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do + receive do + {:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} -> + {:ok, ws_uri, conn_pid} + + # mes -> + # IO.inspect(mes) + after + 15_000 -> + Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}") + {:error, :timeout} + end + else + {:response, :nofin, 404, _} -> + {:error, :fedsockets_not_supported} + + e -> + Logger.debug("Fedsocket error connecting to #{inspect(uri)}") + {:error, e} + end + end + + defp build_headers(uri) do + host_for_sig = uri |> URI.parse() |> host_signature() + + shake = FedSocket.shake() + digest = "SHA-256=" <> (:crypto.hash(:sha256, shake) |> Base.encode64()) + date = Pleroma.Signature.signed_date() + shake_size = byte_size(shake) + + signature_opts = %{ + "(request-target)": shake, + "content-length": to_charlist("#{shake_size}"), + date: date, + digest: digest, + host: host_for_sig + } + + signature = Pleroma.Signature.sign(InternalFetchActor.get_actor(), signature_opts) + + [ + {"signature", signature}, + {"date", date}, + {"digest", digest}, + {"content-length", to_string(shake_size)}, + {"(request-target)", shake} + ] + end + + defp host_signature(%{host: host, scheme: scheme, port: port}) do + if port == URI.default_port(scheme) do + host + else + "#{host}:#{port}" + end + end +end diff --git a/lib/pleroma/web/fed_sockets/socket/server.ex b/lib/pleroma/web/fed_sockets/socket/server.ex new file mode 100644 index 000000000..71b45ede0 --- /dev/null +++ b/lib/pleroma/web/fed_sockets/socket/server.ex @@ -0,0 +1,97 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.FedSockets.Socket.Server do + require Logger + + alias Pleroma.Web.FedSockets.Socket + alias Pleroma.Web.FedSockets.FedRegistry + alias Pleroma.Web.FedSockets.FedSocket + alias Pleroma.Web.FedSockets.SocketInfo + + import HTTPSignatures, only: [validate_conn: 1, split_signature: 1] + + require Logger + + @behaviour :cowboy_websocket + @behaviour Socket + + @impl true + def fetch(_socket, _data, _timeout), do: {:error, :not_implemented} + + @impl true + def publish(_socket, _data), do: {:error, :not_implemented} + + @impl true + def init(req, state) do + shake = FedSocket.shake() + + with true <- Pleroma.Config.get([:fed_sockets, :enabled]), + sec_protocol <- :cowboy_req.header("sec-websocket-protocol", req, nil), + headers = %{"(request-target)" => ^shake} <- :cowboy_req.headers(req), + true <- validate_conn(%{req_headers: headers}), + %{"keyId" => origin} <- split_signature(headers["signature"]) do + req = + if is_nil(sec_protocol) do + req + else + :cowboy_req.set_resp_header("sec-websocket-protocol", sec_protocol, req) + end + + {:cowboy_websocket, req, %{origin: origin}, %{}} + else + e -> + Logger.debug(fn -> "#{__MODULE__}: Websocket switch failed, #{inspect(e)}" end) + {:ok, req, state} + end + end + + @impl true + def websocket_init(%{origin: origin}) do + case FedRegistry.add_fed_socket(origin) do + {:ok, socket_info} -> + {:ok, socket_info} + + e -> + Logger.error("FedSocket websocket_init failed - #{inspect(e)}") + {:error, inspect(e)} + end + end + + @impl true + def websocket_handle(:ping, socket_info), do: {:ok, socket_info} + + def websocket_handle({:text, raw_message}, socket_info) do + socket_info = SocketInfo.touch(socket_info) + + case Jason.decode(raw_message) do + {:ok, message} -> + case message do + message -> + case Socket.process_message(socket_info, message) do + :noreply -> {:ok, socket_info} + {:reply, data} -> {:reply, Jason.encode!(data), socket_info} + end + end + + {:error, decode_error} -> + exit({:malformed_message, decode_error}) + end + end + + def websocket_info({:send, message}, socket_info) do + socket_info = SocketInfo.touch(socket_info) + + {:reply, {:text, message}, socket_info} + end + + def websocket_info(:close, state) do + {:stop, state} + end + + def websocket_info(message, state) do + Logger.debug("#{__MODULE__} unknown message #{inspect(message)}") + {:ok, state} + end +end diff --git a/lib/pleroma/web/fed_sockets/supervisor.ex b/lib/pleroma/web/fed_sockets/supervisor.ex index a5f4bebfb..721ac39c5 100644 --- a/lib/pleroma/web/fed_sockets/supervisor.ex +++ b/lib/pleroma/web/fed_sockets/supervisor.ex @@ -12,9 +12,8 @@ defmodule Pleroma.Web.FedSockets.Supervisor do def init(args) do children = [ - build_cache(:fed_socket_fetches, args), build_cache(:fed_socket_rejections, args), - {Registry, keys: :unique, name: FedSockets.Registry, meta: [rejected: %{}]} + {Registry, keys: :unique, name: FedSockets.Registry} ] opts = [strategy: :one_for_all, name: Pleroma.Web.Streamer.Supervisor] @@ -31,8 +30,7 @@ defmodule Pleroma.Web.FedSockets.Supervisor do } end - defp get_opts(cache_name, args) - when cache_name in [:fed_socket_fetches, :fed_socket_rejections] do + defp get_opts(:fed_socket_rejections = cache_name, args) do default = get_opts_or_config(args, cache_name, :default, 15_000) interval = get_opts_or_config(args, cache_name, :interval, 3_000) lazy = get_opts_or_config(args, cache_name, :lazy, false) -- cgit v1.2.3