aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/web/activity_pub/utils.ex9
-rw-r--r--lib/pleroma/web/federator/federator.ex132
-rw-r--r--lib/pleroma/web/federator/publisher.ex12
-rw-r--r--lib/pleroma/workers/publisher.ex25
-rw-r--r--lib/pleroma/workers/receiver.ex61
-rw-r--r--lib/pleroma/workers/subscriber.ex44
6 files changed, 171 insertions, 112 deletions
diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex
index 39074888b..f0917f9d4 100644
--- a/lib/pleroma/web/activity_pub/utils.ex
+++ b/lib/pleroma/web/activity_pub/utils.ex
@@ -168,14 +168,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
"""
def maybe_federate(%Activity{local: true} = activity) do
if Pleroma.Config.get!([:instance, :federating]) do
- priority =
- case activity.data["type"] do
- "Delete" -> 10
- "Create" -> 1
- _ -> 5
- end
-
- Pleroma.Web.Federator.publish(activity, priority)
+ Pleroma.Web.Federator.publish(activity)
end
:ok
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex
index 97ec9d549..bb9eadfee 100644
--- a/lib/pleroma/web/federator/federator.ex
+++ b/lib/pleroma/web/federator/federator.ex
@@ -3,22 +3,15 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Federator do
- alias Pleroma.Activity
- alias Pleroma.Object.Containment
- alias Pleroma.User
- alias Pleroma.Web.ActivityPub.ActivityPub
- alias Pleroma.Web.ActivityPub.Transmogrifier
- alias Pleroma.Web.ActivityPub.Utils
- alias Pleroma.Web.Federator.Publisher
- alias Pleroma.Web.OStatus
- alias Pleroma.Web.Websub
+ alias Pleroma.Workers.Publisher, as: PublisherWorker
+ alias Pleroma.Workers.Receiver, as: ReceiverWorker
+ alias Pleroma.Workers.Subscriber, as: SubscriberWorker
require Logger
def init do
# 1 minute
- Process.sleep(1000 * 60)
- refresh_subscriptions()
+ refresh_subscriptions(schedule_in: 60)
end
@doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
@@ -36,111 +29,50 @@ defmodule Pleroma.Web.Federator do
# Client API
def incoming_doc(doc) do
- PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
+ %{"op" => "incoming_doc", "body" => doc}
+ |> ReceiverWorker.new(worker_args(:federator_incoming))
+ |> Pleroma.Repo.insert()
end
def incoming_ap_doc(params) do
- PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
+ %{"op" => "incoming_ap_doc", "params" => params}
+ |> ReceiverWorker.new(worker_args(:federator_incoming))
+ |> Pleroma.Repo.insert()
end
- def publish(activity, priority \\ 1) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
+ def publish(%{id: "pleroma:fakeid"} = activity) do
+ PublisherWorker.perform_publish(activity)
end
- def verify_websub(websub) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
- end
-
- def request_subscription(sub) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
- end
-
- def refresh_subscriptions do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
- end
-
- # Job Worker Callbacks
-
- def perform(:refresh_subscriptions) do
- Logger.debug("Federator running refresh subscriptions")
- Websub.refresh_subscriptions()
-
- spawn(fn ->
- # 6 hours
- Process.sleep(1000 * 60 * 60 * 6)
- refresh_subscriptions()
- end)
- end
-
- def perform(:request_subscription, websub) do
- Logger.debug("Refreshing #{websub.topic}")
-
- with {:ok, websub} <- Websub.request_subscription(websub) do
- Logger.debug("Successfully refreshed #{websub.topic}")
- else
- _e -> Logger.debug("Couldn't refresh #{websub.topic}")
- end
+ def publish(activity) do
+ %{"op" => "publish", "activity_id" => activity.id}
+ |> PublisherWorker.new(worker_args(:federator_outgoing))
+ |> Pleroma.Repo.insert()
end
- def perform(:publish, activity) do
- Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
-
- with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
- {:ok, actor} <- User.ensure_keys_present(actor) do
- Publisher.publish(actor, activity)
- end
- end
-
- def perform(:verify_websub, websub) do
- Logger.debug(fn ->
- "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
- end)
-
- Websub.verify(websub)
- end
-
- def perform(:incoming_doc, doc) do
- Logger.info("Got document, trying to parse")
- OStatus.handle_incoming(doc)
+ def verify_websub(websub) do
+ %{"op" => "verify_websub", "websub_id" => websub.id}
+ |> SubscriberWorker.new(worker_args(:federator_outgoing))
+ |> Pleroma.Repo.insert()
end
- def perform(:incoming_ap_doc, params) do
- Logger.info("Handling incoming AP activity")
-
- params = Utils.normalize_params(params)
-
- # NOTE: we use the actor ID to do the containment, this is fine because an
- # actor shouldn't be acting on objects outside their own AP server.
- with {:ok, _user} <- ap_enabled_actor(params["actor"]),
- nil <- Activity.normalize(params["id"]),
- :ok <- Containment.contain_origin_from_id(params["actor"], params),
- {:ok, activity} <- Transmogrifier.handle_incoming(params) do
- {:ok, activity}
- else
- %Activity{} ->
- Logger.info("Already had #{params["id"]}")
- :error
-
- _e ->
- # Just drop those for now
- Logger.info("Unhandled activity")
- Logger.info(Jason.encode!(params, pretty: true))
- :error
- end
+ def request_subscription(websub) do
+ %{"op" => "request_subscription", "websub_id" => websub.id}
+ |> SubscriberWorker.new(worker_args(:federator_outgoing))
+ |> Pleroma.Repo.insert()
end
- def perform(type, _) do
- Logger.debug(fn -> "Unknown task: #{type}" end)
- {:error, "Don't know what to do with this"}
+ def refresh_subscriptions(worker_args \\ []) do
+ %{"op" => "refresh_subscriptions"}
+ |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
+ |> Pleroma.Repo.insert()
end
- def ap_enabled_actor(id) do
- user = User.get_cached_by_ap_id(id)
-
- if User.ap_enabled?(user) do
- {:ok, user}
+ defp worker_args(queue) do
+ if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do
+ [max_attempts: max_attempts]
else
- ActivityPub.make_user_from_ap_id(id)
+ []
end
end
end
diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex
index e8c1bf17f..05d2be615 100644
--- a/lib/pleroma/web/federator/publisher.ex
+++ b/lib/pleroma/web/federator/publisher.ex
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.User
+ alias Pleroma.Workers.Publisher, as: PublisherWorker
require Logger
@@ -30,8 +31,15 @@ defmodule Pleroma.Web.Federator.Publisher do
"""
@spec enqueue_one(module(), Map.t()) :: :ok
def enqueue_one(module, %{} = params) do
- %{module: to_string(module), params: params}
- |> Pleroma.Workers.Publisher.new()
+ worker_args =
+ if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do
+ [max_attempts: max_attempts]
+ else
+ []
+ end
+
+ %{"op" => "publish_one", "module" => to_string(module), "params" => params}
+ |> PublisherWorker.new(worker_args)
|> Pleroma.Repo.insert()
end
diff --git a/lib/pleroma/workers/publisher.ex b/lib/pleroma/workers/publisher.ex
index 639794830..67871977a 100644
--- a/lib/pleroma/workers/publisher.ex
+++ b/lib/pleroma/workers/publisher.ex
@@ -3,12 +3,33 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Publisher do
- use Oban.Worker, queue: "federator_outgoing", max_attempts: 5
+ alias Pleroma.Activity
+ alias Pleroma.User
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "federator_outgoing",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
@impl Oban.Worker
- def perform(%Oban.Job{args: %{module: module_name, params: params}}) do
+ def perform(%{"op" => "publish", "activity_id" => activity_id}) do
+ with %Activity{} = activity <- Activity.get_by_id(activity_id) do
+ perform_publish(activity)
+ else
+ _ -> raise "Non-existing activity: #{activity_id}"
+ end
+ end
+
+ def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}) do
module_name
|> String.to_atom()
|> apply(:publish_one, [params])
end
+
+ def perform_publish(%Activity{} = activity) do
+ with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
+ {:ok, actor} <- User.ensure_keys_present(actor) do
+ Pleroma.Web.Federator.Publisher.publish(actor, activity)
+ end
+ end
end
diff --git a/lib/pleroma/workers/receiver.ex b/lib/pleroma/workers/receiver.ex
new file mode 100644
index 000000000..43558b4e6
--- /dev/null
+++ b/lib/pleroma/workers/receiver.ex
@@ -0,0 +1,61 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Receiver do
+ alias Pleroma.Activity
+ alias Pleroma.Object.Containment
+ alias Pleroma.User
+ alias Pleroma.Web.ActivityPub.ActivityPub
+ alias Pleroma.Web.ActivityPub.Transmogrifier
+ alias Pleroma.Web.ActivityPub.Utils
+ alias Pleroma.Web.OStatus
+
+ require Logger
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "federator_incoming",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+ @impl Oban.Worker
+ def perform(%{"op" => "incoming_doc", "body" => doc}) do
+ Logger.info("Got incoming document, trying to parse")
+ OStatus.handle_incoming(doc)
+ end
+
+ def perform(%{"op" => "incoming_ap_doc", "params" => params}) do
+ Logger.info("Handling incoming AP activity")
+
+ params = Utils.normalize_params(params)
+
+ # NOTE: we use the actor ID to do the containment, this is fine because an
+ # actor shouldn't be acting on objects outside their own AP server.
+ with {:ok, _user} <- ap_enabled_actor(params["actor"]),
+ nil <- Activity.normalize(params["id"]),
+ :ok <- Containment.contain_origin_from_id(params["actor"], params),
+ {:ok, activity} <- Transmogrifier.handle_incoming(params) do
+ {:ok, activity}
+ else
+ %Activity{} ->
+ Logger.info("Already had #{params["id"]}")
+ :error
+
+ _e ->
+ # Just drop those for now
+ Logger.info("Unhandled activity")
+ Logger.info(Jason.encode!(params, pretty: true))
+ :error
+ end
+ end
+
+ defp ap_enabled_actor(id) do
+ user = User.get_cached_by_ap_id(id)
+
+ if User.ap_enabled?(user) do
+ {:ok, user}
+ else
+ ActivityPub.make_user_from_ap_id(id)
+ end
+ end
+end
diff --git a/lib/pleroma/workers/subscriber.ex b/lib/pleroma/workers/subscriber.ex
new file mode 100644
index 000000000..a8c01bb10
--- /dev/null
+++ b/lib/pleroma/workers/subscriber.ex
@@ -0,0 +1,44 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Subscriber do
+ alias Pleroma.Repo
+ alias Pleroma.Web.Websub
+ alias Pleroma.Web.Websub.WebsubClientSubscription
+
+ require Logger
+
+ # Note: `max_attempts` is intended to be overridden in `new/1` call
+ use Oban.Worker,
+ queue: "federator_outgoing",
+ max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
+
+ @impl Oban.Worker
+ def perform(%{"op" => "refresh_subscriptions"}) do
+ Websub.refresh_subscriptions()
+ # Schedule the next run in 6 hours
+ Pleroma.Web.Federator.refresh_subscriptions(schedule_in: 3600 * 6)
+ end
+
+ def perform(%{"op" => "request_subscription", "websub_id" => websub_id}) do
+ websub = Repo.get(WebsubClientSubscription, websub_id)
+ Logger.debug("Refreshing #{websub.topic}")
+
+ with {:ok, websub} <- Websub.request_subscription(websub) do
+ Logger.debug("Successfully refreshed #{websub.topic}")
+ else
+ _e -> Logger.debug("Couldn't refresh #{websub.topic}")
+ end
+ end
+
+ def perform(%{"op" => "verify_websub", "websub_id" => websub_id}) do
+ websub = Repo.get(WebsubClientSubscription, websub_id)
+
+ Logger.debug(fn ->
+ "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
+ end)
+
+ Websub.verify(websub)
+ end
+end