aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/mix/tasks/pleroma/database.ex17
-rw-r--r--lib/mix/tasks/pleroma/emoji.ex12
-rw-r--r--lib/pleroma/activity.ex7
-rw-r--r--lib/pleroma/activity_expiration.ex74
-rw-r--r--lib/pleroma/config/deprecation_warnings.ex19
-rw-r--r--lib/pleroma/config/oban.ex6
-rw-r--r--lib/pleroma/gun/conn.ex12
-rw-r--r--lib/pleroma/gun/connection_pool/worker.ex22
-rw-r--r--lib/pleroma/instances/instance.ex12
-rw-r--r--lib/pleroma/mfa/token.ex71
-rw-r--r--lib/pleroma/object/containment.ex7
-rw-r--r--lib/pleroma/reverse_proxy/client/tesla.ex2
-rw-r--r--lib/pleroma/telemetry/logger.ex18
-rw-r--r--lib/pleroma/user.ex17
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub.ex63
-rw-r--r--lib/pleroma/web/activity_pub/mrf/activity_expiration_policy.ex4
-rw-r--r--lib/pleroma/web/activity_pub/side_effects.ex5
-rw-r--r--lib/pleroma/web/activity_pub/transmogrifier.ex4
-rw-r--r--lib/pleroma/web/common_api/activity_draft.ex2
-rw-r--r--lib/pleroma/web/common_api/common_api.ex5
-rw-r--r--lib/pleroma/web/mastodon_api/views/status_view.ex5
-rw-r--r--lib/pleroma/web/mastodon_api/websocket_handler.ex12
-rw-r--r--lib/pleroma/web/oauth/oauth_controller.ex9
-rw-r--r--lib/pleroma/web/oauth/token.ex25
-rw-r--r--lib/pleroma/web/oauth/token/clean_worker.ex38
-rw-r--r--lib/pleroma/web/oauth/token/query.ex6
-rw-r--r--lib/pleroma/web/oauth/token/strategy/refresh_token.ex2
-rw-r--r--lib/pleroma/web/templates/o_auth/o_auth/oob_authorization_created.html.eex2
-rw-r--r--lib/pleroma/web/templates/o_auth/o_auth/oob_token_exists.html.eex2
-rw-r--r--lib/pleroma/web/twitter_api/controllers/remote_follow_controller.ex2
-rw-r--r--lib/pleroma/workers/cron/clear_oauth_token_worker.ex23
-rw-r--r--lib/pleroma/workers/cron/purge_expired_activities_worker.ex48
-rw-r--r--lib/pleroma/workers/purge_expired_activity.ex72
-rw-r--r--lib/pleroma/workers/purge_expired_token.ex29
34 files changed, 321 insertions, 333 deletions
diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex
index 7d8f00b08..7f1108dcf 100644
--- a/lib/mix/tasks/pleroma/database.ex
+++ b/lib/mix/tasks/pleroma/database.ex
@@ -133,8 +133,7 @@ defmodule Mix.Tasks.Pleroma.Database do
days = Pleroma.Config.get([:mrf_activity_expiration, :days], 365)
Pleroma.Activity
- |> join(:left, [a], u in assoc(a, :expiration))
- |> join(:inner, [a, _u], o in Object,
+ |> join(:inner, [a], o in Object,
on:
fragment(
"(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')",
@@ -144,14 +143,20 @@ defmodule Mix.Tasks.Pleroma.Database do
)
)
|> where(local: true)
- |> where([a, u], is_nil(u))
|> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
- |> where([_a, _u, o], fragment("?->>'type' = 'Note'", o.data))
+ |> where([_a, o], fragment("?->>'type' = 'Note'", o.data))
|> Pleroma.RepoStreamer.chunk_stream(100)
|> Stream.each(fn activities ->
Enum.each(activities, fn activity ->
- expires_at = Timex.shift(activity.inserted_at, days: days)
- Pleroma.ActivityExpiration.create(activity, expires_at, false)
+ expires_at =
+ activity.inserted_at
+ |> DateTime.from_naive!("Etc/UTC")
+ |> Timex.shift(days: days)
+
+ Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
+ activity_id: activity.id,
+ expires_at: expires_at
+ })
end)
end)
|> Stream.run()
diff --git a/lib/mix/tasks/pleroma/emoji.ex b/lib/mix/tasks/pleroma/emoji.ex
index 8f52ee98d..1750373f9 100644
--- a/lib/mix/tasks/pleroma/emoji.ex
+++ b/lib/mix/tasks/pleroma/emoji.ex
@@ -183,7 +183,7 @@ defmodule Mix.Tasks.Pleroma.Emoji do
IO.puts("Downloading the pack and generating SHA256")
- binary_archive = Tesla.get!(client(), src).body
+ {:ok, %{body: binary_archive}} = Pleroma.HTTP.get(src)
archive_sha = :crypto.hash(:sha256, binary_archive) |> Base.encode16()
IO.puts("SHA256 is #{archive_sha}")
@@ -252,7 +252,7 @@ defmodule Mix.Tasks.Pleroma.Emoji do
end
defp fetch("http" <> _ = from) do
- with {:ok, %{body: body}} <- Tesla.get(client(), from) do
+ with {:ok, %{body: body}} <- Pleroma.HTTP.get(from) do
{:ok, body}
end
end
@@ -271,13 +271,5 @@ defmodule Mix.Tasks.Pleroma.Emoji do
)
end
- defp client do
- middleware = [
- {Tesla.Middleware.FollowRedirects, [max_redirects: 3]}
- ]
-
- Tesla.client(middleware)
- end
-
defp default_manifest, do: Pleroma.Config.get!([:emoji, :default_manifest])
end
diff --git a/lib/pleroma/activity.ex b/lib/pleroma/activity.ex
index 97feebeaa..17af04257 100644
--- a/lib/pleroma/activity.ex
+++ b/lib/pleroma/activity.ex
@@ -7,7 +7,6 @@ defmodule Pleroma.Activity do
alias Pleroma.Activity
alias Pleroma.Activity.Queries
- alias Pleroma.ActivityExpiration
alias Pleroma.Bookmark
alias Pleroma.Notification
alias Pleroma.Object
@@ -60,8 +59,6 @@ defmodule Pleroma.Activity do
# typical case.
has_one(:object, Object, on_delete: :nothing, foreign_key: :id)
- has_one(:expiration, ActivityExpiration, on_delete: :delete_all)
-
timestamps()
end
@@ -304,14 +301,14 @@ defmodule Pleroma.Activity do
|> Repo.all()
end
- def follow_requests_for_actor(%Pleroma.User{ap_id: ap_id}) do
+ def follow_requests_for_actor(%User{ap_id: ap_id}) do
ap_id
|> Queries.by_object_id()
|> Queries.by_type("Follow")
|> where([a], fragment("? ->> 'state' = 'pending'", a.data))
end
- def following_requests_for_actor(%Pleroma.User{ap_id: ap_id}) do
+ def following_requests_for_actor(%User{ap_id: ap_id}) do
Queries.by_type("Follow")
|> where([a], fragment("?->>'state' = 'pending'", a.data))
|> where([a], a.actor == ^ap_id)
diff --git a/lib/pleroma/activity_expiration.ex b/lib/pleroma/activity_expiration.ex
deleted file mode 100644
index 955f0578e..000000000
--- a/lib/pleroma/activity_expiration.ex
+++ /dev/null
@@ -1,74 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.ActivityExpiration do
- use Ecto.Schema
-
- alias Pleroma.Activity
- alias Pleroma.ActivityExpiration
- alias Pleroma.Repo
-
- import Ecto.Changeset
- import Ecto.Query
-
- @type t :: %__MODULE__{}
- @min_activity_lifetime :timer.hours(1)
-
- schema "activity_expirations" do
- belongs_to(:activity, Activity, type: FlakeId.Ecto.CompatType)
- field(:scheduled_at, :naive_datetime)
- end
-
- def changeset(%ActivityExpiration{} = expiration, attrs, validate_scheduled_at) do
- expiration
- |> cast(attrs, [:scheduled_at])
- |> validate_required([:scheduled_at])
- |> validate_scheduled_at(validate_scheduled_at)
- end
-
- def get_by_activity_id(activity_id) do
- ActivityExpiration
- |> where([exp], exp.activity_id == ^activity_id)
- |> Repo.one()
- end
-
- def create(%Activity{} = activity, scheduled_at, validate_scheduled_at \\ true) do
- %ActivityExpiration{activity_id: activity.id}
- |> changeset(%{scheduled_at: scheduled_at}, validate_scheduled_at)
- |> Repo.insert()
- end
-
- def due_expirations(offset \\ 0) do
- naive_datetime =
- NaiveDateTime.utc_now()
- |> NaiveDateTime.add(offset, :millisecond)
-
- ActivityExpiration
- |> where([exp], exp.scheduled_at < ^naive_datetime)
- |> limit(50)
- |> preload(:activity)
- |> Repo.all()
- |> Enum.reject(fn %{activity: activity} ->
- Activity.pinned_by_actor?(activity)
- end)
- end
-
- def validate_scheduled_at(changeset, false), do: changeset
-
- def validate_scheduled_at(changeset, true) do
- validate_change(changeset, :scheduled_at, fn _, scheduled_at ->
- if not expires_late_enough?(scheduled_at) do
- [scheduled_at: "an ephemeral activity must live for at least one hour"]
- else
- []
- end
- end)
- end
-
- def expires_late_enough?(scheduled_at) do
- now = NaiveDateTime.utc_now()
- diff = NaiveDateTime.diff(scheduled_at, now, :millisecond)
- diff > @min_activity_lifetime
- end
-end
diff --git a/lib/pleroma/config/deprecation_warnings.ex b/lib/pleroma/config/deprecation_warnings.ex
index 2bfe4ddba..412d55a77 100644
--- a/lib/pleroma/config/deprecation_warnings.ex
+++ b/lib/pleroma/config/deprecation_warnings.ex
@@ -8,7 +8,7 @@ defmodule Pleroma.Config.DeprecationWarnings do
require Logger
alias Pleroma.Config
- @type config_namespace() :: [atom()]
+ @type config_namespace() :: atom() | [atom()]
@type config_map() :: {config_namespace(), config_namespace(), String.t()}
@mrf_config_map [
@@ -57,6 +57,7 @@ defmodule Pleroma.Config.DeprecationWarnings do
check_media_proxy_whitelist_config()
check_welcome_message_config()
check_gun_pool_options()
+ check_activity_expiration_config()
end
def check_welcome_message_config do
@@ -158,4 +159,20 @@ defmodule Pleroma.Config.DeprecationWarnings do
Config.put(:pools, updated_config)
end
end
+
+ @spec check_activity_expiration_config() :: :ok | nil
+ def check_activity_expiration_config do
+ warning_preface = """
+ !!!DEPRECATION WARNING!!!
+ Your config is using old namespace for activity expiration configuration. Setting should work for now, but you are advised to change to new namespace to prevent possible issues later:
+ """
+
+ move_namespace_and_warn(
+ [
+ {Pleroma.ActivityExpiration, Pleroma.Workers.PurgeExpiredActivity,
+ "\n* `config :pleroma, Pleroma.ActivityExpiration` is now `config :pleroma, Pleroma.Workers.PurgeExpiredActivity`"}
+ ],
+ warning_preface
+ )
+ end
end
diff --git a/lib/pleroma/config/oban.ex b/lib/pleroma/config/oban.ex
index c2d56ebab..9f601b1a3 100644
--- a/lib/pleroma/config/oban.ex
+++ b/lib/pleroma/config/oban.ex
@@ -5,7 +5,11 @@ defmodule Pleroma.Config.Oban do
oban_config = Pleroma.Config.get(Oban)
crontab =
- [Pleroma.Workers.Cron.StatsWorker]
+ [
+ Pleroma.Workers.Cron.StatsWorker,
+ Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker,
+ Pleroma.Workers.Cron.ClearOauthTokenWorker
+ ]
|> Enum.reduce(oban_config[:crontab], fn removed_worker, acc ->
with acc when is_list(acc) <- acc,
setting when is_tuple(setting) <-
diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex
index 75b1ffc0a..477e19c6e 100644
--- a/lib/pleroma/gun/conn.ex
+++ b/lib/pleroma/gun/conn.ex
@@ -50,10 +50,10 @@ defmodule Pleroma.Gun.Conn do
with open_opts <- Map.delete(opts, :tls_opts),
{:ok, conn} <- Gun.open(proxy_host, proxy_port, open_opts),
- {:ok, _} <- Gun.await_up(conn, opts[:connect_timeout]),
+ {:ok, protocol} <- Gun.await_up(conn, opts[:connect_timeout]),
stream <- Gun.connect(conn, connect_opts),
{:response, :fin, 200, _} <- Gun.await(conn, stream) do
- {:ok, conn}
+ {:ok, conn, protocol}
else
error ->
Logger.warn(
@@ -88,8 +88,8 @@ defmodule Pleroma.Gun.Conn do
|> Map.put(:socks_opts, socks_opts)
with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts),
- {:ok, _} <- Gun.await_up(conn, opts[:connect_timeout]) do
- {:ok, conn}
+ {:ok, protocol} <- Gun.await_up(conn, opts[:connect_timeout]) do
+ {:ok, conn, protocol}
else
error ->
Logger.warn(
@@ -106,8 +106,8 @@ defmodule Pleroma.Gun.Conn do
host = Pleroma.HTTP.AdapterHelper.parse_host(host)
with {:ok, conn} <- Gun.open(host, port, opts),
- {:ok, _} <- Gun.await_up(conn, opts[:connect_timeout]) do
- {:ok, conn}
+ {:ok, protocol} <- Gun.await_up(conn, opts[:connect_timeout]) do
+ {:ok, conn, protocol}
else
error ->
Logger.warn(
diff --git a/lib/pleroma/gun/connection_pool/worker.ex b/lib/pleroma/gun/connection_pool/worker.ex
index c36332817..49d41e4c7 100644
--- a/lib/pleroma/gun/connection_pool/worker.ex
+++ b/lib/pleroma/gun/connection_pool/worker.ex
@@ -15,7 +15,7 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
@impl true
def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do
- with {:ok, conn_pid} <- Gun.Conn.open(uri, opts),
+ with {:ok, conn_pid, protocol} <- Gun.Conn.open(uri, opts),
Process.link(conn_pid) do
time = :erlang.monotonic_time(:millisecond)
@@ -27,8 +27,12 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
send(client_pid, {:conn_pid, conn_pid})
{:noreply,
- %{key: key, timer: nil, client_monitors: %{client_pid => Process.monitor(client_pid)}},
- :hibernate}
+ %{
+ key: key,
+ timer: nil,
+ client_monitors: %{client_pid => Process.monitor(client_pid)},
+ protocol: protocol
+ }, :hibernate}
else
err ->
{:stop, {:shutdown, err}, nil}
@@ -53,14 +57,20 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
end
@impl true
- def handle_call(:add_client, {client_pid, _}, %{key: key} = state) do
+ def handle_call(:add_client, {client_pid, _}, %{key: key, protocol: protocol} = state) do
time = :erlang.monotonic_time(:millisecond)
- {{conn_pid, _, _, _}, _} =
+ {{conn_pid, used_by, _, _}, _} =
Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
{conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
end)
+ :telemetry.execute(
+ [:pleroma, :connection_pool, :client, :add],
+ %{client_pid: client_pid, clients: used_by},
+ %{key: state.key, protocol: protocol}
+ )
+
state =
if state.timer != nil do
Process.cancel_timer(state[:timer])
@@ -131,7 +141,7 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do
@impl true
def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
:telemetry.execute(
- [:pleroma, :connection_pool, :client_death],
+ [:pleroma, :connection_pool, :client, :dead],
%{client_pid: pid, reason: reason},
%{key: state.key}
)
diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex
index ad7764f05..f0f601469 100644
--- a/lib/pleroma/instances/instance.ex
+++ b/lib/pleroma/instances/instance.ex
@@ -157,13 +157,11 @@ defmodule Pleroma.Instances.Instance do
try do
with {:ok, %Tesla.Env{body: html}} <-
Pleroma.HTTP.get(to_string(instance_uri), [{"accept", "text/html"}], pool: :media),
- favicon_rel <-
- html
- |> Floki.parse_document!()
- |> Floki.attribute("link[rel=icon]", "href")
- |> List.first(),
- favicon <- URI.merge(instance_uri, favicon_rel) |> to_string(),
- true <- is_binary(favicon) do
+ {_, [favicon_rel | _]} when is_binary(favicon_rel) <-
+ {:parse,
+ html |> Floki.parse_document!() |> Floki.attribute("link[rel=icon]", "href")},
+ {_, favicon} when is_binary(favicon) <-
+ {:merge, URI.merge(instance_uri, favicon_rel) |> to_string()} do
favicon
else
_ -> nil
diff --git a/lib/pleroma/mfa/token.ex b/lib/pleroma/mfa/token.ex
index 0b2449971..69b64c0e8 100644
--- a/lib/pleroma/mfa/token.ex
+++ b/lib/pleroma/mfa/token.ex
@@ -10,10 +10,11 @@ defmodule Pleroma.MFA.Token do
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.OAuth.Authorization
- alias Pleroma.Web.OAuth.Token, as: OAuthToken
@expires 300
+ @type t() :: %__MODULE__{}
+
schema "mfa_tokens" do
field(:token, :string)
field(:valid_until, :naive_datetime_usec)
@@ -24,6 +25,7 @@ defmodule Pleroma.MFA.Token do
timestamps()
end
+ @spec get_by_token(String.t()) :: {:ok, t()} | {:error, :not_found}
def get_by_token(token) do
from(
t in __MODULE__,
@@ -33,33 +35,40 @@ defmodule Pleroma.MFA.Token do
|> Repo.find_resource()
end
- def validate(token) do
- with {:fetch_token, {:ok, token}} <- {:fetch_token, get_by_token(token)},
- {:expired, false} <- {:expired, is_expired?(token)} do
+ @spec validate(String.t()) :: {:ok, t()} | {:error, :not_found} | {:error, :expired_token}
+ def validate(token_str) do
+ with {:ok, token} <- get_by_token(token_str),
+ false <- expired?(token) do
{:ok, token}
- else
- {:expired, _} -> {:error, :expired_token}
- {:fetch_token, _} -> {:error, :not_found}
- error -> {:error, error}
end
end
- def create_token(%User{} = user) do
- %__MODULE__{}
- |> change
- |> assign_user(user)
- |> put_token
- |> put_valid_until
- |> Repo.insert()
+ defp expired?(%__MODULE__{valid_until: valid_until}) do
+ with true <- NaiveDateTime.diff(NaiveDateTime.utc_now(), valid_until) > 0 do
+ {:error, :expired_token}
+ end
+ end
+
+ @spec create(User.t(), Authorization.t() | nil) :: {:ok, t()} | {:error, Ecto.Changeset.t()}
+ def create(user, authorization \\ nil) do
+ with {:ok, token} <- do_create(user, authorization) do
+ Pleroma.Workers.PurgeExpiredToken.enqueue(%{
+ token_id: token.id,
+ valid_until: DateTime.from_naive!(token.valid_until, "Etc/UTC"),
+ mod: __MODULE__
+ })
+
+ {:ok, token}
+ end
end
- def create_token(user, authorization) do
+ defp do_create(user, authorization) do
%__MODULE__{}
- |> change
+ |> change()
|> assign_user(user)
- |> assign_authorization(authorization)
- |> put_token
- |> put_valid_until
+ |> maybe_assign_authorization(authorization)
+ |> put_token()
+ |> put_valid_until()
|> Repo.insert()
end
@@ -69,15 +78,19 @@ defmodule Pleroma.MFA.Token do
|> validate_required([:user])
end
- defp assign_authorization(changeset, authorization) do
+ defp maybe_assign_authorization(changeset, %Authorization{} = authorization) do
changeset
|> put_assoc(:authorization, authorization)
|> validate_required([:authorization])
end
+ defp maybe_assign_authorization(changeset, _), do: changeset
+
defp put_token(changeset) do
+ token = Pleroma.Web.OAuth.Token.Utils.generate_token()
+
changeset
- |> change(%{token: OAuthToken.Utils.generate_token()})
+ |> change(%{token: token})
|> validate_required([:token])
|> unique_constraint(:token)
end
@@ -89,18 +102,4 @@ defmodule Pleroma.MFA.Token do
|> change(%{valid_until: expires_in})
|> validate_required([:valid_until])
end
-
- def is_expired?(%__MODULE__{valid_until: valid_until}) do
- NaiveDateTime.diff(NaiveDateTime.utc_now(), valid_until) > 0
- end
-
- def is_expired?(_), do: false
-
- def delete_expired_tokens do
- from(
- q in __MODULE__,
- where: fragment("?", q.valid_until) < ^Timex.now()
- )
- |> Repo.delete_all()
- end
end
diff --git a/lib/pleroma/object/containment.ex b/lib/pleroma/object/containment.ex
index bc88e8a0c..29cb3d672 100644
--- a/lib/pleroma/object/containment.ex
+++ b/lib/pleroma/object/containment.ex
@@ -44,13 +44,6 @@ defmodule Pleroma.Object.Containment do
nil
end
- # TODO: We explicitly allow 'tag' URIs through, due to references to legacy OStatus
- # objects being present in the test suite environment. Once these objects are
- # removed, please also remove this.
- if Mix.env() == :test do
- defp compare_uris(_, %URI{scheme: "tag"}), do: :ok
- end
-
defp compare_uris(%URI{host: host} = _id_uri, %URI{host: host} = _other_uri), do: :ok
defp compare_uris(_id_uri, _other_uri), do: :error
diff --git a/lib/pleroma/reverse_proxy/client/tesla.ex b/lib/pleroma/reverse_proxy/client/tesla.ex
index d5a339681..4b118eec2 100644
--- a/lib/pleroma/reverse_proxy/client/tesla.ex
+++ b/lib/pleroma/reverse_proxy/client/tesla.ex
@@ -28,7 +28,7 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do
url,
body,
headers,
- Keyword.put(opts, :adapter, opts)
+ opts
) do
if is_map(response.body) and method != :head do
{:ok, response.status, response.headers, response.body}
diff --git a/lib/pleroma/telemetry/logger.ex b/lib/pleroma/telemetry/logger.ex
index 4cacae02f..197b1d091 100644
--- a/lib/pleroma/telemetry/logger.ex
+++ b/lib/pleroma/telemetry/logger.ex
@@ -7,7 +7,8 @@ defmodule Pleroma.Telemetry.Logger do
[:pleroma, :connection_pool, :reclaim, :start],
[:pleroma, :connection_pool, :reclaim, :stop],
[:pleroma, :connection_pool, :provision_failure],
- [:pleroma, :connection_pool, :client_death]
+ [:pleroma, :connection_pool, :client, :dead],
+ [:pleroma, :connection_pool, :client, :add]
]
def attach do
:telemetry.attach_many("pleroma-logger", @events, &handle_event/4, [])
@@ -62,7 +63,7 @@ defmodule Pleroma.Telemetry.Logger do
end
def handle_event(
- [:pleroma, :connection_pool, :client_death],
+ [:pleroma, :connection_pool, :client, :dead],
%{client_pid: client_pid, reason: reason},
%{key: key},
_
@@ -73,4 +74,17 @@ defmodule Pleroma.Telemetry.Logger do
}"
end)
end
+
+ def handle_event(
+ [:pleroma, :connection_pool, :client, :add],
+ %{clients: [_, _ | _] = clients},
+ %{key: key, protocol: :http},
+ _
+ ) do
+ Logger.info(fn ->
+ "Pool worker for #{key}: #{length(clients)} clients are using an HTTP1 connection at the same time, head-of-line blocking might occur."
+ end)
+ end
+
+ def handle_event([:pleroma, :connection_pool, :client, :add], _, _, _), do: :ok
end
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index f323fc6ed..e73d19964 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -2315,6 +2315,11 @@ defmodule Pleroma.User do
max_pinned_statuses = Config.get([:instance, :max_pinned_statuses], 0)
params = %{pinned_activities: user.pinned_activities ++ [id]}
+ # if pinned activity was scheduled for deletion, we remove job
+ if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(id) do
+ Oban.cancel_job(expiration.id)
+ end
+
user
|> cast(params, [:pinned_activities])
|> validate_length(:pinned_activities,
@@ -2327,9 +2332,19 @@ defmodule Pleroma.User do
|> update_and_set_cache()
end
- def remove_pinnned_activity(user, %Pleroma.Activity{id: id}) do
+ def remove_pinnned_activity(user, %Pleroma.Activity{id: id, data: data}) do
params = %{pinned_activities: List.delete(user.pinned_activities, id)}
+ # if pinned activity was scheduled for deletion, we reschedule it for deletion
+ if data["expires_at"] do
+ {:ok, expires_at, _} = DateTime.from_iso8601(data["expires_at"])
+
+ Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
+ activity_id: id,
+ expires_at: expires_at
+ })
+ end
+
user
|> cast(params, [:pinned_activities])
|> update_and_set_cache()
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 333621413..66a9f78a3 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -5,7 +5,6 @@
defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Activity
alias Pleroma.Activity.Ir.Topics
- alias Pleroma.ActivityExpiration
alias Pleroma.Config
alias Pleroma.Constants
alias Pleroma.Conversation
@@ -102,7 +101,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
local: local,
recipients: recipients,
actor: object["actor"]
- }) do
+ }),
+ # TODO: add tests for expired activities, when Note type will be supported in new pipeline
+ {:ok, _} <- maybe_create_activity_expiration(activity) do
{:ok, activity, meta}
end
end
@@ -111,23 +112,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
with nil <- Activity.normalize(map),
map <- lazy_put_activity_defaults(map, fake),
- true <- bypass_actor_check || check_actor_is_active(map["actor"]),
- {_, true} <- {:remote_limit_error, check_remote_limit(map)},
+ {_, true} <- {:actor_check, bypass_actor_check || check_actor_is_active(map["actor"])},
+ {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
{:ok, map} <- MRF.filter(map),
{recipients, _, _} = get_recipients(map),
{:fake, false, map, recipients} <- {:fake, fake, map, recipients},
{:containment, :ok} <- {:containment, Containment.contain_child(map)},
- {:ok, map, object} <- insert_full_object(map) do
- {:ok, activity} =
- %Activity{
- data: map,
- local: local,
- actor: map["actor"],
- recipients: recipients
- }
- |> Repo.insert()
- |> maybe_create_activity_expiration()
-
+ {:ok, map, object} <- insert_full_object(map),
+ {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
# Splice in the child object if we have one.
activity = Maps.put_if_present(activity, :object, object)
@@ -138,6 +130,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
%Activity{} = activity ->
{:ok, activity}
+ {:actor_check, _} ->
+ {:error, false}
+
+ {:containment, _} = error ->
+ error
+
+ {:error, _} = error ->
+ error
+
{:fake, true, map, recipients} ->
activity = %Activity{
data: map,
@@ -150,8 +151,24 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
{:ok, activity}
- error ->
- {:error, error}
+ {:remote_limit_pass, _} ->
+ {:error, :remote_limit}
+
+ {:reject, reason} ->
+ {:error, reason}
+ end
+ end
+
+ defp insert_activity_with_expiration(data, local, recipients) do
+ struct = %Activity{
+ data: data,
+ local: local,
+ actor: data["actor"],
+ recipients: recipients
+ }
+
+ with {:ok, activity} <- Repo.insert(struct) do
+ maybe_create_activity_expiration(activity)
end
end
@@ -164,13 +181,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
stream_out_participations(participations)
end
- defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
- with {:ok, _} <- ActivityExpiration.create(activity, expires_at) do
+ defp maybe_create_activity_expiration(
+ %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
+ ) do
+ with {:ok, _job} <-
+ Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
+ activity_id: activity.id,
+ expires_at: expires_at
+ }) do
{:ok, activity}
end
end
- defp maybe_create_activity_expiration(result), do: result
+ defp maybe_create_activity_expiration(activity), do: {:ok, activity}
defp create_or_bump_conversation(activity, actor) do
with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
diff --git a/lib/pleroma/web/activity_pub/mrf/activity_expiration_policy.ex b/lib/pleroma/web/activity_pub/mrf/activity_expiration_policy.ex
index 7b4c78e0f..bee47b4ed 100644
--- a/lib/pleroma/web/activity_pub/mrf/activity_expiration_policy.ex
+++ b/lib/pleroma/web/activity_pub/mrf/activity_expiration_policy.ex
@@ -31,10 +31,10 @@ defmodule Pleroma.Web.ActivityPub.MRF.ActivityExpirationPolicy do
defp maybe_add_expiration(activity) do
days = Pleroma.Config.get([:mrf_activity_expiration, :days], 365)
- expires_at = NaiveDateTime.utc_now() |> Timex.shift(days: days)
+ expires_at = DateTime.utc_now() |> Timex.shift(days: days)
with %{"expires_at" => existing_expires_at} <- activity,
- :lt <- NaiveDateTime.compare(existing_expires_at, expires_at) do
+ :lt <- DateTime.compare(existing_expires_at, expires_at) do
activity
else
_ -> Map.put(activity, "expires_at", expires_at)
diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex
index a5e2323bd..46a8be767 100644
--- a/lib/pleroma/web/activity_pub/side_effects.ex
+++ b/lib/pleroma/web/activity_pub/side_effects.ex
@@ -7,7 +7,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
"""
alias Pleroma.Activity
alias Pleroma.Activity.Ir.Topics
- alias Pleroma.ActivityExpiration
alias Pleroma.Chat
alias Pleroma.Chat.MessageReference
alias Pleroma.FollowingRelationship
@@ -188,10 +187,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
Object.increase_replies_count(in_reply_to)
end
- if expires_at = activity.data["expires_at"] do
- ActivityExpiration.create(activity, expires_at)
- end
-
BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
meta =
diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex
index 0831efadc..af4384213 100644
--- a/lib/pleroma/web/activity_pub/transmogrifier.ex
+++ b/lib/pleroma/web/activity_pub/transmogrifier.ex
@@ -168,7 +168,6 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
def fix_in_reply_to(%{"inReplyTo" => in_reply_to} = object, options)
when not is_nil(in_reply_to) do
in_reply_to_id = prepare_in_reply_to(in_reply_to)
- object = Map.put(object, "inReplyToAtomUri", in_reply_to_id)
depth = (options[:depth] || 0) + 1
if Federator.allowed_thread_distance?(depth) do
@@ -176,9 +175,8 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
%Activity{} <- Activity.get_create_by_object_ap_id(replied_object.data["id"]) do
object
|> Map.put("inReplyTo", replied_object.data["id"])
- |> Map.put("inReplyToAtomUri", object["inReplyToAtomUri"] || in_reply_to_id)
|> Map.put("context", replied_object.data["context"] || object["conversation"])
- |> Map.drop(["conversation"])
+ |> Map.drop(["conversation", "inReplyToAtomUri"])
else
e ->
Logger.warn("Couldn't fetch #{inspect(in_reply_to_id)}, error: #{inspect(e)}")
diff --git a/lib/pleroma/web/common_api/activity_draft.ex b/lib/pleroma/web/common_api/activity_draft.ex
index f849b2e01..548f76609 100644
--- a/lib/pleroma/web/common_api/activity_draft.ex
+++ b/lib/pleroma/web/common_api/activity_draft.ex
@@ -202,7 +202,7 @@ defmodule Pleroma.Web.CommonAPI.ActivityDraft do
additional =
case draft.expires_at do
- %NaiveDateTime{} = expires_at -> Map.put(additional, "expires_at", expires_at)
+ %DateTime{} = expires_at -> Map.put(additional, "expires_at", expires_at)
_ -> additional
end
diff --git a/lib/pleroma/web/common_api/common_api.ex b/lib/pleroma/web/common_api/common_api.ex
index 4ab533658..500c3883e 100644
--- a/lib/pleroma/web/common_api/common_api.ex
+++ b/lib/pleroma/web/common_api/common_api.ex
@@ -4,7 +4,6 @@
defmodule Pleroma.Web.CommonAPI do
alias Pleroma.Activity
- alias Pleroma.ActivityExpiration
alias Pleroma.Conversation.Participation
alias Pleroma.Formatter
alias Pleroma.Object
@@ -381,9 +380,9 @@ defmodule Pleroma.Web.CommonAPI do
def check_expiry_date({:ok, nil} = res), do: res
def check_expiry_date({:ok, in_seconds}) do
- expiry = NaiveDateTime.utc_now() |> NaiveDateTime.add(in_seconds)
+ expiry = DateTime.add(DateTime.utc_now(), in_seconds)
- if ActivityExpiration.expires_late_enough?(expiry) do
+ if Pleroma.Workers.PurgeExpiredActivity.expires_late_enough?(expiry) do
{:ok, expiry}
else
{:error, "Expiry date is too soon"}
diff --git a/lib/pleroma/web/mastodon_api/views/status_view.ex b/lib/pleroma/web/mastodon_api/views/status_view.ex
index 0882d680e..4f7f27f35 100644
--- a/lib/pleroma/web/mastodon_api/views/status_view.ex
+++ b/lib/pleroma/web/mastodon_api/views/status_view.ex
@@ -8,7 +8,6 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
require Pleroma.Constants
alias Pleroma.Activity
- alias Pleroma.ActivityExpiration
alias Pleroma.HTML
alias Pleroma.Object
alias Pleroma.Repo
@@ -245,8 +244,8 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
expires_at =
with true <- client_posted_this_activity,
- %ActivityExpiration{scheduled_at: scheduled_at} <-
- ActivityExpiration.get_by_activity_id(activity.id) do
+ %Oban.Job{scheduled_at: scheduled_at} <-
+ Pleroma.Workers.PurgeExpiredActivity.get_expiration(activity.id) do
scheduled_at
else
_ -> nil
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index 94e4595d8..cf923ded8 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -37,12 +37,12 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
else
{:error, :bad_topic} ->
Logger.debug("#{__MODULE__} bad topic #{inspect(req)}")
- {:ok, req} = :cowboy_req.reply(404, req)
+ req = :cowboy_req.reply(404, req)
{:ok, req, state}
{:error, :unauthorized} ->
Logger.debug("#{__MODULE__} authentication error: #{inspect(req)}")
- {:ok, req} = :cowboy_req.reply(401, req)
+ req = :cowboy_req.reply(401, req)
{:ok, req, state}
end
end
@@ -64,7 +64,9 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
{:ok, %{state | timer: timer()}}
end
- # We never receive messages.
+ # We only receive pings for now
+ def websocket_handle(:ping, state), do: {:ok, state}
+
def websocket_handle(frame, state) do
Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
{:ok, state}
@@ -98,6 +100,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
{:reply, :ping, %{state | timer: nil, count: 0}, :hibernate}
end
+ # State can be `[]` only in case we terminate before switching to websocket,
+ # we already log errors for these cases in `init/1`, so just do nothing here
+ def terminate(_reason, _req, []), do: :ok
+
def terminate(reason, _req, state) do
Logger.debug(
"#{__MODULE__} terminating websocket connection for user #{
diff --git a/lib/pleroma/web/oauth/oauth_controller.ex b/lib/pleroma/web/oauth/oauth_controller.ex
index dd00600ea..26e68be42 100644
--- a/lib/pleroma/web/oauth/oauth_controller.ex
+++ b/lib/pleroma/web/oauth/oauth_controller.ex
@@ -145,7 +145,10 @@ defmodule Pleroma.Web.OAuth.OAuthController do
def after_create_authorization(%Plug.Conn{} = conn, %Authorization{} = auth, %{
"authorization" => %{"redirect_uri" => @oob_token_redirect_uri}
}) do
- render(conn, "oob_authorization_created.html", %{auth: auth})
+ # Enforcing the view to reuse the template when calling from other controllers
+ conn
+ |> put_view(OAuthView)
+ |> render("oob_authorization_created.html", %{auth: auth})
end
def after_create_authorization(%Plug.Conn{} = conn, %Authorization{} = auth, %{
@@ -197,7 +200,7 @@ defmodule Pleroma.Web.OAuth.OAuthController do
{:mfa_required, user, auth, _},
params
) do
- {:ok, token} = MFA.Token.create_token(user, auth)
+ {:ok, token} = MFA.Token.create(user, auth)
data = %{
"mfa_token" => token.token,
@@ -579,7 +582,7 @@ defmodule Pleroma.Web.OAuth.OAuthController do
do: put_session(conn, :registration_id, registration_id)
defp build_and_response_mfa_token(user, auth) do
- with {:ok, token} <- MFA.Token.create_token(user, auth) do
+ with {:ok, token} <- MFA.Token.create(user, auth) do
MFAView.render("mfa_response.json", %{token: token, user: user})
end
end
diff --git a/lib/pleroma/web/oauth/token.ex b/lib/pleroma/web/oauth/token.ex
index 08bb7326d..de37998f2 100644
--- a/lib/pleroma/web/oauth/token.ex
+++ b/lib/pleroma/web/oauth/token.ex
@@ -50,7 +50,7 @@ defmodule Pleroma.Web.OAuth.Token do
true <- auth.app_id == app.id do
user = if auth.user_id, do: User.get_cached_by_id(auth.user_id), else: %User{}
- create_token(
+ create(
app,
user,
%{scopes: auth.scopes}
@@ -83,8 +83,22 @@ defmodule Pleroma.Web.OAuth.Token do
|> validate_required([:valid_until])
end
- @spec create_token(App.t(), User.t(), map()) :: {:ok, Token} | {:error, Changeset.t()}
- def create_token(%App{} = app, %User{} = user, attrs \\ %{}) do
+ @spec create(App.t(), User.t(), map()) :: {:ok, Token} | {:error, Changeset.t()}
+ def create(%App{} = app, %User{} = user, attrs \\ %{}) do
+ with {:ok, token} <- do_create(app, user, attrs) do
+ if Pleroma.Config.get([:oauth2, :clean_expired_tokens]) do
+ Pleroma.Workers.PurgeExpiredToken.enqueue(%{
+ token_id: token.id,
+ valid_until: DateTime.from_naive!(token.valid_until, "Etc/UTC"),
+ mod: __MODULE__
+ })
+ end
+
+ {:ok, token}
+ end
+ end
+
+ defp do_create(app, user, attrs) do
%__MODULE__{user_id: user.id, app_id: app.id}
|> cast(%{scopes: attrs[:scopes] || app.scopes}, [:scopes])
|> validate_required([:scopes, :app_id])
@@ -105,11 +119,6 @@ defmodule Pleroma.Web.OAuth.Token do
|> Repo.delete_all()
end
- def delete_expired_tokens do
- Query.get_expired_tokens()
- |> Repo.delete_all()
- end
-
def get_user_tokens(%User{id: user_id}) do
Query.get_by_user(user_id)
|> Query.preload([:app])
diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex
deleted file mode 100644
index e3aa4eb7e..000000000
--- a/lib/pleroma/web/oauth/token/clean_worker.ex
+++ /dev/null
@@ -1,38 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.OAuth.Token.CleanWorker do
- @moduledoc """
- The module represents functions to clean an expired OAuth and MFA tokens.
- """
- use GenServer
-
- @ten_seconds 10_000
- @one_day 86_400_000
-
- alias Pleroma.MFA
- alias Pleroma.Web.OAuth
- alias Pleroma.Workers.BackgroundWorker
-
- def start_link(_), do: GenServer.start_link(__MODULE__, %{})
-
- def init(_) do
- Process.send_after(self(), :perform, @ten_seconds)
- {:ok, nil}
- end
-
- @doc false
- def handle_info(:perform, state) do
- BackgroundWorker.enqueue("clean_expired_tokens", %{})
- interval = Pleroma.Config.get([:oauth2, :clean_expired_tokens_interval], @one_day)
-
- Process.send_after(self(), :perform, interval)
- {:noreply, state}
- end
-
- def perform(:clean) do
- OAuth.Token.delete_expired_tokens()
- MFA.Token.delete_expired_tokens()
- end
-end
diff --git a/lib/pleroma/web/oauth/token/query.ex b/lib/pleroma/web/oauth/token/query.ex
index 93d6e26ed..fd6d9b112 100644
--- a/lib/pleroma/web/oauth/token/query.ex
+++ b/lib/pleroma/web/oauth/token/query.ex
@@ -33,12 +33,6 @@ defmodule Pleroma.Web.OAuth.Token.Query do
from(q in query, where: q.id == ^id)
end
- @spec get_expired_tokens(query, DateTime.t() | nil) :: query
- def get_expired_tokens(query \\ Token, date \\ nil) do
- expired_date = date || Timex.now()
- from(q in query, where: fragment("?", q.valid_until) < ^expired_date)
- end
-
@spec get_by_user(query, String.t()) :: query
def get_by_user(query \\ Token, user_id) do
from(q in query, where: q.user_id == ^user_id)
diff --git a/lib/pleroma/web/oauth/token/strategy/refresh_token.ex b/lib/pleroma/web/oauth/token/strategy/refresh_token.ex
index debc29b0b..625b0fde2 100644
--- a/lib/pleroma/web/oauth/token/strategy/refresh_token.ex
+++ b/lib/pleroma/web/oauth/token/strategy/refresh_token.ex
@@ -46,7 +46,7 @@ defmodule Pleroma.Web.OAuth.Token.Strategy.RefreshToken do
defp create_access_token({:error, error}, _), do: {:error, error}
defp create_access_token({:ok, token}, %{app: app, user: user} = token_params) do
- Token.create_token(app, user, add_refresh_token(token_params, token.refresh_token))
+ Token.create(app, user, add_refresh_token(token_params, token.refresh_token))
end
defp add_refresh_token(params, token) do
diff --git a/lib/pleroma/web/templates/o_auth/o_auth/oob_authorization_created.html.eex b/lib/pleroma/web/templates/o_auth/o_auth/oob_authorization_created.html.eex
index 8443d906b..ffabe29a6 100644
--- a/lib/pleroma/web/templates/o_auth/o_auth/oob_authorization_created.html.eex
+++ b/lib/pleroma/web/templates/o_auth/o_auth/oob_authorization_created.html.eex
@@ -1,2 +1,2 @@
<h1>Successfully authorized</h1>
-<h2>Token code is <%= @auth.token %></h2>
+<h2>Token code is <br><%= @auth.token %></h2>
diff --git a/lib/pleroma/web/templates/o_auth/o_auth/oob_token_exists.html.eex b/lib/pleroma/web/templates/o_auth/o_auth/oob_token_exists.html.eex
index 961aad976..82785c4b9 100644
--- a/lib/pleroma/web/templates/o_auth/o_auth/oob_token_exists.html.eex
+++ b/lib/pleroma/web/templates/o_auth/o_auth/oob_token_exists.html.eex
@@ -1,2 +1,2 @@
<h1>Authorization exists</h1>
-<h2>Access token is <%= @token.token %></h2>
+<h2>Access token is <br><%= @token.token %></h2>
diff --git a/lib/pleroma/web/twitter_api/controllers/remote_follow_controller.ex b/lib/pleroma/web/twitter_api/controllers/remote_follow_controller.ex
index 521dc9322..072d889e2 100644
--- a/lib/pleroma/web/twitter_api/controllers/remote_follow_controller.ex
+++ b/lib/pleroma/web/twitter_api/controllers/remote_follow_controller.ex
@@ -135,7 +135,7 @@ defmodule Pleroma.Web.TwitterAPI.RemoteFollowController do
end
defp handle_follow_error(conn, {:mfa_required, followee, user, _} = _) do
- {:ok, %{token: token}} = MFA.Token.create_token(user)
+ {:ok, %{token: token}} = MFA.Token.create(user)
render(conn, "follow_mfa.html", %{followee: followee, mfa_token: token, error: false})
end
diff --git a/lib/pleroma/workers/cron/clear_oauth_token_worker.ex b/lib/pleroma/workers/cron/clear_oauth_token_worker.ex
deleted file mode 100644
index 276f47efc..000000000
--- a/lib/pleroma/workers/cron/clear_oauth_token_worker.ex
+++ /dev/null
@@ -1,23 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.Cron.ClearOauthTokenWorker do
- @moduledoc """
- The worker to cleanup expired oAuth tokens.
- """
-
- use Oban.Worker, queue: "background"
-
- alias Pleroma.Config
- alias Pleroma.Web.OAuth.Token
-
- @impl Oban.Worker
- def perform(_job) do
- if Config.get([:oauth2, :clean_expired_tokens], false) do
- Token.delete_expired_tokens()
- end
-
- :ok
- end
-end
diff --git a/lib/pleroma/workers/cron/purge_expired_activities_worker.ex b/lib/pleroma/workers/cron/purge_expired_activities_worker.ex
deleted file mode 100644
index 6549207fc..000000000
--- a/lib/pleroma/workers/cron/purge_expired_activities_worker.ex
+++ /dev/null
@@ -1,48 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker do
- @moduledoc """
- The worker to purge expired activities.
- """
-
- use Oban.Worker, queue: "activity_expiration"
-
- alias Pleroma.Activity
- alias Pleroma.ActivityExpiration
- alias Pleroma.Config
- alias Pleroma.User
- alias Pleroma.Web.CommonAPI
-
- require Logger
-
- @interval :timer.minutes(1)
-
- @impl Oban.Worker
- def perform(_job) do
- if Config.get([ActivityExpiration, :enabled]) do
- Enum.each(ActivityExpiration.due_expirations(@interval), &delete_activity/1)
- end
- after
- :ok
- end
-
- def delete_activity(%ActivityExpiration{activity_id: activity_id}) do
- with {:activity, %Activity{} = activity} <-
- {:activity, Activity.get_by_id_with_object(activity_id)},
- {:user, %User{} = user} <- {:user, User.get_by_ap_id(activity.object.data["actor"])} do
- CommonAPI.delete(activity.id, user)
- else
- {:activity, _} ->
- Logger.error(
- "#{__MODULE__} Couldn't delete expired activity: not found activity ##{activity_id}"
- )
-
- {:user, _} ->
- Logger.error(
- "#{__MODULE__} Couldn't delete expired activity: not found actor of ##{activity_id}"
- )
- end
- end
-end
diff --git a/lib/pleroma/workers/purge_expired_activity.ex b/lib/pleroma/workers/purge_expired_activity.ex
new file mode 100644
index 000000000..c168890a2
--- /dev/null
+++ b/lib/pleroma/workers/purge_expired_activity.ex
@@ -0,0 +1,72 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PurgeExpiredActivity do
+ @moduledoc """
+ Worker which purges expired activity.
+ """
+
+ use Oban.Worker, queue: :activity_expiration, max_attempts: 1
+
+ import Ecto.Query
+
+ alias Pleroma.Activity
+
+ @spec enqueue(map()) ::
+ {:ok, Oban.Job.t()}
+ | {:error, :expired_activities_disabled}
+ | {:error, :expiration_too_close}
+ def enqueue(args) do
+ with true <- enabled?() do
+ {scheduled_at, args} = Map.pop(args, :expires_at)
+
+ args
+ |> new(scheduled_at: scheduled_at)
+ |> Oban.insert()
+ end
+ end
+
+ @impl true
+ def perform(%Oban.Job{args: %{"activity_id" => id}}) do
+ with %Activity{} = activity <- find_activity(id),
+ %Pleroma.User{} = user <- find_user(activity.object.data["actor"]) do
+ Pleroma.Web.CommonAPI.delete(activity.id, user)
+ end
+ end
+
+ defp enabled? do
+ with false <- Pleroma.Config.get([__MODULE__, :enabled], false) do
+ {:error, :expired_activities_disabled}
+ end
+ end
+
+ defp find_activity(id) do
+ with nil <- Activity.get_by_id_with_object(id) do
+ {:error, :activity_not_found}
+ end
+ end
+
+ defp find_user(ap_id) do
+ with nil <- Pleroma.User.get_by_ap_id(ap_id) do
+ {:error, :user_not_found}
+ end
+ end
+
+ def get_expiration(id) do
+ from(j in Oban.Job,
+ where: j.state == "scheduled",
+ where: j.queue == "activity_expiration",
+ where: fragment("?->>'activity_id' = ?", j.args, ^id)
+ )
+ |> Pleroma.Repo.one()
+ end
+
+ @spec expires_late_enough?(DateTime.t()) :: boolean()
+ def expires_late_enough?(scheduled_at) do
+ now = DateTime.utc_now()
+ diff = DateTime.diff(scheduled_at, now, :millisecond)
+ min_lifetime = Pleroma.Config.get([__MODULE__, :min_lifetime], 600)
+ diff > :timer.seconds(min_lifetime)
+ end
+end
diff --git a/lib/pleroma/workers/purge_expired_token.ex b/lib/pleroma/workers/purge_expired_token.ex
new file mode 100644
index 000000000..a81e0cd28
--- /dev/null
+++ b/lib/pleroma/workers/purge_expired_token.ex
@@ -0,0 +1,29 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PurgeExpiredToken do
+ @moduledoc """
+ Worker which purges expired OAuth tokens
+ """
+
+ use Oban.Worker, queue: :token_expiration, max_attempts: 1
+
+ @spec enqueue(%{token_id: integer(), valid_until: DateTime.t(), mod: module()}) ::
+ {:ok, Oban.Job.t()} | {:error, Ecto.Changeset.t()}
+ def enqueue(args) do
+ {scheduled_at, args} = Map.pop(args, :valid_until)
+
+ args
+ |> __MODULE__.new(scheduled_at: scheduled_at)
+ |> Oban.insert()
+ end
+
+ @impl true
+ def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do
+ module
+ |> String.to_existing_atom()
+ |> Pleroma.Repo.get(id)
+ |> Pleroma.Repo.delete()
+ end
+end