From a90ea8ba1562818b025f677ffeea35f7ca08ddf2 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sat, 31 Aug 2019 19:08:56 +0300 Subject: [#1149] Addressed code review comments (code style, jobs pruning etc.). --- lib/pleroma/activity_expiration_worker.ex | 6 ++--- lib/pleroma/application.ex | 2 +- lib/pleroma/digest_email_worker.ex | 4 +-- lib/pleroma/emails/mailer.ex | 4 +-- lib/pleroma/scheduled_activity_worker.ex | 2 +- lib/pleroma/user.ex | 2 +- lib/pleroma/web/activity_pub/activity_pub.ex | 2 +- .../activity_pub/mrf/mediaproxy_warming_policy.ex | 2 +- lib/pleroma/web/activity_pub/publisher.ex | 2 +- lib/pleroma/web/activity_pub/transmogrifier.ex | 4 +-- lib/pleroma/web/federator/federator.ex | 8 +++--- lib/pleroma/web/federator/publisher.ex | 9 ++----- lib/pleroma/web/oauth/token/clean_worker.ex | 2 +- lib/pleroma/web/push/push.ex | 6 ++--- lib/pleroma/web/salmon/salmon.ex | 2 +- lib/pleroma/workers/activity_expiration_worker.ex | 21 ++++++++++++++++ lib/pleroma/workers/background_worker.ex | 19 +++++--------- lib/pleroma/workers/helper.ex | 13 ---------- lib/pleroma/workers/mailer.ex | 27 -------------------- lib/pleroma/workers/mailer_worker.ex | 26 +++++++++++++++++++ lib/pleroma/workers/publisher.ex | 24 ------------------ lib/pleroma/workers/publisher_worker.ex | 28 +++++++++++++++++++++ lib/pleroma/workers/receiver.ex | 21 ---------------- lib/pleroma/workers/receiver_worker.ex | 21 ++++++++++++++++ lib/pleroma/workers/scheduled_activity_worker.ex | 2 +- lib/pleroma/workers/subscriber.ex | 29 ---------------------- lib/pleroma/workers/subscriber_worker.ex | 29 ++++++++++++++++++++++ lib/pleroma/workers/transmogrifier.ex | 18 -------------- lib/pleroma/workers/transmogrifier_worker.ex | 18 ++++++++++++++ lib/pleroma/workers/web_pusher.ex | 19 -------------- lib/pleroma/workers/web_pusher_worker.ex | 19 ++++++++++++++ lib/pleroma/workers/worker_helper.ex | 23 +++++++++++++++++ 32 files changed, 218 insertions(+), 196 deletions(-) create mode 100644 lib/pleroma/workers/activity_expiration_worker.ex delete mode 100644 lib/pleroma/workers/helper.ex delete mode 100644 lib/pleroma/workers/mailer.ex create mode 100644 lib/pleroma/workers/mailer_worker.ex delete mode 100644 lib/pleroma/workers/publisher.ex create mode 100644 lib/pleroma/workers/publisher_worker.ex delete mode 100644 lib/pleroma/workers/receiver.ex create mode 100644 lib/pleroma/workers/receiver_worker.ex delete mode 100644 lib/pleroma/workers/subscriber.ex create mode 100644 lib/pleroma/workers/subscriber_worker.ex delete mode 100644 lib/pleroma/workers/transmogrifier.ex create mode 100644 lib/pleroma/workers/transmogrifier_worker.ex delete mode 100644 lib/pleroma/workers/web_pusher.ex create mode 100644 lib/pleroma/workers/web_pusher_worker.ex create mode 100644 lib/pleroma/workers/worker_helper.ex (limited to 'lib') diff --git a/lib/pleroma/activity_expiration_worker.ex b/lib/pleroma/activity_expiration_worker.ex index 5c0c53232..7aba7eece 100644 --- a/lib/pleroma/activity_expiration_worker.ex +++ b/lib/pleroma/activity_expiration_worker.ex @@ -9,13 +9,13 @@ defmodule Pleroma.ActivityExpirationWorker do alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.CommonAPI - alias Pleroma.Workers.BackgroundWorker + alias Pleroma.Workers.ActivityExpirationWorker require Logger use GenServer import Ecto.Query - defdelegate worker_args(queue), to: Pleroma.Workers.Helper + import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] @schedule_interval :timer.minutes(1) @@ -57,7 +57,7 @@ defmodule Pleroma.ActivityExpirationWorker do "op" => "activity_expiration", "activity_expiration_id" => expiration.id } - |> BackgroundWorker.new(worker_args(:activity_expiration)) + |> ActivityExpirationWorker.new(worker_args(:activity_expiration)) |> Repo.insert() end) diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 7d38ed5c4..f8f866dbd 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -43,7 +43,7 @@ defmodule Pleroma.Application do hackney_pool_children() ++ [ Pleroma.Stats, - {Oban, Application.get_env(:pleroma, Oban)}, + {Oban, Pleroma.Config.get(Oban)}, %{ id: :web_push_init, start: {Task, :start_link, [&Pleroma.Web.Push.init/0]}, diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/digest_email_worker.ex index ffc48bfab..4ab2a4ef4 100644 --- a/lib/pleroma/digest_email_worker.ex +++ b/lib/pleroma/digest_email_worker.ex @@ -4,11 +4,11 @@ defmodule Pleroma.DigestEmailWorker do alias Pleroma.Repo - alias Pleroma.Workers.Mailer, as: MailerWorker + alias Pleroma.Workers.MailerWorker import Ecto.Query - defdelegate worker_args(queue), to: Pleroma.Workers.Helper + import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] def perform do config = Pleroma.Config.get([:email_notifications, :digest]) diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex index bb534f602..9cbe7313c 100644 --- a/lib/pleroma/emails/mailer.ex +++ b/lib/pleroma/emails/mailer.ex @@ -10,7 +10,7 @@ defmodule Pleroma.Emails.Mailer do """ alias Pleroma.Repo - alias Pleroma.Workers.Mailer, as: MailerWorker + alias Pleroma.Workers.MailerWorker alias Swoosh.DeliveryError @otp_app :pleroma @@ -19,7 +19,7 @@ defmodule Pleroma.Emails.Mailer do @spec enabled?() :: boolean() def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled]) - defdelegate worker_args(queue), to: Pleroma.Workers.Helper + import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] @doc "add email to queue" def deliver_async(email, config \\ []) do diff --git a/lib/pleroma/scheduled_activity_worker.ex b/lib/pleroma/scheduled_activity_worker.ex index a01fb4fcb..8bf534f42 100644 --- a/lib/pleroma/scheduled_activity_worker.ex +++ b/lib/pleroma/scheduled_activity_worker.ex @@ -18,7 +18,7 @@ defmodule Pleroma.ScheduledActivityWorker do @schedule_interval :timer.minutes(1) - defdelegate worker_args(queue), to: Pleroma.Workers.Helper + import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] def start_link(_) do GenServer.start_link(__MODULE__, nil) diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 18bba0fbb..abfa063fb 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -41,7 +41,7 @@ defmodule Pleroma.User do @strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/ @extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/ - defdelegate worker_args(queue), to: Pleroma.Workers.Helper + import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] schema "users" do field(:bio, :string) diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 50279cca5..74c5eb91c 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -26,7 +26,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do require Logger require Pleroma.Constants - defdelegate worker_args(queue), to: Pleroma.Workers.Helper + import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] # For Announce activities, we filter the recipients based on following status for any actors # that match actual users. See issue #164 for more information about why this is necessary. diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex index b188164ee..178321558 100644 --- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex @@ -18,7 +18,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do recv_timeout: 10_000 ] - defdelegate worker_args(queue), to: Pleroma.Workers.Helper + import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] def perform(:prefetch, url) do Logger.info("Prefetching #{inspect(url)}") diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index 24d101dc8..a6322e25a 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -85,7 +85,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do end def publish_one(%{actor_id: actor_id} = params) do - actor = User.get_by_id(actor_id) + actor = User.get_cached_by_id(actor_id) params |> Map.delete(:actor_id) diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index b068d28a7..9437f9a16 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -15,14 +15,14 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.Federator - alias Pleroma.Workers.Transmogrifier, as: TransmogrifierWorker + alias Pleroma.Workers.TransmogrifierWorker import Ecto.Query require Logger require Pleroma.Constants - defdelegate worker_args(queue), to: Pleroma.Workers.Helper + import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] @doc """ Modifies an incoming AP object (mastodon format) to our internal format. diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index cf7e50fee..8f43066e3 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -12,13 +12,13 @@ defmodule Pleroma.Web.Federator do alias Pleroma.Web.Federator.Publisher alias Pleroma.Web.OStatus alias Pleroma.Web.Websub - alias Pleroma.Workers.Publisher, as: PublisherWorker - alias Pleroma.Workers.Receiver, as: ReceiverWorker - alias Pleroma.Workers.Subscriber, as: SubscriberWorker + alias Pleroma.Workers.PublisherWorker + alias Pleroma.Workers.ReceiverWorker + alias Pleroma.Workers.SubscriberWorker require Logger - defdelegate worker_args(queue), to: Pleroma.Workers.Helper + import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] def init do # To do: consider removing this call in favor of scheduled execution (`quantum`-based) diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex index 05d2be615..42be109ab 100644 --- a/lib/pleroma/web/federator/publisher.ex +++ b/lib/pleroma/web/federator/publisher.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do alias Pleroma.Activity alias Pleroma.Config alias Pleroma.User - alias Pleroma.Workers.Publisher, as: PublisherWorker + alias Pleroma.Workers.PublisherWorker require Logger @@ -31,12 +31,7 @@ defmodule Pleroma.Web.Federator.Publisher do """ @spec enqueue_one(module(), Map.t()) :: :ok def enqueue_one(module, %{} = params) do - worker_args = - if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do - [max_attempts: max_attempts] - else - [] - end + worker_args = Pleroma.Workers.WorkerHelper.worker_args(:federator_outgoing) %{"op" => "publish_one", "module" => to_string(module), "params" => params} |> PublisherWorker.new(worker_args) diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex index 943e73289..b150a68a7 100644 --- a/lib/pleroma/web/oauth/token/clean_worker.ex +++ b/lib/pleroma/web/oauth/token/clean_worker.ex @@ -20,7 +20,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do alias Pleroma.Web.OAuth.Token alias Pleroma.Workers.BackgroundWorker - defdelegate worker_args(queue), to: Pleroma.Workers.Helper + import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] def start_link(_), do: GenServer.start_link(__MODULE__, %{}) diff --git a/lib/pleroma/web/push/push.ex b/lib/pleroma/web/push/push.ex index b4f0e5127..4973b529c 100644 --- a/lib/pleroma/web/push/push.ex +++ b/lib/pleroma/web/push/push.ex @@ -4,11 +4,11 @@ defmodule Pleroma.Web.Push do alias Pleroma.Repo - alias Pleroma.Workers.WebPusher + alias Pleroma.Workers.WebPusherWorker require Logger - defdelegate worker_args(queue), to: Pleroma.Workers.Helper + import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] def init do unless enabled() do @@ -36,7 +36,7 @@ defmodule Pleroma.Web.Push do def send(notification) do %{"op" => "web_push", "notification_id" => notification.id} - |> WebPusher.new(worker_args(:web_push)) + |> WebPusherWorker.new(worker_args(:web_push)) |> Repo.insert() end end diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index bbaa293fd..8ba7380c0 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -171,7 +171,7 @@ defmodule Pleroma.Web.Salmon do end def publish_one(%{recipient_id: recipient_id} = params) do - recipient = User.get_by_id(recipient_id) + recipient = User.get_cached_by_id(recipient_id) params |> Map.delete(:recipient_id) diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex new file mode 100644 index 000000000..0b491eabb --- /dev/null +++ b/lib/pleroma/workers/activity_expiration_worker.ex @@ -0,0 +1,21 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ActivityExpirationWorker do + # Note: `max_attempts` is intended to be overridden in `new/2` call + use Oban.Worker, + queue: "activity_expiration", + max_attempts: 1 + + @impl Oban.Worker + def perform( + %{ + "op" => "activity_expiration", + "activity_expiration_id" => activity_expiration_id + }, + _job + ) do + Pleroma.ActivityExpirationWorker.perform(:execute, activity_expiration_id) + end +end diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex index fbce7d789..7b5575a5f 100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@ -8,24 +8,24 @@ defmodule Pleroma.Workers.BackgroundWorker do alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy alias Pleroma.Web.OAuth.Token.CleanWorker - # Note: `max_attempts` is intended to be overridden in `new/1` call + # Note: `max_attempts` is intended to be overridden in `new/2` call use Oban.Worker, queue: "background", max_attempts: 1 @impl Oban.Worker def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do - user = User.get_by_id(user_id) + user = User.get_cached_by_id(user_id) User.perform(:fetch_initial_posts, user) end def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do - user = User.get_by_id(user_id) + user = User.get_cached_by_id(user_id) User.perform(:deactivate_async, user, status) end def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do - user = User.get_by_id(user_id) + user = User.get_cached_by_id(user_id) User.perform(:delete, user) end @@ -37,7 +37,7 @@ defmodule Pleroma.Workers.BackgroundWorker do }, _job ) do - blocker = User.get_by_id(blocker_id) + blocker = User.get_cached_by_id(blocker_id) User.perform(:blocks_import, blocker, blocked_identifiers) end @@ -49,7 +49,7 @@ defmodule Pleroma.Workers.BackgroundWorker do }, _job ) do - follower = User.get_by_id(follower_id) + follower = User.get_cached_by_id(follower_id) User.perform(:follow_import, follower, followed_identifiers) end @@ -69,11 +69,4 @@ defmodule Pleroma.Workers.BackgroundWorker do activity = Activity.get_by_id(activity_id) Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity) end - - def perform( - %{"op" => "activity_expiration", "activity_expiration_id" => activity_expiration_id}, - _job - ) do - Pleroma.ActivityExpirationWorker.perform(:execute, activity_expiration_id) - end end diff --git a/lib/pleroma/workers/helper.ex b/lib/pleroma/workers/helper.ex deleted file mode 100644 index 3286ce0e8..000000000 --- a/lib/pleroma/workers/helper.ex +++ /dev/null @@ -1,13 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.Helper do - def worker_args(queue) do - if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do - [max_attempts: max_attempts] - else - [] - end - end -end diff --git a/lib/pleroma/workers/mailer.ex b/lib/pleroma/workers/mailer.ex deleted file mode 100644 index 1cce2ea03..000000000 --- a/lib/pleroma/workers/mailer.ex +++ /dev/null @@ -1,27 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.Mailer do - alias Pleroma.User - - # Note: `max_attempts` is intended to be overridden in `new/1` call - use Oban.Worker, - queue: "mailer", - max_attempts: 1 - - @impl Oban.Worker - def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do - email = - encoded_email - |> Base.decode64!() - |> :erlang.binary_to_term() - - Pleroma.Emails.Mailer.deliver(email, config) - end - - def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do - user = User.get_by_id(user_id) - Pleroma.DigestEmailWorker.perform(user) - end -end diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex new file mode 100644 index 000000000..4f73d61bc --- /dev/null +++ b/lib/pleroma/workers/mailer_worker.ex @@ -0,0 +1,26 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.MailerWorker do + alias Pleroma.User + + # Note: `max_attempts` is intended to be overridden in `new/2` call + use Oban.Worker, + queue: "mailer", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do + encoded_email + |> Base.decode64!() + |> :erlang.binary_to_term() + |> Pleroma.Emails.Mailer.deliver(config) + end + + def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do + user_id + |> User.get_cached_by_id() + |> Pleroma.DigestEmailWorker.perform() + end +end diff --git a/lib/pleroma/workers/publisher.ex b/lib/pleroma/workers/publisher.ex deleted file mode 100644 index 00fae99c7..000000000 --- a/lib/pleroma/workers/publisher.ex +++ /dev/null @@ -1,24 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.Publisher do - alias Pleroma.Activity - alias Pleroma.Web.Federator - - # Note: `max_attempts` is intended to be overridden in `new/1` call - use Oban.Worker, - queue: "federator_outgoing", - max_attempts: 1 - - @impl Oban.Worker - def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do - activity = Activity.get_by_id(activity_id) - Federator.perform(:publish, activity) - end - - def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do - params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end) - Federator.perform(:publish_one, String.to_atom(module_name), params) - end -end diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex new file mode 100644 index 000000000..5671d2a29 --- /dev/null +++ b/lib/pleroma/workers/publisher_worker.ex @@ -0,0 +1,28 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.PublisherWorker do + alias Pleroma.Activity + alias Pleroma.Web.Federator + + # Note: `max_attempts` is intended to be overridden in `new/2` call + use Oban.Worker, + queue: "federator_outgoing", + max_attempts: 1 + + def backoff(attempt) when is_integer(attempt) do + Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5) + end + + @impl Oban.Worker + def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do + activity = Activity.get_by_id(activity_id) + Federator.perform(:publish, activity) + end + + def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do + params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end) + Federator.perform(:publish_one, String.to_atom(module_name), params) + end +end diff --git a/lib/pleroma/workers/receiver.ex b/lib/pleroma/workers/receiver.ex deleted file mode 100644 index 4ee270d74..000000000 --- a/lib/pleroma/workers/receiver.ex +++ /dev/null @@ -1,21 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.Receiver do - alias Pleroma.Web.Federator - - # Note: `max_attempts` is intended to be overridden in `new/1` call - use Oban.Worker, - queue: "federator_incoming", - max_attempts: 1 - - @impl Oban.Worker - def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do - Federator.perform(:incoming_doc, doc) - end - - def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do - Federator.perform(:incoming_ap_doc, params) - end -end diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex new file mode 100644 index 000000000..cdce630f2 --- /dev/null +++ b/lib/pleroma/workers/receiver_worker.ex @@ -0,0 +1,21 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ReceiverWorker do + alias Pleroma.Web.Federator + + # Note: `max_attempts` is intended to be overridden in `new/2` call + use Oban.Worker, + queue: "federator_incoming", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do + Federator.perform(:incoming_doc, doc) + end + + def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do + Federator.perform(:incoming_ap_doc, params) + end +end diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex index d9724c78a..4094411ae 100644 --- a/lib/pleroma/workers/scheduled_activity_worker.ex +++ b/lib/pleroma/workers/scheduled_activity_worker.ex @@ -3,7 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Workers.ScheduledActivityWorker do - # Note: `max_attempts` is intended to be overridden in `new/1` call + # Note: `max_attempts` is intended to be overridden in `new/2` call use Oban.Worker, queue: "scheduled_activities", max_attempts: 1 diff --git a/lib/pleroma/workers/subscriber.ex b/lib/pleroma/workers/subscriber.ex deleted file mode 100644 index e960b35bf..000000000 --- a/lib/pleroma/workers/subscriber.ex +++ /dev/null @@ -1,29 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.Subscriber do - alias Pleroma.Repo - alias Pleroma.Web.Federator - alias Pleroma.Web.Websub - - # Note: `max_attempts` is intended to be overridden in `new/1` call - use Oban.Worker, - queue: "federator_outgoing", - max_attempts: 1 - - @impl Oban.Worker - def perform(%{"op" => "refresh_subscriptions"}, _job) do - Federator.perform(:refresh_subscriptions) - end - - def perform(%{"op" => "request_subscription", "websub_id" => websub_id}, _job) do - websub = Repo.get(Websub.WebsubClientSubscription, websub_id) - Federator.perform(:request_subscription, websub) - end - - def perform(%{"op" => "verify_websub", "websub_id" => websub_id}, _job) do - websub = Repo.get(Websub.WebsubServerSubscription, websub_id) - Federator.perform(:verify_websub, websub) - end -end diff --git a/lib/pleroma/workers/subscriber_worker.ex b/lib/pleroma/workers/subscriber_worker.ex new file mode 100644 index 000000000..22d1dc956 --- /dev/null +++ b/lib/pleroma/workers/subscriber_worker.ex @@ -0,0 +1,29 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.SubscriberWorker do + alias Pleroma.Repo + alias Pleroma.Web.Federator + alias Pleroma.Web.Websub + + # Note: `max_attempts` is intended to be overridden in `new/2` call + use Oban.Worker, + queue: "federator_outgoing", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "refresh_subscriptions"}, _job) do + Federator.perform(:refresh_subscriptions) + end + + def perform(%{"op" => "request_subscription", "websub_id" => websub_id}, _job) do + websub = Repo.get(Websub.WebsubClientSubscription, websub_id) + Federator.perform(:request_subscription, websub) + end + + def perform(%{"op" => "verify_websub", "websub_id" => websub_id}, _job) do + websub = Repo.get(Websub.WebsubServerSubscription, websub_id) + Federator.perform(:verify_websub, websub) + end +end diff --git a/lib/pleroma/workers/transmogrifier.ex b/lib/pleroma/workers/transmogrifier.ex deleted file mode 100644 index e13202c06..000000000 --- a/lib/pleroma/workers/transmogrifier.ex +++ /dev/null @@ -1,18 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.Transmogrifier do - alias Pleroma.User - - # Note: `max_attempts` is intended to be overridden in `new/1` call - use Oban.Worker, - queue: "transmogrifier", - max_attempts: 1 - - @impl Oban.Worker - def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do - user = User.get_by_id(user_id) - Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user) - end -end diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex new file mode 100644 index 000000000..6f5c1a2f2 --- /dev/null +++ b/lib/pleroma/workers/transmogrifier_worker.ex @@ -0,0 +1,18 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.TransmogrifierWorker do + alias Pleroma.User + + # Note: `max_attempts` is intended to be overridden in `new/2` call + use Oban.Worker, + queue: "transmogrifier", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do + user = User.get_cached_by_id(user_id) + Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user) + end +end diff --git a/lib/pleroma/workers/web_pusher.ex b/lib/pleroma/workers/web_pusher.ex deleted file mode 100644 index 7b78bb3ea..000000000 --- a/lib/pleroma/workers/web_pusher.ex +++ /dev/null @@ -1,19 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.WebPusher do - alias Pleroma.Notification - alias Pleroma.Repo - - # Note: `max_attempts` is intended to be overridden in `new/1` call - use Oban.Worker, - queue: "web_push", - max_attempts: 1 - - @impl Oban.Worker - def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do - notification = Repo.get(Notification, notification_id) - Pleroma.Web.Push.Impl.perform(notification) - end -end diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex new file mode 100644 index 000000000..2b1d3b99a --- /dev/null +++ b/lib/pleroma/workers/web_pusher_worker.ex @@ -0,0 +1,19 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.WebPusherWorker do + alias Pleroma.Notification + alias Pleroma.Repo + + # Note: `max_attempts` is intended to be overridden in `new/2` call + use Oban.Worker, + queue: "web_push", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do + notification = Repo.get(Notification, notification_id) + Pleroma.Web.Push.Impl.perform(notification) + end +end diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex new file mode 100644 index 000000000..f9ed2e64d --- /dev/null +++ b/lib/pleroma/workers/worker_helper.ex @@ -0,0 +1,23 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.WorkerHelper do + alias Pleroma.Config + + def worker_args(queue) do + case Config.get([:workers, :retries, queue]) do + nil -> [] + max_attempts -> [max_attempts: max_attempts] + end + end + + def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do + backoff = + :math.pow(attempt, pow) + + base_backoff + + :rand.uniform(2 * base_backoff) * attempt + + trunc(backoff) + end +end -- cgit v1.2.3