aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/web/fed_sockets/adapter.ex4
-rw-r--r--lib/pleroma/web/fed_sockets/adapter/cowboy.ex2
-rw-r--r--lib/pleroma/web/fed_sockets/adapter/gun.ex23
-rw-r--r--lib/pleroma/web/fed_sockets/ingester_worker.ex33
4 files changed, 12 insertions, 50 deletions
diff --git a/lib/pleroma/web/fed_sockets/adapter.ex b/lib/pleroma/web/fed_sockets/adapter.ex
index edb22d1d9..f35c964c6 100644
--- a/lib/pleroma/web/fed_sockets/adapter.ex
+++ b/lib/pleroma/web/fed_sockets/adapter.ex
@@ -18,7 +18,7 @@ defmodule Pleroma.Web.FedSockets.Adapter do
alias Pleroma.Web.ActivityPub.ObjectView
alias Pleroma.Web.ActivityPub.UserView
alias Pleroma.Web.ActivityPub.Visibility
- alias Pleroma.Web.FedSockets.IngesterWorker
+ alias Pleroma.Web.Federator
@type origin :: String.t()
@type fetch_id :: integer()
@@ -37,7 +37,7 @@ defmodule Pleroma.Web.FedSockets.Adapter do
def process_message(%{"action" => "publish", "data" => data}, origin, waiting_fetches) do
if Containment.contain_origin(origin, data) do
- IngesterWorker.enqueue("ingest", %{"object" => data})
+ Federator.incoming_ap_doc(data)
end
{:noreply, waiting_fetches}
diff --git a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex
index 107d80849..a7b3835dd 100644
--- a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex
+++ b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex
@@ -92,7 +92,7 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do
case Registry.register(@registry, key, %Value{
adapter: __MODULE__,
- adapter_state: %{last_fetch_id_ref: last_fetch_id_ref, waiting_fetches: %{}}
+ adapter_state: %{last_fetch_id_ref: last_fetch_id_ref}
}) do
{:ok, _owner} ->
{:ok, %{origin: origin, waiting_fetches: %{}}}
diff --git a/lib/pleroma/web/fed_sockets/adapter/gun.ex b/lib/pleroma/web/fed_sockets/adapter/gun.ex
index dc8ae6db7..9984b666e 100644
--- a/lib/pleroma/web/fed_sockets/adapter/gun.ex
+++ b/lib/pleroma/web/fed_sockets/adapter/gun.ex
@@ -47,7 +47,6 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
GenServer.cast(pid, {:await_connected, self()})
receive do
- {:DOWN, ^monitor, _, _, {:shutdown, reason}} -> reason
{:DOWN, ^monitor, _, _, reason} -> {:error, reason}
{:await_connected, ^pid, conn_pid, last_fetch_id_ref} -> {:ok, conn_pid, last_fetch_id_ref}
end
@@ -131,12 +130,6 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
end
@impl true
- def handle_info(:close, state) do
- Logger.debug("Sending close frame !!!!!!!")
- {:close, state}
- end
-
- @impl true
def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do
{:stop, :normal, state}
end
@@ -165,7 +158,7 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
def initiate_connection(uri) do
%{host: host, port: port} = URI.parse(uri)
- with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http]}),
+ with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http], retry: 0}),
{:ok, _} <- :gun.await_up(conn_pid),
# TODO: nodeinfo-based support detection
# reference <- :gun.get(conn_pid, to_charlist(path)),
@@ -177,12 +170,14 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do
{:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} ->
{:ok, conn_pid}
- # mes ->
- # IO.inspect(mes)
- after
- 15_000 ->
- Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}")
- {:error, :timeout}
+ {:gun_response, ^conn_pid, _, _, status, _} ->
+ {:error, {:ws_upgrade_failed, {:status, status}}}
+
+ {:gun_error, ^conn_pid, ^ref, reason} ->
+ {:error, {:ws_upgrade_failed, reason}}
+
+ {:gun_down, ^conn_pid, _, reason, _} ->
+ {:error, {:ws_upgrade_failed, reason}}
end
else
{:response, :nofin, 404, _} ->
diff --git a/lib/pleroma/web/fed_sockets/ingester_worker.ex b/lib/pleroma/web/fed_sockets/ingester_worker.ex
deleted file mode 100644
index 325f2a4ab..000000000
--- a/lib/pleroma/web/fed_sockets/ingester_worker.ex
+++ /dev/null
@@ -1,33 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.FedSockets.IngesterWorker do
- use Pleroma.Workers.WorkerHelper, queue: "ingestion_queue"
- require Logger
-
- alias Pleroma.Web.Federator
-
- @impl Oban.Worker
- def perform(%Job{args: %{"op" => "ingest", "object" => ingestee}}) do
- try do
- ingestee
- |> Jason.decode!()
- |> do_ingestion()
- rescue
- e ->
- Logger.error("IngesterWorker error - #{inspect(e)}")
- e
- end
- end
-
- defp do_ingestion(params) do
- case Federator.incoming_ap_doc(params) do
- {:error, reason} ->
- {:error, reason}
-
- {:ok, object} ->
- {:ok, object}
- end
- end
-end