aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/mix/tasks/pleroma/email.ex5
-rw-r--r--lib/pleroma/application.ex12
-rw-r--r--lib/pleroma/daemons/activity_expiration_daemon.ex66
-rw-r--r--lib/pleroma/daemons/digest_email_daemon.ex42
-rw-r--r--lib/pleroma/daemons/scheduled_activity_daemon.ex62
-rw-r--r--lib/pleroma/docs/generator.ex2
-rw-r--r--lib/pleroma/following_relationship.ex2
-rw-r--r--lib/pleroma/plugs/parsers_plug.ex21
-rw-r--r--lib/pleroma/scheduled_activity.ex83
-rw-r--r--lib/pleroma/scheduler.ex7
-rw-r--r--lib/pleroma/stats.ex39
-rw-r--r--lib/pleroma/user.ex43
-rw-r--r--lib/pleroma/web/activity_pub/utils.ex9
-rw-r--r--lib/pleroma/web/common_api/utils.ex6
-rw-r--r--lib/pleroma/web/endpoint.ex12
-rw-r--r--lib/pleroma/web/mastodon_api/controllers/status_controller.ex19
-rw-r--r--lib/pleroma/web/oauth/token/clean_worker.ex34
-rw-r--r--lib/pleroma/workers/activity_expiration_worker.ex18
-rw-r--r--lib/pleroma/workers/background_worker.ex5
-rw-r--r--lib/pleroma/workers/cron/clear_oauth_token_worker.ex21
-rw-r--r--lib/pleroma/workers/cron/digest_emails_worker.ex58
-rw-r--r--lib/pleroma/workers/cron/purge_expired_activities_worker.ex46
-rw-r--r--lib/pleroma/workers/cron/stats_worker.ex16
-rw-r--r--lib/pleroma/workers/digest_emails_worker.ex16
-rw-r--r--lib/pleroma/workers/scheduled_activity_worker.ex36
25 files changed, 339 insertions, 341 deletions
diff --git a/lib/mix/tasks/pleroma/email.ex b/lib/mix/tasks/pleroma/email.ex
index 2c3801429..d3fac6ec8 100644
--- a/lib/mix/tasks/pleroma/email.ex
+++ b/lib/mix/tasks/pleroma/email.ex
@@ -1,5 +1,6 @@
defmodule Mix.Tasks.Pleroma.Email do
use Mix.Task
+ import Mix.Pleroma
@shortdoc "Simple Email test"
@moduledoc File.read!("docs/administration/CLI_tasks/email.md")
@@ -18,8 +19,6 @@ defmodule Mix.Tasks.Pleroma.Email do
email = Pleroma.Emails.AdminEmail.test_email(options[:to])
{:ok, _} = Pleroma.Emails.Mailer.deliver(email)
- Mix.shell().info(
- "Test email has been sent to #{inspect(email.to)} from #{inspect(email.from)}"
- )
+ shell_info("Test email has been sent to #{inspect(email.to)} from #{inspect(email.from)}")
end
end
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 2c8889ce5..27758cf94 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -42,12 +42,9 @@ defmodule Pleroma.Application do
children =
[
Pleroma.Repo,
- Pleroma.Scheduler,
Pleroma.Config.TransferTask,
Pleroma.Emoji,
Pleroma.Captcha,
- Pleroma.Daemons.ScheduledActivityDaemon,
- Pleroma.Daemons.ActivityExpirationDaemon,
Pleroma.Plugs.RateLimiter.Supervisor
] ++
cachex_children() ++
@@ -58,7 +55,6 @@ defmodule Pleroma.Application do
{Oban, Pleroma.Config.get(Oban)}
] ++
task_children(@env) ++
- oauth_cleanup_child(oauth_cleanup_enabled?()) ++
streamer_child(@env) ++
chat_child(@env, chat_enabled?()) ++
[
@@ -160,20 +156,12 @@ defmodule Pleroma.Application do
defp chat_enabled?, do: Pleroma.Config.get([:chat, :enabled])
- defp oauth_cleanup_enabled?,
- do: Pleroma.Config.get([:oauth2, :clean_expired_tokens], false)
-
defp streamer_child(:test), do: []
defp streamer_child(_) do
[Pleroma.Web.Streamer.supervisor()]
end
- defp oauth_cleanup_child(true),
- do: [Pleroma.Web.OAuth.Token.CleanWorker]
-
- defp oauth_cleanup_child(_), do: []
-
defp chat_child(_env, true) do
[Pleroma.Web.ChatChannel.ChatChannelState]
end
diff --git a/lib/pleroma/daemons/activity_expiration_daemon.ex b/lib/pleroma/daemons/activity_expiration_daemon.ex
deleted file mode 100644
index cab7628c4..000000000
--- a/lib/pleroma/daemons/activity_expiration_daemon.ex
+++ /dev/null
@@ -1,66 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Daemons.ActivityExpirationDaemon do
- alias Pleroma.Activity
- alias Pleroma.ActivityExpiration
- alias Pleroma.Config
- alias Pleroma.Repo
- alias Pleroma.User
- alias Pleroma.Web.CommonAPI
-
- require Logger
- use GenServer
- import Ecto.Query
-
- @schedule_interval :timer.minutes(1)
-
- def start_link(_) do
- GenServer.start_link(__MODULE__, nil)
- end
-
- @impl true
- def init(_) do
- if Config.get([ActivityExpiration, :enabled]) do
- schedule_next()
- {:ok, nil}
- else
- :ignore
- end
- end
-
- def perform(:execute, expiration_id) do
- try do
- expiration =
- ActivityExpiration
- |> where([e], e.id == ^expiration_id)
- |> Repo.one!()
-
- activity = Activity.get_by_id_with_object(expiration.activity_id)
- user = User.get_by_ap_id(activity.object.data["actor"])
- CommonAPI.delete(activity.id, user)
- rescue
- error ->
- Logger.error("#{__MODULE__} Couldn't delete expired activity: #{inspect(error)}")
- end
- end
-
- @impl true
- def handle_info(:perform, state) do
- ActivityExpiration.due_expirations(@schedule_interval)
- |> Enum.each(fn expiration ->
- Pleroma.Workers.ActivityExpirationWorker.enqueue(
- "activity_expiration",
- %{"activity_expiration_id" => expiration.id}
- )
- end)
-
- schedule_next()
- {:noreply, state}
- end
-
- defp schedule_next do
- Process.send_after(self(), :perform, @schedule_interval)
- end
-end
diff --git a/lib/pleroma/daemons/digest_email_daemon.ex b/lib/pleroma/daemons/digest_email_daemon.ex
deleted file mode 100644
index b4c8eaad9..000000000
--- a/lib/pleroma/daemons/digest_email_daemon.ex
+++ /dev/null
@@ -1,42 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Daemons.DigestEmailDaemon do
- alias Pleroma.Repo
- alias Pleroma.Workers.DigestEmailsWorker
-
- import Ecto.Query
-
- def perform do
- config = Pleroma.Config.get([:email_notifications, :digest])
- negative_interval = -Map.fetch!(config, :interval)
- inactivity_threshold = Map.fetch!(config, :inactivity_threshold)
- inactive_users_query = Pleroma.User.list_inactive_users_query(inactivity_threshold)
-
- now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
-
- from(u in inactive_users_query,
- where: fragment(~s(? ->'digest' @> 'true'), u.email_notifications),
- where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
- select: u
- )
- |> Repo.all()
- |> Enum.each(fn user ->
- DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id})
- end)
- end
-
- @doc """
- Send digest email to the given user.
- Updates `last_digest_emailed_at` field for the user and returns the updated user.
- """
- @spec perform(Pleroma.User.t()) :: Pleroma.User.t()
- def perform(user) do
- with %Swoosh.Email{} = email <- Pleroma.Emails.UserEmail.digest_email(user) do
- Pleroma.Emails.Mailer.deliver_async(email)
- end
-
- Pleroma.User.touch_last_digest_emailed_at(user)
- end
-end
diff --git a/lib/pleroma/daemons/scheduled_activity_daemon.ex b/lib/pleroma/daemons/scheduled_activity_daemon.ex
deleted file mode 100644
index aee5f723a..000000000
--- a/lib/pleroma/daemons/scheduled_activity_daemon.ex
+++ /dev/null
@@ -1,62 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Daemons.ScheduledActivityDaemon do
- @moduledoc """
- Sends scheduled activities to the job queue.
- """
-
- alias Pleroma.Config
- alias Pleroma.ScheduledActivity
- alias Pleroma.User
- alias Pleroma.Web.CommonAPI
-
- use GenServer
- require Logger
-
- @schedule_interval :timer.minutes(1)
-
- def start_link(_) do
- GenServer.start_link(__MODULE__, nil)
- end
-
- def init(_) do
- if Config.get([ScheduledActivity, :enabled]) do
- schedule_next()
- {:ok, nil}
- else
- :ignore
- end
- end
-
- def perform(:execute, scheduled_activity_id) do
- try do
- {:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity_id)
- %User{} = user = User.get_cached_by_id(scheduled_activity.user_id)
- {:ok, _result} = CommonAPI.post(user, scheduled_activity.params)
- rescue
- error ->
- Logger.error(
- "#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}"
- )
- end
- end
-
- def handle_info(:perform, state) do
- ScheduledActivity.due_activities(@schedule_interval)
- |> Enum.each(fn scheduled_activity ->
- Pleroma.Workers.ScheduledActivityWorker.enqueue(
- "execute",
- %{"activity_id" => scheduled_activity.id}
- )
- end)
-
- schedule_next()
- {:noreply, state}
- end
-
- defp schedule_next do
- Process.send_after(self(), :perform, @schedule_interval)
- end
-end
diff --git a/lib/pleroma/docs/generator.ex b/lib/pleroma/docs/generator.ex
index 6b12dcdd9..e0fc8cd02 100644
--- a/lib/pleroma/docs/generator.ex
+++ b/lib/pleroma/docs/generator.ex
@@ -13,7 +13,7 @@ defmodule Pleroma.Docs.Generator do
|> Enum.filter(&String.ends_with?(&1, ".ex"))
|> Enum.map(fn filename ->
module = filename |> String.trim_trailing(".ex") |> Macro.camelize()
- String.to_existing_atom(start <> module)
+ String.to_atom(start <> module)
end)
end
end
diff --git a/lib/pleroma/following_relationship.ex b/lib/pleroma/following_relationship.ex
index 0b0219b82..b8cb3bf03 100644
--- a/lib/pleroma/following_relationship.ex
+++ b/lib/pleroma/following_relationship.ex
@@ -58,8 +58,8 @@ defmodule Pleroma.FollowingRelationship do
def unfollow(%User{} = follower, %User{} = following) do
case get(follower, following) do
- nil -> {:ok, nil}
%__MODULE__{} = following_relationship -> Repo.delete(following_relationship)
+ _ -> {:ok, nil}
end
end
diff --git a/lib/pleroma/plugs/parsers_plug.ex b/lib/pleroma/plugs/parsers_plug.ex
deleted file mode 100644
index 2e493ce0e..000000000
--- a/lib/pleroma/plugs/parsers_plug.ex
+++ /dev/null
@@ -1,21 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Plugs.Parsers do
- @moduledoc "Initializes Plug.Parsers with upload limit set at boot time"
-
- @behaviour Plug
-
- def init(_opts) do
- Plug.Parsers.init(
- parsers: [:urlencoded, :multipart, :json],
- pass: ["*/*"],
- json_decoder: Jason,
- length: Pleroma.Config.get([:instance, :upload_limit]),
- body_reader: {Pleroma.Web.Plugs.DigestPlug, :read_body, []}
- )
- end
-
- defdelegate call(conn, opts), to: Plug.Parsers
-end
diff --git a/lib/pleroma/scheduled_activity.ex b/lib/pleroma/scheduled_activity.ex
index fea2cf3ff..e81bfcd7d 100644
--- a/lib/pleroma/scheduled_activity.ex
+++ b/lib/pleroma/scheduled_activity.ex
@@ -5,15 +5,19 @@
defmodule Pleroma.ScheduledActivity do
use Ecto.Schema
+ alias Ecto.Multi
alias Pleroma.Config
alias Pleroma.Repo
alias Pleroma.ScheduledActivity
alias Pleroma.User
alias Pleroma.Web.CommonAPI.Utils
+ alias Pleroma.Workers.ScheduledActivityWorker
import Ecto.Query
import Ecto.Changeset
+ @type t :: %__MODULE__{}
+
@min_offset :timer.minutes(5)
schema "scheduled_activities" do
@@ -105,16 +109,32 @@ defmodule Pleroma.ScheduledActivity do
end
def new(%User{} = user, attrs) do
- %ScheduledActivity{user_id: user.id}
- |> changeset(attrs)
+ changeset(%ScheduledActivity{user_id: user.id}, attrs)
end
+ @doc """
+ Creates ScheduledActivity and add to queue to perform at scheduled_at date
+ """
+ @spec create(User.t(), map()) :: {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
def create(%User{} = user, attrs) do
- user
- |> new(attrs)
- |> Repo.insert()
+ Multi.new()
+ |> Multi.insert(:scheduled_activity, new(user, attrs))
+ |> maybe_add_jobs(Config.get([ScheduledActivity, :enabled]))
+ |> Repo.transaction()
+ |> transaction_response
+ end
+
+ defp maybe_add_jobs(multi, true) do
+ multi
+ |> Multi.run(:scheduled_activity_job, fn _repo, %{scheduled_activity: activity} ->
+ %{activity_id: activity.id}
+ |> ScheduledActivityWorker.new(scheduled_at: activity.scheduled_at)
+ |> Oban.insert()
+ end)
end
+ defp maybe_add_jobs(multi, _), do: multi
+
def get(%User{} = user, scheduled_activity_id) do
ScheduledActivity
|> where(user_id: ^user.id)
@@ -122,25 +142,43 @@ defmodule Pleroma.ScheduledActivity do
|> Repo.one()
end
- def update(%ScheduledActivity{} = scheduled_activity, attrs) do
- scheduled_activity
- |> update_changeset(attrs)
- |> Repo.update()
+ @spec update(ScheduledActivity.t(), map()) ::
+ {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
+ def update(%ScheduledActivity{id: id} = scheduled_activity, attrs) do
+ with {:error, %Ecto.Changeset{valid?: true} = changeset} <-
+ {:error, update_changeset(scheduled_activity, attrs)} do
+ Multi.new()
+ |> Multi.update(:scheduled_activity, changeset)
+ |> Multi.update_all(:scheduled_job, job_query(id),
+ set: [scheduled_at: get_field(changeset, :scheduled_at)]
+ )
+ |> Repo.transaction()
+ |> transaction_response
+ end
end
- def delete(%ScheduledActivity{} = scheduled_activity) do
- scheduled_activity
- |> Repo.delete()
+ @doc "Deletes a ScheduledActivity and linked jobs."
+ @spec delete(ScheduledActivity.t() | binary() | integer) ::
+ {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
+ def delete(%ScheduledActivity{id: id} = scheduled_activity) do
+ Multi.new()
+ |> Multi.delete(:scheduled_activity, scheduled_activity, stale_error_field: :id)
+ |> Multi.delete_all(:jobs, job_query(id))
+ |> Repo.transaction()
+ |> transaction_response
end
def delete(id) when is_binary(id) or is_integer(id) do
- ScheduledActivity
- |> where(id: ^id)
- |> select([sa], sa)
- |> Repo.delete_all()
- |> case do
- {1, [scheduled_activity]} -> {:ok, scheduled_activity}
- _ -> :error
+ delete(%__MODULE__{id: id})
+ end
+
+ defp transaction_response(result) do
+ case result do
+ {:ok, %{scheduled_activity: scheduled_activity}} ->
+ {:ok, scheduled_activity}
+
+ {:error, _, changeset, _} ->
+ {:error, changeset}
end
end
@@ -158,4 +196,11 @@ defmodule Pleroma.ScheduledActivity do
|> where([sa], sa.scheduled_at < ^naive_datetime)
|> Repo.all()
end
+
+ def job_query(scheduled_activity_id) do
+ from(j in Oban.Job,
+ where: j.queue == "scheduled_activities",
+ where: fragment("args ->> 'activity_id' = ?::text", ^to_string(scheduled_activity_id))
+ )
+ end
end
diff --git a/lib/pleroma/scheduler.ex b/lib/pleroma/scheduler.ex
deleted file mode 100644
index d84cd99ad..000000000
--- a/lib/pleroma/scheduler.ex
+++ /dev/null
@@ -1,7 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Scheduler do
- use Quantum.Scheduler, otp_app: :pleroma
-end
diff --git a/lib/pleroma/stats.ex b/lib/pleroma/stats.ex
index 8154a09b7..cf590fb01 100644
--- a/lib/pleroma/stats.ex
+++ b/lib/pleroma/stats.ex
@@ -9,22 +9,43 @@ defmodule Pleroma.Stats do
use GenServer
- @interval 1000 * 60 * 60
+ @init_state %{
+ peers: [],
+ stats: %{
+ domain_count: 0,
+ status_count: 0,
+ user_count: 0
+ }
+ }
def start_link(_) do
- GenServer.start_link(__MODULE__, initial_data(), name: __MODULE__)
+ GenServer.start_link(
+ __MODULE__,
+ @init_state,
+ name: __MODULE__
+ )
end
+ @doc "Performs update stats"
def force_update do
GenServer.call(__MODULE__, :force_update)
end
+ @doc "Performs collect stats"
+ def do_collect do
+ GenServer.cast(__MODULE__, :run_update)
+ end
+
+ @doc "Returns stats data"
+ @spec get_stats() :: %{domain_count: integer(), status_count: integer(), user_count: integer()}
def get_stats do
%{stats: stats} = GenServer.call(__MODULE__, :get_state)
stats
end
+ @doc "Returns list peers"
+ @spec get_peers() :: list(String.t())
def get_peers do
%{peers: peers} = GenServer.call(__MODULE__, :get_state)
@@ -32,7 +53,6 @@ defmodule Pleroma.Stats do
end
def init(args) do
- Process.send(self(), :run_update, [])
{:ok, args}
end
@@ -45,17 +65,12 @@ defmodule Pleroma.Stats do
{:reply, state, state}
end
- def handle_info(:run_update, _state) do
+ def handle_cast(:run_update, _state) do
new_stats = get_stat_data()
- Process.send_after(self(), :run_update, @interval)
{:noreply, new_stats}
end
- defp initial_data do
- %{peers: [], stats: %{}}
- end
-
defp get_stat_data do
peers =
from(
@@ -74,7 +89,11 @@ defmodule Pleroma.Stats do
%{
peers: peers,
- stats: %{domain_count: domain_count, status_count: status_count, user_count: user_count}
+ stats: %{
+ domain_count: domain_count,
+ status_count: status_count,
+ user_count: user_count
+ }
}
end
end
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index 3c86cdb38..5ea36fea3 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -647,25 +647,48 @@ defmodule Pleroma.User do
end
end
+ def unfollow(%User{ap_id: ap_id}, %User{ap_id: ap_id}) do
+ {:error, "Not subscribed!"}
+ end
+
def unfollow(%User{} = follower, %User{} = followed) do
- if following?(follower, followed) and follower.ap_id != followed.ap_id do
- FollowingRelationship.unfollow(follower, followed)
+ case get_follow_state(follower, followed) do
+ state when state in ["accept", "pending"] ->
+ FollowingRelationship.unfollow(follower, followed)
+ {:ok, followed} = update_follower_count(followed)
- {:ok, followed} = update_follower_count(followed)
+ {:ok, follower} =
+ follower
+ |> update_following_count()
+ |> set_cache()
- {:ok, follower} =
- follower
- |> update_following_count()
- |> set_cache()
+ {:ok, follower, Utils.fetch_latest_follow(follower, followed)}
- {:ok, follower, Utils.fetch_latest_follow(follower, followed)}
- else
- {:error, "Not subscribed!"}
+ nil ->
+ {:error, "Not subscribed!"}
end
end
defdelegate following?(follower, followed), to: FollowingRelationship
+ def get_follow_state(%User{} = follower, %User{} = following) do
+ following_relationship = FollowingRelationship.get(follower, following)
+
+ case {following_relationship, following.local} do
+ {nil, false} ->
+ case Utils.fetch_latest_follow(follower, following) do
+ %{data: %{"state" => state}} when state in ["pending", "accept"] -> state
+ _ -> nil
+ end
+
+ {%{state: state}, _} ->
+ state
+
+ {nil, _} ->
+ nil
+ end
+ end
+
def locked?(%User{} = user) do
user.locked || false
end
diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex
index 4bcacc6d1..10ce5eee8 100644
--- a/lib/pleroma/web/activity_pub/utils.ex
+++ b/lib/pleroma/web/activity_pub/utils.ex
@@ -490,6 +490,15 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|> Repo.one()
end
+ def fetch_latest_undo(%User{ap_id: ap_id}) do
+ "Undo"
+ |> Activity.Queries.by_type()
+ |> where(actor: ^ap_id)
+ |> order_by([activity], fragment("? desc nulls last", activity.id))
+ |> limit(1)
+ |> Repo.one()
+ end
+
def get_latest_reaction(internal_activity_id, %{ap_id: ap_id}, emoji) do
%{data: %{"object" => object_ap_id}} = Activity.get_by_id(internal_activity_id)
diff --git a/lib/pleroma/web/common_api/utils.ex b/lib/pleroma/web/common_api/utils.ex
index a9b164d9a..ca6c93862 100644
--- a/lib/pleroma/web/common_api/utils.ex
+++ b/lib/pleroma/web/common_api/utils.ex
@@ -179,9 +179,9 @@ defmodule Pleroma.Web.CommonAPI.Utils do
end)
end_time =
- NaiveDateTime.utc_now()
- |> NaiveDateTime.add(expires_in)
- |> NaiveDateTime.to_iso8601()
+ DateTime.utc_now()
+ |> DateTime.add(expires_in)
+ |> DateTime.to_iso8601()
key = if truthy_param?(data["poll"]["multiple"]), do: "anyOf", else: "oneOf"
poll = %{"type" => "Question", key => option_notes, "closed" => end_time}
diff --git a/lib/pleroma/web/endpoint.ex b/lib/pleroma/web/endpoint.ex
index d32c38a05..a77b73109 100644
--- a/lib/pleroma/web/endpoint.ex
+++ b/lib/pleroma/web/endpoint.ex
@@ -61,7 +61,17 @@ defmodule Pleroma.Web.Endpoint do
plug(Plug.RequestId)
plug(Plug.Logger, log: :debug)
- plug(Pleroma.Plugs.Parsers)
+ plug(Plug.Parsers,
+ parsers: [
+ :urlencoded,
+ {:multipart, length: {Pleroma.Config, :get, [[:instance, :upload_limit]]}},
+ :json
+ ],
+ pass: ["*/*"],
+ json_decoder: Jason,
+ length: Pleroma.Config.get([:instance, :upload_limit]),
+ body_reader: {Pleroma.Web.Plugs.DigestPlug, :read_body, []}
+ )
plug(Plug.MethodOverride)
plug(Plug.Head)
diff --git a/lib/pleroma/web/mastodon_api/controllers/status_controller.ex b/lib/pleroma/web/mastodon_api/controllers/status_controller.ex
index 1149fb469..287d1631c 100644
--- a/lib/pleroma/web/mastodon_api/controllers/status_controller.ex
+++ b/lib/pleroma/web/mastodon_api/controllers/status_controller.ex
@@ -124,15 +124,18 @@ defmodule Pleroma.Web.MastodonAPI.StatusController do
) do
params = Map.put(params, "in_reply_to_status_id", params["in_reply_to_id"])
- if ScheduledActivity.far_enough?(scheduled_at) do
- with {:ok, scheduled_activity} <-
- ScheduledActivity.create(user, %{"params" => params, "scheduled_at" => scheduled_at}) do
- conn
- |> put_view(ScheduledActivityView)
- |> render("show.json", scheduled_activity: scheduled_activity)
- end
+ with {:far_enough, true} <- {:far_enough, ScheduledActivity.far_enough?(scheduled_at)},
+ attrs <- %{"params" => params, "scheduled_at" => scheduled_at},
+ {:ok, scheduled_activity} <- ScheduledActivity.create(user, attrs) do
+ conn
+ |> put_view(ScheduledActivityView)
+ |> render("show.json", scheduled_activity: scheduled_activity)
else
- create(conn, Map.drop(params, ["scheduled_at"]))
+ {:far_enough, _} ->
+ create(conn, Map.drop(params, ["scheduled_at"]))
+
+ error ->
+ error
end
end
diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex
deleted file mode 100644
index 3c9c580d5..000000000
--- a/lib/pleroma/web/oauth/token/clean_worker.ex
+++ /dev/null
@@ -1,34 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.OAuth.Token.CleanWorker do
- @moduledoc """
- The module represents functions to clean an expired oauth tokens.
- """
- use GenServer
-
- @ten_seconds 10_000
- @one_day 86_400_000
-
- alias Pleroma.Web.OAuth.Token
- alias Pleroma.Workers.BackgroundWorker
-
- def start_link(_), do: GenServer.start_link(__MODULE__, %{})
-
- def init(_) do
- Process.send_after(self(), :perform, @ten_seconds)
- {:ok, nil}
- end
-
- @doc false
- def handle_info(:perform, state) do
- BackgroundWorker.enqueue("clean_expired_tokens", %{})
- interval = Pleroma.Config.get([:oauth2, :clean_expired_tokens_interval], @one_day)
-
- Process.send_after(self(), :perform, interval)
- {:noreply, state}
- end
-
- def perform(:clean), do: Token.delete_expired_tokens()
-end
diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex
deleted file mode 100644
index 4e3e4195f..000000000
--- a/lib/pleroma/workers/activity_expiration_worker.ex
+++ /dev/null
@@ -1,18 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.ActivityExpirationWorker do
- use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
-
- @impl Oban.Worker
- def perform(
- %{
- "op" => "activity_expiration",
- "activity_expiration_id" => activity_expiration_id
- },
- _job
- ) do
- Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, activity_expiration_id)
- end
-end
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
index 323a4da1e..ac2fe6946 100644
--- a/lib/pleroma/workers/background_worker.ex
+++ b/lib/pleroma/workers/background_worker.ex
@@ -6,7 +6,6 @@ defmodule Pleroma.Workers.BackgroundWorker do
alias Pleroma.Activity
alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
- alias Pleroma.Web.OAuth.Token.CleanWorker
use Pleroma.Workers.WorkerHelper, queue: "background"
@@ -55,10 +54,6 @@ defmodule Pleroma.Workers.BackgroundWorker do
User.perform(:follow_import, follower, followed_identifiers)
end
- def perform(%{"op" => "clean_expired_tokens"}, _job) do
- CleanWorker.perform(:clean)
- end
-
def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do
MediaProxyWarmingPolicy.perform(:preload, message)
end
diff --git a/lib/pleroma/workers/cron/clear_oauth_token_worker.ex b/lib/pleroma/workers/cron/clear_oauth_token_worker.ex
new file mode 100644
index 000000000..a24407874
--- /dev/null
+++ b/lib/pleroma/workers/cron/clear_oauth_token_worker.ex
@@ -0,0 +1,21 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Cron.ClearOauthTokenWorker do
+ @moduledoc """
+ The worker to cleanup expired oAuth tokens.
+ """
+
+ use Oban.Worker, queue: "background"
+
+ alias Pleroma.Config
+ alias Pleroma.Web.OAuth.Token
+
+ @impl Oban.Worker
+ def perform(_opts, _job) do
+ if Config.get([:oauth2, :clean_expired_tokens], false) do
+ Token.delete_expired_tokens()
+ end
+ end
+end
diff --git a/lib/pleroma/workers/cron/digest_emails_worker.ex b/lib/pleroma/workers/cron/digest_emails_worker.ex
new file mode 100644
index 000000000..0a00129df
--- /dev/null
+++ b/lib/pleroma/workers/cron/digest_emails_worker.ex
@@ -0,0 +1,58 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Cron.DigestEmailsWorker do
+ @moduledoc """
+ The worker to send digest emails.
+ """
+
+ use Oban.Worker, queue: "digest_emails"
+
+ alias Pleroma.Config
+ alias Pleroma.Emails
+ alias Pleroma.Repo
+ alias Pleroma.User
+
+ import Ecto.Query
+
+ require Logger
+
+ @impl Oban.Worker
+ def perform(_opts, _job) do
+ config = Config.get([:email_notifications, :digest])
+
+ if config[:active] do
+ negative_interval = -Map.fetch!(config, :interval)
+ inactivity_threshold = Map.fetch!(config, :inactivity_threshold)
+ inactive_users_query = User.list_inactive_users_query(inactivity_threshold)
+
+ now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
+
+ from(u in inactive_users_query,
+ where: fragment(~s(? ->'digest' @> 'true'), u.email_notifications),
+ where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
+ select: u
+ )
+ |> Repo.all()
+ |> send_emails
+ end
+ end
+
+ def send_emails(users) do
+ Enum.each(users, &send_email/1)
+ end
+
+ @doc """
+ Send digest email to the given user.
+ Updates `last_digest_emailed_at` field for the user and returns the updated user.
+ """
+ @spec send_email(User.t()) :: User.t()
+ def send_email(user) do
+ with %Swoosh.Email{} = email <- Emails.UserEmail.digest_email(user) do
+ Emails.Mailer.deliver_async(email)
+ end
+
+ User.touch_last_digest_emailed_at(user)
+ end
+end
diff --git a/lib/pleroma/workers/cron/purge_expired_activities_worker.ex b/lib/pleroma/workers/cron/purge_expired_activities_worker.ex
new file mode 100644
index 000000000..7a52860a9
--- /dev/null
+++ b/lib/pleroma/workers/cron/purge_expired_activities_worker.ex
@@ -0,0 +1,46 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker do
+ @moduledoc """
+ The worker to purge expired activities.
+ """
+
+ use Oban.Worker, queue: "activity_expiration"
+
+ alias Pleroma.Activity
+ alias Pleroma.ActivityExpiration
+ alias Pleroma.Config
+ alias Pleroma.User
+ alias Pleroma.Web.CommonAPI
+
+ require Logger
+
+ @interval :timer.minutes(1)
+
+ @impl Oban.Worker
+ def perform(_opts, _job) do
+ if Config.get([ActivityExpiration, :enabled]) do
+ Enum.each(ActivityExpiration.due_expirations(@interval), &delete_activity/1)
+ end
+ end
+
+ def delete_activity(%ActivityExpiration{activity_id: activity_id}) do
+ with {:activity, %Activity{} = activity} <-
+ {:activity, Activity.get_by_id_with_object(activity_id)},
+ {:user, %User{} = user} <- {:user, User.get_by_ap_id(activity.object.data["actor"])} do
+ CommonAPI.delete(activity.id, user)
+ else
+ {:activity, _} ->
+ Logger.error(
+ "#{__MODULE__} Couldn't delete expired activity: not found activity ##{activity_id}"
+ )
+
+ {:user, _} ->
+ Logger.error(
+ "#{__MODULE__} Couldn't delete expired activity: not found actorof ##{activity_id}"
+ )
+ end
+ end
+end
diff --git a/lib/pleroma/workers/cron/stats_worker.ex b/lib/pleroma/workers/cron/stats_worker.ex
new file mode 100644
index 000000000..425ad41ca
--- /dev/null
+++ b/lib/pleroma/workers/cron/stats_worker.ex
@@ -0,0 +1,16 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.Cron.StatsWorker do
+ @moduledoc """
+ The worker to update peers statistics.
+ """
+
+ use Oban.Worker, queue: "background"
+
+ @impl Oban.Worker
+ def perform(_opts, _job) do
+ Pleroma.Stats.do_collect()
+ end
+end
diff --git a/lib/pleroma/workers/digest_emails_worker.ex b/lib/pleroma/workers/digest_emails_worker.ex
deleted file mode 100644
index 3e5a836d0..000000000
--- a/lib/pleroma/workers/digest_emails_worker.ex
+++ /dev/null
@@ -1,16 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.DigestEmailsWorker do
- alias Pleroma.User
-
- use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
-
- @impl Oban.Worker
- def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
- user_id
- |> User.get_cached_by_id()
- |> Pleroma.Daemons.DigestEmailDaemon.perform()
- end
-end
diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex
index ca7d53af1..bd41ab4ce 100644
--- a/lib/pleroma/workers/scheduled_activity_worker.ex
+++ b/lib/pleroma/workers/scheduled_activity_worker.ex
@@ -3,10 +3,42 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ScheduledActivityWorker do
+ @moduledoc """
+ The worker to post scheduled activity.
+ """
+
use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
+ alias Pleroma.Config
+ alias Pleroma.ScheduledActivity
+ alias Pleroma.User
+ alias Pleroma.Web.CommonAPI
+
+ require Logger
+
@impl Oban.Worker
- def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
- Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, activity_id)
+ def perform(%{"activity_id" => activity_id}, _job) do
+ if Config.get([ScheduledActivity, :enabled]) do
+ case Pleroma.Repo.get(ScheduledActivity, activity_id) do
+ %ScheduledActivity{} = scheduled_activity ->
+ post_activity(scheduled_activity)
+
+ _ ->
+ Logger.error("#{__MODULE__} Couldn't find scheduled activity: #{activity_id}")
+ end
+ end
+ end
+
+ defp post_activity(%ScheduledActivity{user_id: user_id, params: params} = scheduled_activity) do
+ with {:delete, {:ok, _}} <- {:delete, ScheduledActivity.delete(scheduled_activity)},
+ {:user, %User{} = user} <- {:user, User.get_cached_by_id(user_id)},
+ {:post, {:ok, _}} <- {:post, CommonAPI.post(user, params)} do
+ :ok
+ else
+ error ->
+ Logger.error(
+ "#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}"
+ )
+ end
end
end