aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrinpatch <rinpatch@sdf.org>2020-09-20 14:15:34 +0300
committerrinpatch <rinpatch@sdf.org>2020-09-20 14:15:34 +0300
commitcd139dbd86570ca4c4fd9784d64b0a5a4ed24dff (patch)
tree08c1260e13249c7040bd8400974a3c148d51ba51
parent3cd9ded4744974d5ac0b4d351536815e4c0db56b (diff)
downloadpleroma-cd139dbd86570ca4c4fd9784d64b0a5a4ed24dff.tar.gz
tmp
-rw-r--r--lib/pleroma/web/fed_sockets/fed_socket.ex14
-rw-r--r--lib/pleroma/web/fed_sockets/socket/client.ex (renamed from lib/pleroma/web/fed_sockets/outgoing_handler.ex)126
-rw-r--r--lib/pleroma/web/fed_sockets/socket/server.ex (renamed from lib/pleroma/web/fed_sockets/incoming_handler.ex)49
-rw-r--r--lib/pleroma/web/fed_sockets/supervisor.ex6
4 files changed, 128 insertions, 67 deletions
diff --git a/lib/pleroma/web/fed_sockets/fed_socket.ex b/lib/pleroma/web/fed_sockets/fed_socket.ex
index 57b17174a..fa03b78cc 100644
--- a/lib/pleroma/web/fed_sockets/fed_socket.ex
+++ b/lib/pleroma/web/fed_sockets/fed_socket.ex
@@ -17,7 +17,6 @@ defmodule Pleroma.Web.FedSockets.FedSocket do
alias Pleroma.Web.ActivityPub.ObjectView
alias Pleroma.Web.ActivityPub.UserView
alias Pleroma.Web.ActivityPub.Visibility
- alias Pleroma.Web.FedSockets.FetchRegistry
alias Pleroma.Web.FedSockets.IngesterWorker
alias Pleroma.Web.FedSockets.OutgoingHandler
alias Pleroma.Web.FedSockets.SocketInfo
@@ -46,8 +45,8 @@ defmodule Pleroma.Web.FedSockets.FedSocket do
end
def fetch(%SocketInfo{pid: socket_pid}, data) do
- timeout = Pleroma.Config.get([:fed_sockets, :fetch_timeout], 12_000)
- FetchRegistry.fetch(socket_pid, data, timeout)
+ _timeout = Pleroma.Config.get([:fed_sockets, :fetch_timeout], 12_000)
+ OutgoingHandler.fetch(socket_pid, data)
end
def receive_package(%SocketInfo{} = fed_socket, json) do
@@ -64,20 +63,11 @@ defmodule Pleroma.Web.FedSockets.FedSocket do
{:reply, %{"action" => "publish_reply", "status" => "processed"}}
end
- defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do
- FetchRegistry.receive_callback(uuid, data)
- {:noreply, nil}
- end
-
defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do
{:ok, data} = render_fetched_data(ap_id, uuid)
{:reply, data}
end
- defp process_package(%{"action" => "publish_reply"}, _fed_socket) do
- {:noreply, nil}
- end
-
defp process_package(other, _fed_socket) do
Logger.warn("unknown json packages received #{inspect(other)}")
{:noreply, nil}
diff --git a/lib/pleroma/web/fed_sockets/outgoing_handler.ex b/lib/pleroma/web/fed_sockets/socket/client.ex
index b340fad3a..480cc2274 100644
--- a/lib/pleroma/web/fed_sockets/outgoing_handler.ex
+++ b/lib/pleroma/web/fed_sockets/socket/client.ex
@@ -2,7 +2,7 @@
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Web.FedSockets.OutgoingHandler do
+defmodule Pleroma.Web.FedSockets.Socket.Client do
use GenServer
require Logger
@@ -11,16 +11,61 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do
alias Pleroma.Web.FedSockets
alias Pleroma.Web.FedSockets.FedRegistry
alias Pleroma.Web.FedSockets.FedSocket
+ alias Pleroma.Web.FedSockets.Socket
alias Pleroma.Web.FedSockets.SocketInfo
+ @behaviour Socket
+
+ @impl true
+ def fetch(%{pid: handler_pid}, data, timeout) do
+ GenServer.call(handler_pid, {:fetch, data}, timeout)
+ end
+
+ @impl true
+ def publish(socket_info, data) do
+ send_json(socket_info, %{action: :publish, data: data})
+ end
+
def start_link(uri) do
GenServer.start_link(__MODULE__, %{uri: uri})
end
+ @impl true
+ def handle_call(
+ {:fetch, data},
+ from,
+ %{
+ last_fetch_id: last_fetch_id,
+ socket_info: socket_info,
+ waiting_fetches: waiting_fetches
+ } = state
+ ) do
+ last_fetch_id = last_fetch_id + 1
+ request = %{action: :fetch, data: data, uuid: last_fetch_id}
+ socket_info = send_json(socket_info, request)
+ waiting_fetches = Map.put(waiting_fetches, last_fetch_id, from)
+
+ {:noreply,
+ %{
+ state
+ | socket_info: socket_info,
+ waiting_fetches: waiting_fetches,
+ last_fetch_id: last_fetch_id
+ }}
+ end
+
+ defp send_json(%{conn_pid: conn_pid} = socket_info, data) do
+ socket_info = SocketInfo.touch(socket_info)
+ :gun.ws_send(conn_pid, {:text, Jason.encode!(data)})
+ socket_info
+ end
+
+ @impl true
def init(%{uri: uri}) do
case initiate_connection(uri) do
{:ok, ws_origin, conn_pid} ->
- FedRegistry.add_fed_socket(ws_origin, conn_pid)
+ {:ok, socket_info} = FedRegistry.add_fed_socket(ws_origin, conn_pid)
+ {:ok, %{socket_info: socket_info, waiting_fetches: %{}, last_fetch_id: 0}}
{:error, reason} ->
Logger.debug("Outgoing connection failed - #{inspect(reason)}")
@@ -28,47 +73,65 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do
end
end
- def handle_info({:gun_ws, conn_pid, _ref, {:text, data}}, socket_info) do
+ @impl true
+ def handle_info(
+ {:gun_ws, _conn_pid, _ref, {:text, raw_message}},
+ %{socket_info: socket_info} = state
+ ) do
socket_info = SocketInfo.touch(socket_info)
- case FedSocket.receive_package(socket_info, data) do
- {:noreply, _} ->
- {:noreply, socket_info}
-
- {:reply, reply} ->
- :gun.ws_send(conn_pid, {:text, Jason.encode!(reply)})
- {:noreply, socket_info}
+ state =
+ case Jason.decode(raw_message) do
+ {:ok, message} ->
+ case message do
+ %{"action" => "fetch_reply", "uuid" => uuid, "data" => data} ->
+ with {{_, _} = client, waiting_fetches} <- Map.pop(state.waiting_fetches, uuid) do
+ GenServer.reply(client, {:ok, data})
+ %{state | waiting_fetches: waiting_fetches}
+ else
+ _ ->
+ state
+ end
+
+ message ->
+ case Socket.process_message(socket_info, message) do
+ :noreply -> :noop
+ {:reply, data} -> send_json(socket_info, data)
+ end
+
+ state
+ end
+
+ {:error, _e} ->
+ state
+ end
- {:error, reason} ->
- Logger.error("incoming error - receive_package: #{inspect(reason)}")
- {:noreply, socket_info}
- end
+ {:noreply, %{state | socket_info: socket_info}}
end
+ @impl true
def handle_info(:close, state) do
Logger.debug("Sending close frame !!!!!!!")
{:close, state}
end
+ @impl true
def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do
{:stop, :normal, state}
end
- def handle_info({:send, data}, %{conn_pid: conn_pid} = socket_info) do
- socket_info = SocketInfo.touch(socket_info)
- :gun.ws_send(conn_pid, {:text, data})
- {:noreply, socket_info}
- end
-
+ @impl true
def handle_info({:gun_ws, _, _, :pong}, state) do
{:noreply, state, :hibernate}
end
+ @impl true
def handle_info(msg, state) do
Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}")
{:noreply, state}
end
+ @impl true
def terminate(reason, state) do
Logger.debug(
"#{__MODULE__} terminating outgoing connection for #{inspect(state)} for #{inspect(reason)}"
@@ -87,17 +150,18 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do
with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http]}),
{:ok, _} <- :gun.await_up(conn_pid),
- reference <- :gun.get(conn_pid, to_charlist(path)),
- {:response, :fin, 204, _} <- :gun.await(conn_pid, reference) |> IO.inspect(),
- :ok <- :gun.flush(conn_pid),
+ # TODO: nodeinfo-based support detection
+ # reference <- :gun.get(conn_pid, to_charlist(path)),
+ # {:response, :fin, 204, _} <- :gun.await(conn_pid, reference) |> IO.inspect(),
+ # :ok <- :gun.flush(conn_pid),
headers <- build_headers(uri),
ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do
receive do
{:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} ->
{:ok, ws_uri, conn_pid}
- mes ->
- IO.inspect(mes)
+ # mes ->
+ # IO.inspect(mes)
after
15_000 ->
Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}")
@@ -122,7 +186,7 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do
shake_size = byte_size(shake)
signature_opts = %{
- # "(request-target)": shake,
+ "(request-target)": shake,
"content-length": to_charlist("#{shake_size}"),
date: date,
digest: digest,
@@ -132,11 +196,11 @@ defmodule Pleroma.Web.FedSockets.OutgoingHandler do
signature = Pleroma.Signature.sign(InternalFetchActor.get_actor(), signature_opts)
[
- {'signature', to_charlist(signature)},
- {'date', date},
- {'digest', to_charlist(digest)},
- {'content-length', to_charlist("#{shake_size}")}
- # {to_charlist("(request-target)"), to_charlist(shake)}
+ {"signature", signature},
+ {"date", date},
+ {"digest", digest},
+ {"content-length", to_string(shake_size)},
+ {"(request-target)", shake}
]
end
diff --git a/lib/pleroma/web/fed_sockets/incoming_handler.ex b/lib/pleroma/web/fed_sockets/socket/server.ex
index 49d0d9d84..71b45ede0 100644
--- a/lib/pleroma/web/fed_sockets/incoming_handler.ex
+++ b/lib/pleroma/web/fed_sockets/socket/server.ex
@@ -2,17 +2,28 @@
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Web.FedSockets.IncomingHandler do
+defmodule Pleroma.Web.FedSockets.Socket.Server do
require Logger
+ alias Pleroma.Web.FedSockets.Socket
alias Pleroma.Web.FedSockets.FedRegistry
alias Pleroma.Web.FedSockets.FedSocket
alias Pleroma.Web.FedSockets.SocketInfo
import HTTPSignatures, only: [validate_conn: 1, split_signature: 1]
+ require Logger
+
@behaviour :cowboy_websocket
+ @behaviour Socket
+
+ @impl true
+ def fetch(_socket, _data, _timeout), do: {:error, :not_implemented}
+ @impl true
+ def publish(_socket, _data), do: {:error, :not_implemented}
+
+ @impl true
def init(req, state) do
shake = FedSocket.shake()
@@ -30,11 +41,13 @@ defmodule Pleroma.Web.FedSockets.IncomingHandler do
{:cowboy_websocket, req, %{origin: origin}, %{}}
else
- _ ->
+ e ->
+ Logger.debug(fn -> "#{__MODULE__}: Websocket switch failed, #{inspect(e)}" end)
{:ok, req, state}
end
end
+ @impl true
def websocket_init(%{origin: origin}) do
case FedRegistry.add_fed_socket(origin) do
{:ok, socket_info} ->
@@ -46,28 +59,24 @@ defmodule Pleroma.Web.FedSockets.IncomingHandler do
end
end
- # Use the ping to check if the connection should be expired
- def websocket_handle(:ping, socket_info) do
- if SocketInfo.expired?(socket_info) do
- {:stop, socket_info}
- else
- {:ok, socket_info, :hibernate}
- end
- end
+ @impl true
+ def websocket_handle(:ping, socket_info), do: {:ok, socket_info}
- def websocket_handle({:text, data}, socket_info) do
+ def websocket_handle({:text, raw_message}, socket_info) do
socket_info = SocketInfo.touch(socket_info)
- case FedSocket.receive_package(socket_info, data) do
- {:noreply, _} ->
- {:ok, socket_info}
-
- {:reply, reply} ->
- {:reply, {:text, Jason.encode!(reply)}, socket_info}
+ case Jason.decode(raw_message) do
+ {:ok, message} ->
+ case message do
+ message ->
+ case Socket.process_message(socket_info, message) do
+ :noreply -> {:ok, socket_info}
+ {:reply, data} -> {:reply, Jason.encode!(data), socket_info}
+ end
+ end
- {:error, reason} ->
- Logger.error("incoming error - receive_package: #{inspect(reason)}")
- {:ok, socket_info}
+ {:error, decode_error} ->
+ exit({:malformed_message, decode_error})
end
end
diff --git a/lib/pleroma/web/fed_sockets/supervisor.ex b/lib/pleroma/web/fed_sockets/supervisor.ex
index a5f4bebfb..721ac39c5 100644
--- a/lib/pleroma/web/fed_sockets/supervisor.ex
+++ b/lib/pleroma/web/fed_sockets/supervisor.ex
@@ -12,9 +12,8 @@ defmodule Pleroma.Web.FedSockets.Supervisor do
def init(args) do
children = [
- build_cache(:fed_socket_fetches, args),
build_cache(:fed_socket_rejections, args),
- {Registry, keys: :unique, name: FedSockets.Registry, meta: [rejected: %{}]}
+ {Registry, keys: :unique, name: FedSockets.Registry}
]
opts = [strategy: :one_for_all, name: Pleroma.Web.Streamer.Supervisor]
@@ -31,8 +30,7 @@ defmodule Pleroma.Web.FedSockets.Supervisor do
}
end
- defp get_opts(cache_name, args)
- when cache_name in [:fed_socket_fetches, :fed_socket_rejections] do
+ defp get_opts(:fed_socket_rejections = cache_name, args) do
default = get_opts_or_config(args, cache_name, :default, 15_000)
interval = get_opts_or_config(args, cache_name, :interval, 3_000)
lazy = get_opts_or_config(args, cache_name, :lazy, false)