aboutsummaryrefslogtreecommitdiff
path: root/lib/pleroma/workers
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pleroma/workers')
-rw-r--r--lib/pleroma/workers/activity_expiration_worker.ex18
-rw-r--r--lib/pleroma/workers/attachments_cleanup_worker.ex15
-rw-r--r--lib/pleroma/workers/background_worker.ex5
-rw-r--r--lib/pleroma/workers/cron/clear_oauth_token_worker.ex21
-rw-r--r--lib/pleroma/workers/cron/digest_emails_worker.ex58
-rw-r--r--lib/pleroma/workers/cron/purge_expired_activities_worker.ex46
-rw-r--r--lib/pleroma/workers/cron/stats_worker.ex16
-rw-r--r--lib/pleroma/workers/digest_emails_worker.ex16
-rw-r--r--lib/pleroma/workers/scheduled_activity_worker.ex36
9 files changed, 186 insertions, 45 deletions
diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex
deleted file mode 100644
index 4e3e4195f..000000000
--- a/lib/pleroma/workers/activity_expiration_worker.ex
+++ /dev/null
@@ -1,18 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.ActivityExpirationWorker do
- use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
-
- @impl Oban.Worker
- def perform(
- %{
- "op" => "activity_expiration",
- "activity_expiration_id" => activity_expiration_id
- },
- _job
- ) do
- Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, activity_expiration_id)
- end
-end
diff --git a/lib/pleroma/workers/attachments_cleanup_worker.ex b/lib/pleroma/workers/attachments_cleanup_worker.ex
index 3f421db40..2cbc6b64d 100644
--- a/lib/pleroma/workers/attachments_cleanup_worker.ex
+++ b/lib/pleroma/workers/attachments_cleanup_worker.ex
@@ -12,7 +12,10 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
@impl Oban.Worker
def perform(
- %{"object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}}},
+ %{
+ "op" => "cleanup_attachments",
+ "object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}}
+ },
_job
) do
hrefs =
@@ -37,7 +40,7 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
)
# The query above can be time consumptive on large instances until we
# refactor how uploads are stored
- |> Repo.all(timout: :infinity)
+ |> Repo.all(timeout: :infinity)
# we should delete 1 object for any given attachment, but don't delete
# files if there are more than 1 object for it
|> Enum.reduce(%{}, fn %{
@@ -70,7 +73,11 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
_ -> ""
end
- base_url = Pleroma.Config.get([__MODULE__, :base_url], Pleroma.Web.base_url())
+ base_url =
+ String.trim_trailing(
+ Pleroma.Config.get([Pleroma.Upload, :base_url], Pleroma.Web.base_url()),
+ "/"
+ )
file_path = String.trim_leading(href, "#{base_url}/#{prefix}")
@@ -84,5 +91,5 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
|> Repo.delete_all()
end
- def perform(%{"object" => _object}, _job), do: :ok
+ def perform(%{"op" => "cleanup_attachments", "object" => _object}, _job), do: :ok
end
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
index 323a4da1e..ac2fe6946 100644
--- a/lib/pleroma/workers/background_worker.ex
+++ b/lib/pleroma/workers/background_worker.ex
@@ -6,7 +6,6 @@ defmodule Pleroma.Workers.BackgroundWorker do
alias Pleroma.Activity
alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
- alias Pleroma.Web.OAuth.Token.CleanWorker
use Pleroma.Workers.WorkerHelper, queue: "background"
@@ -55,10 +54,6 @@ defmodule Pleroma.Workers.BackgroundWorker do
User.perform(:follow_import, follower, followed_identifiers)
end
- def perform(%{"op" => "clean_expired_tokens"}, _job) do
- CleanWorker.perform(:clean)
- end
-
def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do
MediaProxyWarmingPolicy.perform(:preload, message)
end
diff --git a/lib/pleroma/workers/cron/clear_oauth_token_worker.ex b/lib/pleroma/workers/cron/clear_oauth_token_worker.ex
new file mode 100644
index 000000000..a24407874
--- /dev/null
+++ b/lib/pleroma/workers/cron/clear_oauth_token_worker.ex
@@ -0,0 +1,21 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Cron.ClearOauthTokenWorker do
+ @moduledoc """
+ The worker to cleanup expired oAuth tokens.
+ """
+
+ use Oban.Worker, queue: "background"
+
+ alias Pleroma.Config
+ alias Pleroma.Web.OAuth.Token
+
+ @impl Oban.Worker
+ def perform(_opts, _job) do
+ if Config.get([:oauth2, :clean_expired_tokens], false) do
+ Token.delete_expired_tokens()
+ end
+ end
+end
diff --git a/lib/pleroma/workers/cron/digest_emails_worker.ex b/lib/pleroma/workers/cron/digest_emails_worker.ex
new file mode 100644
index 000000000..0a00129df
--- /dev/null
+++ b/lib/pleroma/workers/cron/digest_emails_worker.ex
@@ -0,0 +1,58 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Cron.DigestEmailsWorker do
+ @moduledoc """
+ The worker to send digest emails.
+ """
+
+ use Oban.Worker, queue: "digest_emails"
+
+ alias Pleroma.Config
+ alias Pleroma.Emails
+ alias Pleroma.Repo
+ alias Pleroma.User
+
+ import Ecto.Query
+
+ require Logger
+
+ @impl Oban.Worker
+ def perform(_opts, _job) do
+ config = Config.get([:email_notifications, :digest])
+
+ if config[:active] do
+ negative_interval = -Map.fetch!(config, :interval)
+ inactivity_threshold = Map.fetch!(config, :inactivity_threshold)
+ inactive_users_query = User.list_inactive_users_query(inactivity_threshold)
+
+ now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
+
+ from(u in inactive_users_query,
+ where: fragment(~s(? ->'digest' @> 'true'), u.email_notifications),
+ where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
+ select: u
+ )
+ |> Repo.all()
+ |> send_emails
+ end
+ end
+
+ def send_emails(users) do
+ Enum.each(users, &send_email/1)
+ end
+
+ @doc """
+ Send digest email to the given user.
+ Updates `last_digest_emailed_at` field for the user and returns the updated user.
+ """
+ @spec send_email(User.t()) :: User.t()
+ def send_email(user) do
+ with %Swoosh.Email{} = email <- Emails.UserEmail.digest_email(user) do
+ Emails.Mailer.deliver_async(email)
+ end
+
+ User.touch_last_digest_emailed_at(user)
+ end
+end
diff --git a/lib/pleroma/workers/cron/purge_expired_activities_worker.ex b/lib/pleroma/workers/cron/purge_expired_activities_worker.ex
new file mode 100644
index 000000000..7a52860a9
--- /dev/null
+++ b/lib/pleroma/workers/cron/purge_expired_activities_worker.ex
@@ -0,0 +1,46 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker do
+ @moduledoc """
+ The worker to purge expired activities.
+ """
+
+ use Oban.Worker, queue: "activity_expiration"
+
+ alias Pleroma.Activity
+ alias Pleroma.ActivityExpiration
+ alias Pleroma.Config
+ alias Pleroma.User
+ alias Pleroma.Web.CommonAPI
+
+ require Logger
+
+ @interval :timer.minutes(1)
+
+ @impl Oban.Worker
+ def perform(_opts, _job) do
+ if Config.get([ActivityExpiration, :enabled]) do
+ Enum.each(ActivityExpiration.due_expirations(@interval), &delete_activity/1)
+ end
+ end
+
+ def delete_activity(%ActivityExpiration{activity_id: activity_id}) do
+ with {:activity, %Activity{} = activity} <-
+ {:activity, Activity.get_by_id_with_object(activity_id)},
+ {:user, %User{} = user} <- {:user, User.get_by_ap_id(activity.object.data["actor"])} do
+ CommonAPI.delete(activity.id, user)
+ else
+ {:activity, _} ->
+ Logger.error(
+ "#{__MODULE__} Couldn't delete expired activity: not found activity ##{activity_id}"
+ )
+
+ {:user, _} ->
+ Logger.error(
+ "#{__MODULE__} Couldn't delete expired activity: not found actorof ##{activity_id}"
+ )
+ end
+ end
+end
diff --git a/lib/pleroma/workers/cron/stats_worker.ex b/lib/pleroma/workers/cron/stats_worker.ex
new file mode 100644
index 000000000..425ad41ca
--- /dev/null
+++ b/lib/pleroma/workers/cron/stats_worker.ex
@@ -0,0 +1,16 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Cron.StatsWorker do
+ @moduledoc """
+ The worker to update peers statistics.
+ """
+
+ use Oban.Worker, queue: "background"
+
+ @impl Oban.Worker
+ def perform(_opts, _job) do
+ Pleroma.Stats.do_collect()
+ end
+end
diff --git a/lib/pleroma/workers/digest_emails_worker.ex b/lib/pleroma/workers/digest_emails_worker.ex
deleted file mode 100644
index 3e5a836d0..000000000
--- a/lib/pleroma/workers/digest_emails_worker.ex
+++ /dev/null
@@ -1,16 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.DigestEmailsWorker do
- alias Pleroma.User
-
- use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
-
- @impl Oban.Worker
- def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
- user_id
- |> User.get_cached_by_id()
- |> Pleroma.Daemons.DigestEmailDaemon.perform()
- end
-end
diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex
index ca7d53af1..bd41ab4ce 100644
--- a/lib/pleroma/workers/scheduled_activity_worker.ex
+++ b/lib/pleroma/workers/scheduled_activity_worker.ex
@@ -3,10 +3,42 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ScheduledActivityWorker do
+ @moduledoc """
+ The worker to post scheduled activity.
+ """
+
use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
+ alias Pleroma.Config
+ alias Pleroma.ScheduledActivity
+ alias Pleroma.User
+ alias Pleroma.Web.CommonAPI
+
+ require Logger
+
@impl Oban.Worker
- def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
- Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, activity_id)
+ def perform(%{"activity_id" => activity_id}, _job) do
+ if Config.get([ScheduledActivity, :enabled]) do
+ case Pleroma.Repo.get(ScheduledActivity, activity_id) do
+ %ScheduledActivity{} = scheduled_activity ->
+ post_activity(scheduled_activity)
+
+ _ ->
+ Logger.error("#{__MODULE__} Couldn't find scheduled activity: #{activity_id}")
+ end
+ end
+ end
+
+ defp post_activity(%ScheduledActivity{user_id: user_id, params: params} = scheduled_activity) do
+ with {:delete, {:ok, _}} <- {:delete, ScheduledActivity.delete(scheduled_activity)},
+ {:user, %User{} = user} <- {:user, User.get_cached_by_id(user_id)},
+ {:post, {:ok, _}} <- {:post, CommonAPI.post(user, params)} do
+ :ok
+ else
+ error ->
+ Logger.error(
+ "#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}"
+ )
+ end
end
end