From cad3d18b691d98a7e138dd0d605404c1d6d6f13c Mon Sep 17 00:00:00 2001 From: rinpatch Date: Wed, 23 Sep 2020 14:25:39 +0300 Subject: tmp --- lib/pleroma/web/fed_sockets/adapter.ex | 4 ++-- lib/pleroma/web/fed_sockets/adapter/cowboy.ex | 2 +- lib/pleroma/web/fed_sockets/adapter/gun.ex | 23 +++++++----------- lib/pleroma/web/fed_sockets/ingester_worker.ex | 33 -------------------------- 4 files changed, 12 insertions(+), 50 deletions(-) delete mode 100644 lib/pleroma/web/fed_sockets/ingester_worker.ex diff --git a/lib/pleroma/web/fed_sockets/adapter.ex b/lib/pleroma/web/fed_sockets/adapter.ex index edb22d1d9..f35c964c6 100644 --- a/lib/pleroma/web/fed_sockets/adapter.ex +++ b/lib/pleroma/web/fed_sockets/adapter.ex @@ -18,7 +18,7 @@ defmodule Pleroma.Web.FedSockets.Adapter do alias Pleroma.Web.ActivityPub.ObjectView alias Pleroma.Web.ActivityPub.UserView alias Pleroma.Web.ActivityPub.Visibility - alias Pleroma.Web.FedSockets.IngesterWorker + alias Pleroma.Web.Federator @type origin :: String.t() @type fetch_id :: integer() @@ -37,7 +37,7 @@ defmodule Pleroma.Web.FedSockets.Adapter do def process_message(%{"action" => "publish", "data" => data}, origin, waiting_fetches) do if Containment.contain_origin(origin, data) do - IngesterWorker.enqueue("ingest", %{"object" => data}) + Federator.incoming_ap_doc(data) end {:noreply, waiting_fetches} diff --git a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex index 107d80849..a7b3835dd 100644 --- a/lib/pleroma/web/fed_sockets/adapter/cowboy.ex +++ b/lib/pleroma/web/fed_sockets/adapter/cowboy.ex @@ -92,7 +92,7 @@ defmodule Pleroma.Web.FedSockets.Adapter.Cowboy do case Registry.register(@registry, key, %Value{ adapter: __MODULE__, - adapter_state: %{last_fetch_id_ref: last_fetch_id_ref, waiting_fetches: %{}} + adapter_state: %{last_fetch_id_ref: last_fetch_id_ref} }) do {:ok, _owner} -> {:ok, %{origin: origin, waiting_fetches: %{}}} diff --git a/lib/pleroma/web/fed_sockets/adapter/gun.ex b/lib/pleroma/web/fed_sockets/adapter/gun.ex index dc8ae6db7..9984b666e 100644 --- a/lib/pleroma/web/fed_sockets/adapter/gun.ex +++ b/lib/pleroma/web/fed_sockets/adapter/gun.ex @@ -47,7 +47,6 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do GenServer.cast(pid, {:await_connected, self()}) receive do - {:DOWN, ^monitor, _, _, {:shutdown, reason}} -> reason {:DOWN, ^monitor, _, _, reason} -> {:error, reason} {:await_connected, ^pid, conn_pid, last_fetch_id_ref} -> {:ok, conn_pid, last_fetch_id_ref} end @@ -130,12 +129,6 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do {:noreply, %{state | waiting_fetches: waiting_fetches}} end - @impl true - def handle_info(:close, state) do - Logger.debug("Sending close frame !!!!!!!") - {:close, state} - end - @impl true def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do {:stop, :normal, state} @@ -165,7 +158,7 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do def initiate_connection(uri) do %{host: host, port: port} = URI.parse(uri) - with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http]}), + with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http], retry: 0}), {:ok, _} <- :gun.await_up(conn_pid), # TODO: nodeinfo-based support detection # reference <- :gun.get(conn_pid, to_charlist(path)), @@ -177,12 +170,14 @@ defmodule Pleroma.Web.FedSockets.Adapter.Gun do {:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} -> {:ok, conn_pid} - # mes -> - # IO.inspect(mes) - after - 15_000 -> - Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}") - {:error, :timeout} + {:gun_response, ^conn_pid, _, _, status, _} -> + {:error, {:ws_upgrade_failed, {:status, status}}} + + {:gun_error, ^conn_pid, ^ref, reason} -> + {:error, {:ws_upgrade_failed, reason}} + + {:gun_down, ^conn_pid, _, reason, _} -> + {:error, {:ws_upgrade_failed, reason}} end else {:response, :nofin, 404, _} -> diff --git a/lib/pleroma/web/fed_sockets/ingester_worker.ex b/lib/pleroma/web/fed_sockets/ingester_worker.ex deleted file mode 100644 index 325f2a4ab..000000000 --- a/lib/pleroma/web/fed_sockets/ingester_worker.ex +++ /dev/null @@ -1,33 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.FedSockets.IngesterWorker do - use Pleroma.Workers.WorkerHelper, queue: "ingestion_queue" - require Logger - - alias Pleroma.Web.Federator - - @impl Oban.Worker - def perform(%Job{args: %{"op" => "ingest", "object" => ingestee}}) do - try do - ingestee - |> Jason.decode!() - |> do_ingestion() - rescue - e -> - Logger.error("IngesterWorker error - #{inspect(e)}") - e - end - end - - defp do_ingestion(params) do - case Federator.incoming_ap_doc(params) do - {:error, reason} -> - {:error, reason} - - {:ok, object} -> - {:ok, object} - end - end -end -- cgit v1.2.3