aboutsummaryrefslogtreecommitdiff
path: root/lib/pleroma/workers
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pleroma/workers')
-rw-r--r--lib/pleroma/workers/activity_expiration_worker.ex2
-rw-r--r--lib/pleroma/workers/background_worker.ex2
-rw-r--r--lib/pleroma/workers/digest_emails_worker.ex21
-rw-r--r--lib/pleroma/workers/mailer_worker.ex10
-rw-r--r--lib/pleroma/workers/publisher_worker.ex2
-rw-r--r--lib/pleroma/workers/receiver_worker.ex2
-rw-r--r--lib/pleroma/workers/scheduled_activity_worker.ex2
-rw-r--r--lib/pleroma/workers/subscriber_worker.ex2
-rw-r--r--lib/pleroma/workers/transmogrifier_worker.ex2
-rw-r--r--lib/pleroma/workers/web_pusher_worker.ex2
-rw-r--r--lib/pleroma/workers/worker_helper.ex18
11 files changed, 57 insertions, 8 deletions
diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex
index 0b491eabb..60dd3feba 100644
--- a/lib/pleroma/workers/activity_expiration_worker.ex
+++ b/lib/pleroma/workers/activity_expiration_worker.ex
@@ -8,6 +8,8 @@ defmodule Pleroma.Workers.ActivityExpirationWorker do
queue: "activity_expiration",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
+
@impl Oban.Worker
def perform(
%{
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
index 7b5575a5f..b9aef3a92 100644
--- a/lib/pleroma/workers/background_worker.ex
+++ b/lib/pleroma/workers/background_worker.ex
@@ -13,6 +13,8 @@ defmodule Pleroma.Workers.BackgroundWorker do
queue: "background",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "background"
+
@impl Oban.Worker
def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
user = User.get_cached_by_id(user_id)
diff --git a/lib/pleroma/workers/digest_emails_worker.ex b/lib/pleroma/workers/digest_emails_worker.ex
new file mode 100644
index 000000000..ca073ce67
--- /dev/null
+++ b/lib/pleroma/workers/digest_emails_worker.ex
@@ -0,0 +1,21 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.DigestEmailsWorker do
+ alias Pleroma.User
+
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
+ use Oban.Worker,
+ queue: "digest_emails",
+ max_attempts: 1
+
+ use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
+ user_id
+ |> User.get_cached_by_id()
+ |> Pleroma.DigestEmailWorker.perform()
+ end
+end
diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex
index 4f73d61bc..a4bd54a6c 100644
--- a/lib/pleroma/workers/mailer_worker.ex
+++ b/lib/pleroma/workers/mailer_worker.ex
@@ -3,13 +3,13 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.MailerWorker do
- alias Pleroma.User
-
# Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: "mailer",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "mailer"
+
@impl Oban.Worker
def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
encoded_email
@@ -17,10 +17,4 @@ defmodule Pleroma.Workers.MailerWorker do
|> :erlang.binary_to_term()
|> Pleroma.Emails.Mailer.deliver(config)
end
-
- def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
- user_id
- |> User.get_cached_by_id()
- |> Pleroma.DigestEmailWorker.perform()
- end
end
diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex
index 5671d2a29..a3ac22635 100644
--- a/lib/pleroma/workers/publisher_worker.ex
+++ b/lib/pleroma/workers/publisher_worker.ex
@@ -11,6 +11,8 @@ defmodule Pleroma.Workers.PublisherWorker do
queue: "federator_outgoing",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
def backoff(attempt) when is_integer(attempt) do
Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
end
diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex
index cdce630f2..3cc415ce4 100644
--- a/lib/pleroma/workers/receiver_worker.ex
+++ b/lib/pleroma/workers/receiver_worker.ex
@@ -10,6 +10,8 @@ defmodule Pleroma.Workers.ReceiverWorker do
queue: "federator_incoming",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
+
@impl Oban.Worker
def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do
Federator.perform(:incoming_doc, doc)
diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex
index 4094411ae..936bb64d3 100644
--- a/lib/pleroma/workers/scheduled_activity_worker.ex
+++ b/lib/pleroma/workers/scheduled_activity_worker.ex
@@ -8,6 +8,8 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do
queue: "scheduled_activities",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
+
@impl Oban.Worker
def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
Pleroma.ScheduledActivityWorker.perform(:execute, activity_id)
diff --git a/lib/pleroma/workers/subscriber_worker.ex b/lib/pleroma/workers/subscriber_worker.ex
index 22d1dc956..4fb994554 100644
--- a/lib/pleroma/workers/subscriber_worker.ex
+++ b/lib/pleroma/workers/subscriber_worker.ex
@@ -12,6 +12,8 @@ defmodule Pleroma.Workers.SubscriberWorker do
queue: "federator_outgoing",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
@impl Oban.Worker
def perform(%{"op" => "refresh_subscriptions"}, _job) do
Federator.perform(:refresh_subscriptions)
diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex
index 6f5c1a2f2..6fecc2bf9 100644
--- a/lib/pleroma/workers/transmogrifier_worker.ex
+++ b/lib/pleroma/workers/transmogrifier_worker.ex
@@ -10,6 +10,8 @@ defmodule Pleroma.Workers.TransmogrifierWorker do
queue: "transmogrifier",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
+
@impl Oban.Worker
def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
user = User.get_cached_by_id(user_id)
diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex
index 2b1d3b99a..4c2591a5c 100644
--- a/lib/pleroma/workers/web_pusher_worker.ex
+++ b/lib/pleroma/workers/web_pusher_worker.ex
@@ -11,6 +11,8 @@ defmodule Pleroma.Workers.WebPusherWorker do
queue: "web_push",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "web_push"
+
@impl Oban.Worker
def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
notification = Repo.get(Notification, notification_id)
diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex
index f9ed2e64d..b12f198d4 100644
--- a/lib/pleroma/workers/worker_helper.ex
+++ b/lib/pleroma/workers/worker_helper.ex
@@ -4,6 +4,7 @@
defmodule Pleroma.Workers.WorkerHelper do
alias Pleroma.Config
+ alias Pleroma.Workers.WorkerHelper
def worker_args(queue) do
case Config.get([:workers, :retries, queue]) do
@@ -20,4 +21,21 @@ defmodule Pleroma.Workers.WorkerHelper do
trunc(backoff)
end
+
+ defmacro __using__(opts) do
+ caller_module = __CALLER__.module
+ queue = Keyword.fetch!(opts, :queue)
+
+ quote do
+ def enqueue(op, params, worker_args \\ []) do
+ params = Map.merge(%{"op" => op}, params)
+ queue_atom = String.to_atom(unquote(queue))
+ worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
+
+ unquote(caller_module)
+ |> apply(:new, [params, worker_args])
+ |> Pleroma.Repo.insert()
+ end
+ end
+ end
end