aboutsummaryrefslogtreecommitdiff
path: root/lib/pleroma/workers
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pleroma/workers')
-rw-r--r--lib/pleroma/workers/attachments_cleanup_worker.ex26
-rw-r--r--lib/pleroma/workers/background_worker.ex50
-rw-r--r--lib/pleroma/workers/backup_worker.ex54
-rw-r--r--lib/pleroma/workers/cron/clear_oauth_token_worker.ex23
-rw-r--r--lib/pleroma/workers/cron/digest_emails_worker.ex2
-rw-r--r--lib/pleroma/workers/cron/new_users_digest_worker.ex2
-rw-r--r--lib/pleroma/workers/cron/purge_expired_activities_worker.ex48
-rw-r--r--lib/pleroma/workers/cron/stats_worker.ex17
-rw-r--r--lib/pleroma/workers/mailer_worker.ex2
-rw-r--r--lib/pleroma/workers/mute_expire_worker.ex20
-rw-r--r--lib/pleroma/workers/poll_worker.ex45
-rw-r--r--lib/pleroma/workers/publisher_worker.ex2
-rw-r--r--lib/pleroma/workers/purge_expired_activity.ex72
-rw-r--r--lib/pleroma/workers/purge_expired_filter.ex43
-rw-r--r--lib/pleroma/workers/purge_expired_token.ex29
-rw-r--r--lib/pleroma/workers/receiver_worker.ex2
-rw-r--r--lib/pleroma/workers/remote_fetcher_worker.ex2
-rw-r--r--lib/pleroma/workers/scheduled_activity_worker.ex56
-rw-r--r--lib/pleroma/workers/transmogrifier_worker.ex2
-rw-r--r--lib/pleroma/workers/web_pusher_worker.ex2
-rw-r--r--lib/pleroma/workers/worker_helper.ex2
21 files changed, 329 insertions, 172 deletions
diff --git a/lib/pleroma/workers/attachments_cleanup_worker.ex b/lib/pleroma/workers/attachments_cleanup_worker.ex
index 58226b395..f5090dae7 100644
--- a/lib/pleroma/workers/attachments_cleanup_worker.ex
+++ b/lib/pleroma/workers/attachments_cleanup_worker.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.AttachmentsCleanupWorker do
@@ -17,12 +17,14 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
"object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}}
}
}) do
- attachments
- |> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end)
- |> fetch_objects
- |> prepare_objects(actor, Enum.map(attachments, & &1["name"]))
- |> filter_objects
- |> do_clean
+ if Pleroma.Config.get([:instance, :cleanup_attachments], false) do
+ attachments
+ |> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end)
+ |> fetch_objects
+ |> prepare_objects(actor, Enum.map(attachments, & &1["name"]))
+ |> filter_objects
+ |> do_clean
+ end
{:ok, :success}
end
@@ -32,21 +34,15 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
defp do_clean({object_ids, attachment_urls}) do
uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
- prefix =
- case Pleroma.Config.get([Pleroma.Upload, :base_url]) do
- nil -> "media"
- _ -> ""
- end
-
base_url =
String.trim_trailing(
- Pleroma.Config.get([Pleroma.Upload, :base_url], Pleroma.Web.base_url()),
+ Pleroma.Upload.base_url(),
"/"
)
Enum.each(attachment_urls, fn href ->
href
- |> String.trim_leading("#{base_url}/#{prefix}")
+ |> String.trim_leading("#{base_url}")
|> uploader.delete_file()
end)
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
index cec5a7462..4db077232 100644
--- a/lib/pleroma/workers/background_worker.ex
+++ b/lib/pleroma/workers/background_worker.ex
@@ -1,19 +1,18 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.BackgroundWorker do
- alias Pleroma.Activity
+ alias Pleroma.Instances.Instance
alias Pleroma.User
- alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
use Pleroma.Workers.WorkerHelper, queue: "background"
@impl Oban.Worker
- def perform(%Job{args: %{"op" => "deactivate_user", "user_id" => user_id, "status" => status}}) do
+ def perform(%Job{args: %{"op" => "user_activation", "user_id" => user_id, "status" => status}}) do
user = User.get_cached_by_id(user_id)
- User.perform(:deactivate_async, user, status)
+ User.perform(:set_activation_async, user, status)
end
def perform(%Job{args: %{"op" => "delete_user", "user_id" => user_id}}) do
@@ -26,39 +25,10 @@ defmodule Pleroma.Workers.BackgroundWorker do
User.perform(:force_password_reset, user)
end
- def perform(%Job{
- args: %{
- "op" => "blocks_import",
- "blocker_id" => blocker_id,
- "blocked_identifiers" => blocked_identifiers
- }
- }) do
- blocker = User.get_cached_by_id(blocker_id)
- {:ok, User.perform(:blocks_import, blocker, blocked_identifiers)}
- end
-
- def perform(%Job{
- args: %{
- "op" => "follow_import",
- "follower_id" => follower_id,
- "followed_identifiers" => followed_identifiers
- }
- }) do
- follower = User.get_cached_by_id(follower_id)
- {:ok, User.perform(:follow_import, follower, followed_identifiers)}
- end
-
- def perform(%Job{args: %{"op" => "media_proxy_preload", "message" => message}}) do
- MediaProxyWarmingPolicy.perform(:preload, message)
- end
-
- def perform(%Job{args: %{"op" => "media_proxy_prefetch", "url" => url}}) do
- MediaProxyWarmingPolicy.perform(:prefetch, url)
- end
-
- def perform(%Job{args: %{"op" => "fetch_data_for_activity", "activity_id" => activity_id}}) do
- activity = Activity.get_by_id(activity_id)
- Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
+ def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifiers" => identifiers}})
+ when op in ["blocks_import", "follow_import", "mutes_import"] do
+ user = User.get_cached_by_id(user_id)
+ {:ok, User.Import.perform(String.to_atom(op), user, identifiers)}
end
def perform(%Job{
@@ -69,4 +39,8 @@ defmodule Pleroma.Workers.BackgroundWorker do
Pleroma.FollowingRelationship.move_following(origin, target)
end
+
+ def perform(%Job{args: %{"op" => "delete_instance", "host" => host}}) do
+ Instance.perform(:delete_instance, host)
+ end
end
diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex
new file mode 100644
index 000000000..9b763b04b
--- /dev/null
+++ b/lib/pleroma/workers/backup_worker.ex
@@ -0,0 +1,54 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.BackupWorker do
+ use Oban.Worker, queue: :backup, max_attempts: 1
+
+ alias Oban.Job
+ alias Pleroma.User.Backup
+
+ def process(backup, admin_user_id \\ nil) do
+ %{"op" => "process", "backup_id" => backup.id, "admin_user_id" => admin_user_id}
+ |> new()
+ |> Oban.insert()
+ end
+
+ def schedule_deletion(backup) do
+ days = Pleroma.Config.get([Backup, :purge_after_days])
+ time = 60 * 60 * 24 * days
+ scheduled_at = Calendar.NaiveDateTime.add!(backup.inserted_at, time)
+
+ %{"op" => "delete", "backup_id" => backup.id}
+ |> new(scheduled_at: scheduled_at)
+ |> Oban.insert()
+ end
+
+ def delete(backup) do
+ %{"op" => "delete", "backup_id" => backup.id}
+ |> new()
+ |> Oban.insert()
+ end
+
+ def perform(%Job{
+ args: %{"op" => "process", "backup_id" => backup_id, "admin_user_id" => admin_user_id}
+ }) do
+ with {:ok, %Backup{} = backup} <-
+ backup_id |> Backup.get() |> Backup.process(),
+ {:ok, _job} <- schedule_deletion(backup),
+ :ok <- Backup.remove_outdated(backup),
+ {:ok, _} <-
+ backup
+ |> Pleroma.Emails.UserEmail.backup_is_ready_email(admin_user_id)
+ |> Pleroma.Emails.Mailer.deliver() do
+ {:ok, backup}
+ end
+ end
+
+ def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do
+ case Backup.get(backup_id) do
+ %Backup{} = backup -> Backup.delete(backup)
+ nil -> :ok
+ end
+ end
+end
diff --git a/lib/pleroma/workers/cron/clear_oauth_token_worker.ex b/lib/pleroma/workers/cron/clear_oauth_token_worker.ex
deleted file mode 100644
index 276f47efc..000000000
--- a/lib/pleroma/workers/cron/clear_oauth_token_worker.ex
+++ /dev/null
@@ -1,23 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.Cron.ClearOauthTokenWorker do
- @moduledoc """
- The worker to cleanup expired oAuth tokens.
- """
-
- use Oban.Worker, queue: "background"
-
- alias Pleroma.Config
- alias Pleroma.Web.OAuth.Token
-
- @impl Oban.Worker
- def perform(_job) do
- if Config.get([:oauth2, :clean_expired_tokens], false) do
- Token.delete_expired_tokens()
- end
-
- :ok
- end
-end
diff --git a/lib/pleroma/workers/cron/digest_emails_worker.ex b/lib/pleroma/workers/cron/digest_emails_worker.ex
index 0c56f00fb..83dc75d60 100644
--- a/lib/pleroma/workers/cron/digest_emails_worker.ex
+++ b/lib/pleroma/workers/cron/digest_emails_worker.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Cron.DigestEmailsWorker do
diff --git a/lib/pleroma/workers/cron/new_users_digest_worker.ex b/lib/pleroma/workers/cron/new_users_digest_worker.ex
index 8bbaed83d..9dfd92228 100644
--- a/lib/pleroma/workers/cron/new_users_digest_worker.ex
+++ b/lib/pleroma/workers/cron/new_users_digest_worker.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do
diff --git a/lib/pleroma/workers/cron/purge_expired_activities_worker.ex b/lib/pleroma/workers/cron/purge_expired_activities_worker.ex
deleted file mode 100644
index 6549207fc..000000000
--- a/lib/pleroma/workers/cron/purge_expired_activities_worker.ex
+++ /dev/null
@@ -1,48 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker do
- @moduledoc """
- The worker to purge expired activities.
- """
-
- use Oban.Worker, queue: "activity_expiration"
-
- alias Pleroma.Activity
- alias Pleroma.ActivityExpiration
- alias Pleroma.Config
- alias Pleroma.User
- alias Pleroma.Web.CommonAPI
-
- require Logger
-
- @interval :timer.minutes(1)
-
- @impl Oban.Worker
- def perform(_job) do
- if Config.get([ActivityExpiration, :enabled]) do
- Enum.each(ActivityExpiration.due_expirations(@interval), &delete_activity/1)
- end
- after
- :ok
- end
-
- def delete_activity(%ActivityExpiration{activity_id: activity_id}) do
- with {:activity, %Activity{} = activity} <-
- {:activity, Activity.get_by_id_with_object(activity_id)},
- {:user, %User{} = user} <- {:user, User.get_by_ap_id(activity.object.data["actor"])} do
- CommonAPI.delete(activity.id, user)
- else
- {:activity, _} ->
- Logger.error(
- "#{__MODULE__} Couldn't delete expired activity: not found activity ##{activity_id}"
- )
-
- {:user, _} ->
- Logger.error(
- "#{__MODULE__} Couldn't delete expired activity: not found actor of ##{activity_id}"
- )
- end
- end
-end
diff --git a/lib/pleroma/workers/cron/stats_worker.ex b/lib/pleroma/workers/cron/stats_worker.ex
deleted file mode 100644
index 6a79540bc..000000000
--- a/lib/pleroma/workers/cron/stats_worker.ex
+++ /dev/null
@@ -1,17 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.Cron.StatsWorker do
- @moduledoc """
- The worker to update peers statistics.
- """
-
- use Oban.Worker, queue: "background"
-
- @impl Oban.Worker
- def perform(_job) do
- Pleroma.Stats.do_collect()
- :ok
- end
-end
diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex
index 32273cfa5..592230e7a 100644
--- a/lib/pleroma/workers/mailer_worker.ex
+++ b/lib/pleroma/workers/mailer_worker.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.MailerWorker do
diff --git a/lib/pleroma/workers/mute_expire_worker.ex b/lib/pleroma/workers/mute_expire_worker.ex
new file mode 100644
index 000000000..8da903e76
--- /dev/null
+++ b/lib/pleroma/workers/mute_expire_worker.ex
@@ -0,0 +1,20 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.MuteExpireWorker do
+ use Pleroma.Workers.WorkerHelper, queue: "mute_expire"
+
+ @impl Oban.Worker
+ def perform(%Job{args: %{"op" => "unmute_user", "muter_id" => muter_id, "mutee_id" => mutee_id}}) do
+ Pleroma.User.unmute(muter_id, mutee_id)
+ :ok
+ end
+
+ def perform(%Job{
+ args: %{"op" => "unmute_conversation", "user_id" => user_id, "activity_id" => activity_id}
+ }) do
+ Pleroma.Web.CommonAPI.remove_mute(user_id, activity_id)
+ :ok
+ end
+end
diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex
new file mode 100644
index 000000000..3423cc889
--- /dev/null
+++ b/lib/pleroma/workers/poll_worker.ex
@@ -0,0 +1,45 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PollWorker do
+ @moduledoc """
+ Generates notifications when a poll ends.
+ """
+ use Pleroma.Workers.WorkerHelper, queue: "poll_notifications"
+
+ alias Pleroma.Activity
+ alias Pleroma.Notification
+ alias Pleroma.Object
+
+ @impl Oban.Worker
+ def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do
+ with %Activity{} = activity <- find_poll_activity(activity_id) do
+ Notification.create_poll_notifications(activity)
+ end
+ end
+
+ defp find_poll_activity(activity_id) do
+ with nil <- Activity.get_by_id(activity_id) do
+ {:error, :poll_activity_not_found}
+ end
+ end
+
+ def schedule_poll_end(%Activity{data: %{"type" => "Create"}, id: activity_id} = activity) do
+ with %Object{data: %{"type" => "Question", "closed" => closed}} when is_binary(closed) <-
+ Object.normalize(activity),
+ {:ok, end_time} <- NaiveDateTime.from_iso8601(closed),
+ :gt <- NaiveDateTime.compare(end_time, NaiveDateTime.utc_now()) do
+ %{
+ op: "poll_end",
+ activity_id: activity_id
+ }
+ |> new(scheduled_at: end_time)
+ |> Oban.insert()
+ else
+ _ -> {:error, activity}
+ end
+ end
+
+ def schedule_poll_end(activity), do: {:error, activity}
+end
diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex
index e739c3cd0..6209715b3 100644
--- a/lib/pleroma/workers/publisher_worker.ex
+++ b/lib/pleroma/workers/publisher_worker.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.PublisherWorker do
diff --git a/lib/pleroma/workers/purge_expired_activity.ex b/lib/pleroma/workers/purge_expired_activity.ex
new file mode 100644
index 000000000..027171c1e
--- /dev/null
+++ b/lib/pleroma/workers/purge_expired_activity.ex
@@ -0,0 +1,72 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PurgeExpiredActivity do
+ @moduledoc """
+ Worker which purges expired activity.
+ """
+
+ use Oban.Worker, queue: :activity_expiration, max_attempts: 1, unique: [period: :infinity]
+
+ import Ecto.Query
+
+ alias Pleroma.Activity
+
+ @spec enqueue(map()) ::
+ {:ok, Oban.Job.t()}
+ | {:error, :expired_activities_disabled}
+ | {:error, :expiration_too_close}
+ def enqueue(args) do
+ with true <- enabled?() do
+ {scheduled_at, args} = Map.pop(args, :expires_at)
+
+ args
+ |> new(scheduled_at: scheduled_at)
+ |> Oban.insert()
+ end
+ end
+
+ @impl true
+ def perform(%Oban.Job{args: %{"activity_id" => id}}) do
+ with %Activity{} = activity <- find_activity(id),
+ %Pleroma.User{} = user <- find_user(activity.object.data["actor"]) do
+ Pleroma.Web.CommonAPI.delete(activity.id, user)
+ end
+ end
+
+ defp enabled? do
+ with false <- Pleroma.Config.get([__MODULE__, :enabled], false) do
+ {:error, :expired_activities_disabled}
+ end
+ end
+
+ defp find_activity(id) do
+ with nil <- Activity.get_by_id_with_object(id) do
+ {:error, :activity_not_found}
+ end
+ end
+
+ defp find_user(ap_id) do
+ with nil <- Pleroma.User.get_by_ap_id(ap_id) do
+ {:error, :user_not_found}
+ end
+ end
+
+ def get_expiration(id) do
+ from(j in Oban.Job,
+ where: j.state == "scheduled",
+ where: j.queue == "activity_expiration",
+ where: fragment("?->>'activity_id' = ?", j.args, ^id)
+ )
+ |> Pleroma.Repo.one()
+ end
+
+ @spec expires_late_enough?(DateTime.t()) :: boolean()
+ def expires_late_enough?(scheduled_at) do
+ now = DateTime.utc_now()
+ diff = DateTime.diff(scheduled_at, now, :millisecond)
+ min_lifetime = Pleroma.Config.get([__MODULE__, :min_lifetime], 600)
+ diff > :timer.seconds(min_lifetime)
+ end
+end
diff --git a/lib/pleroma/workers/purge_expired_filter.ex b/lib/pleroma/workers/purge_expired_filter.ex
new file mode 100644
index 000000000..4740d52e9
--- /dev/null
+++ b/lib/pleroma/workers/purge_expired_filter.ex
@@ -0,0 +1,43 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PurgeExpiredFilter do
+ @moduledoc """
+ Worker which purges expired filters
+ """
+
+ use Oban.Worker, queue: :filter_expiration, max_attempts: 1, unique: [period: :infinity]
+
+ import Ecto.Query
+
+ alias Oban.Job
+ alias Pleroma.Repo
+
+ @spec enqueue(%{filter_id: integer(), expires_at: DateTime.t()}) ::
+ {:ok, Job.t()} | {:error, Ecto.Changeset.t()}
+ def enqueue(args) do
+ {scheduled_at, args} = Map.pop(args, :expires_at)
+
+ args
+ |> new(scheduled_at: scheduled_at)
+ |> Oban.insert()
+ end
+
+ @impl true
+ def perform(%Job{args: %{"filter_id" => id}}) do
+ Pleroma.Filter
+ |> Repo.get(id)
+ |> Repo.delete()
+ end
+
+ @spec get_expiration(pos_integer()) :: Job.t() | nil
+ def get_expiration(id) do
+ from(j in Job,
+ where: j.state == "scheduled",
+ where: j.queue == "filter_expiration",
+ where: fragment("?->'filter_id' = ?", j.args, ^id)
+ )
+ |> Repo.one()
+ end
+end
diff --git a/lib/pleroma/workers/purge_expired_token.ex b/lib/pleroma/workers/purge_expired_token.ex
new file mode 100644
index 000000000..cfdf5c6dc
--- /dev/null
+++ b/lib/pleroma/workers/purge_expired_token.ex
@@ -0,0 +1,29 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PurgeExpiredToken do
+ @moduledoc """
+ Worker which purges expired OAuth tokens
+ """
+
+ use Oban.Worker, queue: :token_expiration, max_attempts: 1
+
+ @spec enqueue(%{token_id: integer(), valid_until: DateTime.t(), mod: module()}) ::
+ {:ok, Oban.Job.t()} | {:error, Ecto.Changeset.t()}
+ def enqueue(args) do
+ {scheduled_at, args} = Map.pop(args, :valid_until)
+
+ args
+ |> __MODULE__.new(scheduled_at: scheduled_at)
+ |> Oban.insert()
+ end
+
+ @impl true
+ def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do
+ module
+ |> String.to_existing_atom()
+ |> Pleroma.Repo.get(id)
+ |> Pleroma.Repo.delete()
+ end
+end
diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex
index 1b97af1a8..69125dcd0 100644
--- a/lib/pleroma/workers/receiver_worker.ex
+++ b/lib/pleroma/workers/receiver_worker.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ReceiverWorker do
diff --git a/lib/pleroma/workers/remote_fetcher_worker.ex b/lib/pleroma/workers/remote_fetcher_worker.ex
index 27e2e3386..ad4d785a1 100644
--- a/lib/pleroma/workers/remote_fetcher_worker.ex
+++ b/lib/pleroma/workers/remote_fetcher_worker.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.RemoteFetcherWorker do
diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex
index dd9986fe4..a4ab9928d 100644
--- a/lib/pleroma/workers/scheduled_activity_worker.ex
+++ b/lib/pleroma/workers/scheduled_activity_worker.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ScheduledActivityWorker do
@@ -9,38 +9,50 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do
use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
- alias Pleroma.Config
+ alias Pleroma.Repo
alias Pleroma.ScheduledActivity
alias Pleroma.User
- alias Pleroma.Web.CommonAPI
require Logger
@impl Oban.Worker
def perform(%Job{args: %{"activity_id" => activity_id}}) do
- if Config.get([ScheduledActivity, :enabled]) do
- case Pleroma.Repo.get(ScheduledActivity, activity_id) do
- %ScheduledActivity{} = scheduled_activity ->
- post_activity(scheduled_activity)
-
- _ ->
- Logger.error("#{__MODULE__} Couldn't find scheduled activity: #{activity_id}")
- end
+ with %ScheduledActivity{} = scheduled_activity <- find_scheduled_activity(activity_id),
+ %User{} = user <- find_user(scheduled_activity.user_id) do
+ params = atomize_keys(scheduled_activity.params)
+
+ Repo.transaction(fn ->
+ {:ok, activity} = Pleroma.Web.CommonAPI.post(user, params)
+ {:ok, _} = ScheduledActivity.delete(scheduled_activity)
+ activity
+ end)
+ else
+ {:error, :scheduled_activity_not_found} = error ->
+ Logger.error("#{__MODULE__} Couldn't find scheduled activity: #{activity_id}")
+ error
+
+ {:error, :user_not_found} = error ->
+ Logger.error("#{__MODULE__} Couldn't find user for scheduled activity: #{activity_id}")
+ error
end
end
- defp post_activity(%ScheduledActivity{user_id: user_id, params: params} = scheduled_activity) do
- params = Map.new(params, fn {key, value} -> {String.to_existing_atom(key), value} end)
+ defp find_scheduled_activity(id) do
+ with nil <- Repo.get(ScheduledActivity, id) do
+ {:error, :scheduled_activity_not_found}
+ end
+ end
- with {:delete, {:ok, _}} <- {:delete, ScheduledActivity.delete(scheduled_activity)},
- {:user, %User{} = user} <- {:user, User.get_cached_by_id(user_id)},
- {:post, {:ok, _}} <- {:post, CommonAPI.post(user, params)} do
- :ok
- else
- error ->
- Logger.error(
- "#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}"
- )
+ defp find_user(id) do
+ with nil <- User.get_cached_by_id(id) do
+ {:error, :user_not_found}
end
end
+
+ defp atomize_keys(map) do
+ Map.new(map, fn
+ {key, value} when is_map(value) -> {String.to_existing_atom(key), atomize_keys(value)}
+ {key, value} -> {String.to_existing_atom(key), value}
+ end)
+ end
end
diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex
index 15f36375c..b39c1ea62 100644
--- a/lib/pleroma/workers/transmogrifier_worker.ex
+++ b/lib/pleroma/workers/transmogrifier_worker.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.TransmogrifierWorker do
diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex
index 0cfdc6a6f..8fc2aff26 100644
--- a/lib/pleroma/workers/web_pusher_worker.ex
+++ b/lib/pleroma/workers/web_pusher_worker.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.WebPusherWorker do
diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex
index 7d1289be2..4befbeb3b 100644
--- a/lib/pleroma/workers/worker_helper.ex
+++ b/lib/pleroma/workers/worker_helper.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.WorkerHelper do