aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/activity_expiration_worker.ex13
-rw-r--r--lib/pleroma/digest_email_worker.ex10
-rw-r--r--lib/pleroma/emails/mailer.ex7
-rw-r--r--lib/pleroma/scheduled_activity_worker.ex10
-rw-r--r--lib/pleroma/user.ex28
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub.ex6
-rw-r--r--lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex11
-rw-r--r--lib/pleroma/web/activity_pub/transmogrifier.ex6
-rw-r--r--lib/pleroma/web/federator/federator.ex26
-rw-r--r--lib/pleroma/web/federator/publisher.ex9
-rw-r--r--lib/pleroma/web/oauth/token/clean_worker.ex7
-rw-r--r--lib/pleroma/web/push/push.ex7
-rw-r--r--lib/pleroma/workers/activity_expiration_worker.ex2
-rw-r--r--lib/pleroma/workers/background_worker.ex2
-rw-r--r--lib/pleroma/workers/digest_emails_worker.ex21
-rw-r--r--lib/pleroma/workers/mailer_worker.ex10
-rw-r--r--lib/pleroma/workers/publisher_worker.ex2
-rw-r--r--lib/pleroma/workers/receiver_worker.ex2
-rw-r--r--lib/pleroma/workers/scheduled_activity_worker.ex2
-rw-r--r--lib/pleroma/workers/subscriber_worker.ex2
-rw-r--r--lib/pleroma/workers/transmogrifier_worker.ex2
-rw-r--r--lib/pleroma/workers/web_pusher_worker.ex2
-rw-r--r--lib/pleroma/workers/worker_helper.ex18
23 files changed, 92 insertions, 113 deletions
diff --git a/lib/pleroma/activity_expiration_worker.ex b/lib/pleroma/activity_expiration_worker.ex
index 7aba7eece..c0820c202 100644
--- a/lib/pleroma/activity_expiration_worker.ex
+++ b/lib/pleroma/activity_expiration_worker.ex
@@ -9,14 +9,11 @@ defmodule Pleroma.ActivityExpirationWorker do
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.CommonAPI
- alias Pleroma.Workers.ActivityExpirationWorker
require Logger
use GenServer
import Ecto.Query
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
@schedule_interval :timer.minutes(1)
def start_link(_) do
@@ -53,12 +50,10 @@ defmodule Pleroma.ActivityExpirationWorker do
def handle_info(:perform, state) do
ActivityExpiration.due_expirations(@schedule_interval)
|> Enum.each(fn expiration ->
- %{
- "op" => "activity_expiration",
- "activity_expiration_id" => expiration.id
- }
- |> ActivityExpirationWorker.new(worker_args(:activity_expiration))
- |> Repo.insert()
+ Pleroma.Workers.ActivityExpirationWorker.enqueue(
+ "activity_expiration",
+ %{"activity_expiration_id" => expiration.id}
+ )
end)
schedule_next()
diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/digest_email_worker.ex
index 4ab2a4ef4..5be7cf26b 100644
--- a/lib/pleroma/digest_email_worker.ex
+++ b/lib/pleroma/digest_email_worker.ex
@@ -4,12 +4,10 @@
defmodule Pleroma.DigestEmailWorker do
alias Pleroma.Repo
- alias Pleroma.Workers.MailerWorker
+ alias Pleroma.Workers.DigestEmailsWorker
import Ecto.Query
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
def perform do
config = Pleroma.Config.get([:email_notifications, :digest])
negative_interval = -Map.fetch!(config, :interval)
@@ -23,11 +21,9 @@ defmodule Pleroma.DigestEmailWorker do
where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
select: u
)
- |> Pleroma.Repo.all()
+ |> Repo.all()
|> Enum.each(fn user ->
- %{"op" => "digest_email", "user_id" => user.id}
- |> MailerWorker.new([queue: "digest_emails"] ++ worker_args(:digest_emails))
- |> Repo.insert()
+ DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id})
end)
end
diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex
index 9cbe7313c..eb96f2e8b 100644
--- a/lib/pleroma/emails/mailer.ex
+++ b/lib/pleroma/emails/mailer.ex
@@ -9,7 +9,6 @@ defmodule Pleroma.Emails.Mailer do
The module contains functions to delivery email using Swoosh.Mailer.
"""
- alias Pleroma.Repo
alias Pleroma.Workers.MailerWorker
alias Swoosh.DeliveryError
@@ -19,8 +18,6 @@ defmodule Pleroma.Emails.Mailer do
@spec enabled?() :: boolean()
def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled])
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
@doc "add email to queue"
def deliver_async(email, config \\ []) do
encoded_email =
@@ -28,9 +25,7 @@ defmodule Pleroma.Emails.Mailer do
|> :erlang.term_to_binary()
|> Base.encode64()
- %{"op" => "email", "encoded_email" => encoded_email, "config" => config}
- |> MailerWorker.new(worker_args(:mailer))
- |> Repo.insert()
+ MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config})
end
@doc "callback to perform send email from queue"
diff --git a/lib/pleroma/scheduled_activity_worker.ex b/lib/pleroma/scheduled_activity_worker.ex
index 8bf534f42..c41a542de 100644
--- a/lib/pleroma/scheduled_activity_worker.ex
+++ b/lib/pleroma/scheduled_activity_worker.ex
@@ -8,7 +8,6 @@ defmodule Pleroma.ScheduledActivityWorker do
"""
alias Pleroma.Config
- alias Pleroma.Repo
alias Pleroma.ScheduledActivity
alias Pleroma.User
alias Pleroma.Web.CommonAPI
@@ -18,8 +17,6 @@ defmodule Pleroma.ScheduledActivityWorker do
@schedule_interval :timer.minutes(1)
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
def start_link(_) do
GenServer.start_link(__MODULE__, nil)
end
@@ -49,9 +46,10 @@ defmodule Pleroma.ScheduledActivityWorker do
def handle_info(:perform, state) do
ScheduledActivity.due_activities(@schedule_interval)
|> Enum.each(fn scheduled_activity ->
- %{"op" => "execute", "activity_id" => scheduled_activity.id}
- |> Pleroma.Workers.ScheduledActivityWorker.new(worker_args(:scheduled_activities))
- |> Repo.insert()
+ Pleroma.Workers.ScheduledActivityWorker.enqueue(
+ "execute",
+ %{"activity_id" => scheduled_activity.id}
+ )
end)
schedule_next()
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index abfa063fb..2fe7e1748 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -41,8 +41,6 @@ defmodule Pleroma.User do
@strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/
@extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
schema "users" do
field(:bio, :string)
field(:email, :string)
@@ -623,9 +621,7 @@ defmodule Pleroma.User do
@doc "Fetch some posts when the user has just been federated with"
def fetch_initial_posts(user) do
- %{"op" => "fetch_initial_posts", "user_id" => user.id}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id})
end
@spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
@@ -1056,9 +1052,7 @@ defmodule Pleroma.User do
end
def deactivate_async(user, status \\ true) do
- %{"op" => "deactivate_user", "user_id" => user.id, "status" => status}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status})
end
def deactivate(%User{} = user, status \\ true) do
@@ -1087,9 +1081,7 @@ defmodule Pleroma.User do
end
def delete(%User{} = user) do
- %{"op" => "delete_user", "user_id" => user.id}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id})
end
@spec perform(atom(), User.t()) :: {:ok, User.t()}
@@ -1198,24 +1190,18 @@ defmodule Pleroma.User do
end
def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
- %{
- "op" => "blocks_import",
+ BackgroundWorker.enqueue("blocks_import", %{
"blocker_id" => blocker.id,
"blocked_identifiers" => blocked_identifiers
- }
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ })
end
def follow_import(%User{} = follower, followed_identifiers)
when is_list(followed_identifiers) do
- %{
- "op" => "follow_import",
+ BackgroundWorker.enqueue("follow_import", %{
"follower_id" => follower.id,
"followed_identifiers" => followed_identifiers
- }
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ })
end
def delete_user_activities(%User{ap_id: ap_id} = user) do
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 74c5eb91c..90b409606 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -26,8 +26,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
require Logger
require Pleroma.Constants
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do
@@ -148,9 +146,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
activity
end
- %{"op" => "fetch_data_for_activity", "activity_id" => activity.id}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
Notification.create_notifications(activity)
diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
index 178321558..26b8539fe 100644
--- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
+++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
@@ -7,7 +7,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
@behaviour Pleroma.Web.ActivityPub.MRF
alias Pleroma.HTTP
- alias Pleroma.Repo
alias Pleroma.Web.MediaProxy
alias Pleroma.Workers.BackgroundWorker
@@ -18,8 +17,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
recv_timeout: 10_000
]
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
def perform(:prefetch, url) do
Logger.info("Prefetching #{inspect(url)}")
@@ -34,9 +31,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
url
|> Enum.each(fn
%{"href" => href} ->
- %{"op" => "media_proxy_prefetch", "url" => href}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href})
x ->
Logger.debug("Unhandled attachment URL object #{inspect(x)}")
@@ -52,9 +47,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
%{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
)
when is_list(attachments) and length(attachments) > 0 do
- %{"op" => "media_proxy_preload", "message" => message}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message})
{:ok, message}
end
diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex
index 9437f9a16..f27455e8b 100644
--- a/lib/pleroma/web/activity_pub/transmogrifier.ex
+++ b/lib/pleroma/web/activity_pub/transmogrifier.ex
@@ -22,8 +22,6 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
require Logger
require Pleroma.Constants
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
@doc """
Modifies an incoming AP object (mastodon format) to our internal format.
"""
@@ -1054,9 +1052,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
already_ap <- User.ap_enabled?(user),
{:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do
unless already_ap do
- %{"op" => "user_upgrade", "user_id" => user.id}
- |> TransmogrifierWorker.new(worker_args(:transmogrifier))
- |> Repo.insert()
+ TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
end
{:ok, user}
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex
index 8f43066e3..1a2da014a 100644
--- a/lib/pleroma/web/federator/federator.ex
+++ b/lib/pleroma/web/federator/federator.ex
@@ -18,8 +18,6 @@ defmodule Pleroma.Web.Federator do
require Logger
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
def init do
# To do: consider removing this call in favor of scheduled execution (`quantum`-based)
refresh_subscriptions(schedule_in: 60)
@@ -40,15 +38,11 @@ defmodule Pleroma.Web.Federator do
# Client API
def incoming_doc(doc) do
- %{"op" => "incoming_doc", "body" => doc}
- |> ReceiverWorker.new(worker_args(:federator_incoming))
- |> Pleroma.Repo.insert()
+ ReceiverWorker.enqueue("incoming_doc", %{"body" => doc})
end
def incoming_ap_doc(params) do
- %{"op" => "incoming_ap_doc", "params" => params}
- |> ReceiverWorker.new(worker_args(:federator_incoming))
- |> Pleroma.Repo.insert()
+ ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
end
def publish(%{id: "pleroma:fakeid"} = activity) do
@@ -56,27 +50,19 @@ defmodule Pleroma.Web.Federator do
end
def publish(activity) do
- %{"op" => "publish", "activity_id" => activity.id}
- |> PublisherWorker.new(worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
+ PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
end
def verify_websub(websub) do
- %{"op" => "verify_websub", "websub_id" => websub.id}
- |> SubscriberWorker.new(worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
+ SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id})
end
def request_subscription(websub) do
- %{"op" => "request_subscription", "websub_id" => websub.id}
- |> SubscriberWorker.new(worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
+ SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id})
end
def refresh_subscriptions(worker_args \\ []) do
- %{"op" => "refresh_subscriptions"}
- |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
- |> Pleroma.Repo.insert()
+ SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1])
end
# Job Worker Callbacks
diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex
index 42be109ab..937064638 100644
--- a/lib/pleroma/web/federator/publisher.ex
+++ b/lib/pleroma/web/federator/publisher.ex
@@ -31,11 +31,10 @@ defmodule Pleroma.Web.Federator.Publisher do
"""
@spec enqueue_one(module(), Map.t()) :: :ok
def enqueue_one(module, %{} = params) do
- worker_args = Pleroma.Workers.WorkerHelper.worker_args(:federator_outgoing)
-
- %{"op" => "publish_one", "module" => to_string(module), "params" => params}
- |> PublisherWorker.new(worker_args)
- |> Pleroma.Repo.insert()
+ PublisherWorker.enqueue(
+ "publish_one",
+ %{"module" => to_string(module), "params" => params}
+ )
end
@doc """
diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex
index b150a68a7..eb94bf86f 100644
--- a/lib/pleroma/web/oauth/token/clean_worker.ex
+++ b/lib/pleroma/web/oauth/token/clean_worker.ex
@@ -16,12 +16,9 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
@one_day
)
- alias Pleroma.Repo
alias Pleroma.Web.OAuth.Token
alias Pleroma.Workers.BackgroundWorker
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
def start_link(_), do: GenServer.start_link(__MODULE__, %{})
def init(_) do
@@ -31,9 +28,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
@doc false
def handle_info(:perform, state) do
- %{"op" => "clean_expired_tokens"}
- |> BackgroundWorker.new(worker_args(:background))
- |> Repo.insert()
+ BackgroundWorker.enqueue("clean_expired_tokens", %{})
Process.send_after(self(), :perform, @interval)
{:noreply, state}
diff --git a/lib/pleroma/web/push/push.ex b/lib/pleroma/web/push/push.ex
index 4973b529c..7ef1532ac 100644
--- a/lib/pleroma/web/push/push.ex
+++ b/lib/pleroma/web/push/push.ex
@@ -3,13 +3,10 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Push do
- alias Pleroma.Repo
alias Pleroma.Workers.WebPusherWorker
require Logger
- import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
-
def init do
unless enabled() do
Logger.warn("""
@@ -35,8 +32,6 @@ defmodule Pleroma.Web.Push do
end
def send(notification) do
- %{"op" => "web_push", "notification_id" => notification.id}
- |> WebPusherWorker.new(worker_args(:web_push))
- |> Repo.insert()
+ WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id})
end
end
diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex
index 0b491eabb..60dd3feba 100644
--- a/lib/pleroma/workers/activity_expiration_worker.ex
+++ b/lib/pleroma/workers/activity_expiration_worker.ex
@@ -8,6 +8,8 @@ defmodule Pleroma.Workers.ActivityExpirationWorker do
queue: "activity_expiration",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
+
@impl Oban.Worker
def perform(
%{
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
index 7b5575a5f..b9aef3a92 100644
--- a/lib/pleroma/workers/background_worker.ex
+++ b/lib/pleroma/workers/background_worker.ex
@@ -13,6 +13,8 @@ defmodule Pleroma.Workers.BackgroundWorker do
queue: "background",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "background"
+
@impl Oban.Worker
def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
user = User.get_cached_by_id(user_id)
diff --git a/lib/pleroma/workers/digest_emails_worker.ex b/lib/pleroma/workers/digest_emails_worker.ex
new file mode 100644
index 000000000..ca073ce67
--- /dev/null
+++ b/lib/pleroma/workers/digest_emails_worker.ex
@@ -0,0 +1,21 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.DigestEmailsWorker do
+ alias Pleroma.User
+
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
+ use Oban.Worker,
+ queue: "digest_emails",
+ max_attempts: 1
+
+ use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
+ user_id
+ |> User.get_cached_by_id()
+ |> Pleroma.DigestEmailWorker.perform()
+ end
+end
diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex
index 4f73d61bc..a4bd54a6c 100644
--- a/lib/pleroma/workers/mailer_worker.ex
+++ b/lib/pleroma/workers/mailer_worker.ex
@@ -3,13 +3,13 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.MailerWorker do
- alias Pleroma.User
-
# Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: "mailer",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "mailer"
+
@impl Oban.Worker
def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
encoded_email
@@ -17,10 +17,4 @@ defmodule Pleroma.Workers.MailerWorker do
|> :erlang.binary_to_term()
|> Pleroma.Emails.Mailer.deliver(config)
end
-
- def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
- user_id
- |> User.get_cached_by_id()
- |> Pleroma.DigestEmailWorker.perform()
- end
end
diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex
index 5671d2a29..a3ac22635 100644
--- a/lib/pleroma/workers/publisher_worker.ex
+++ b/lib/pleroma/workers/publisher_worker.ex
@@ -11,6 +11,8 @@ defmodule Pleroma.Workers.PublisherWorker do
queue: "federator_outgoing",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
def backoff(attempt) when is_integer(attempt) do
Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
end
diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex
index cdce630f2..3cc415ce4 100644
--- a/lib/pleroma/workers/receiver_worker.ex
+++ b/lib/pleroma/workers/receiver_worker.ex
@@ -10,6 +10,8 @@ defmodule Pleroma.Workers.ReceiverWorker do
queue: "federator_incoming",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
+
@impl Oban.Worker
def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do
Federator.perform(:incoming_doc, doc)
diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex
index 4094411ae..936bb64d3 100644
--- a/lib/pleroma/workers/scheduled_activity_worker.ex
+++ b/lib/pleroma/workers/scheduled_activity_worker.ex
@@ -8,6 +8,8 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do
queue: "scheduled_activities",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
+
@impl Oban.Worker
def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
Pleroma.ScheduledActivityWorker.perform(:execute, activity_id)
diff --git a/lib/pleroma/workers/subscriber_worker.ex b/lib/pleroma/workers/subscriber_worker.ex
index 22d1dc956..4fb994554 100644
--- a/lib/pleroma/workers/subscriber_worker.ex
+++ b/lib/pleroma/workers/subscriber_worker.ex
@@ -12,6 +12,8 @@ defmodule Pleroma.Workers.SubscriberWorker do
queue: "federator_outgoing",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
@impl Oban.Worker
def perform(%{"op" => "refresh_subscriptions"}, _job) do
Federator.perform(:refresh_subscriptions)
diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex
index 6f5c1a2f2..6fecc2bf9 100644
--- a/lib/pleroma/workers/transmogrifier_worker.ex
+++ b/lib/pleroma/workers/transmogrifier_worker.ex
@@ -10,6 +10,8 @@ defmodule Pleroma.Workers.TransmogrifierWorker do
queue: "transmogrifier",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
+
@impl Oban.Worker
def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
user = User.get_cached_by_id(user_id)
diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex
index 2b1d3b99a..4c2591a5c 100644
--- a/lib/pleroma/workers/web_pusher_worker.ex
+++ b/lib/pleroma/workers/web_pusher_worker.ex
@@ -11,6 +11,8 @@ defmodule Pleroma.Workers.WebPusherWorker do
queue: "web_push",
max_attempts: 1
+ use Pleroma.Workers.WorkerHelper, queue: "web_push"
+
@impl Oban.Worker
def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
notification = Repo.get(Notification, notification_id)
diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex
index f9ed2e64d..b12f198d4 100644
--- a/lib/pleroma/workers/worker_helper.ex
+++ b/lib/pleroma/workers/worker_helper.ex
@@ -4,6 +4,7 @@
defmodule Pleroma.Workers.WorkerHelper do
alias Pleroma.Config
+ alias Pleroma.Workers.WorkerHelper
def worker_args(queue) do
case Config.get([:workers, :retries, queue]) do
@@ -20,4 +21,21 @@ defmodule Pleroma.Workers.WorkerHelper do
trunc(backoff)
end
+
+ defmacro __using__(opts) do
+ caller_module = __CALLER__.module
+ queue = Keyword.fetch!(opts, :queue)
+
+ quote do
+ def enqueue(op, params, worker_args \\ []) do
+ params = Map.merge(%{"op" => op}, params)
+ queue_atom = String.to_atom(unquote(queue))
+ worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
+
+ unquote(caller_module)
+ |> apply(:new, [params, worker_args])
+ |> Pleroma.Repo.insert()
+ end
+ end
+ end
end