aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/mix/tasks/pleroma/docs.ex42
-rw-r--r--lib/pleroma/activity.ex193
-rw-r--r--lib/pleroma/activity/queries.ex32
-rw-r--r--lib/pleroma/application.ex24
-rw-r--r--lib/pleroma/daemons/activity_expiration_daemon.ex (renamed from lib/pleroma/activity_expiration_worker.ex)8
-rw-r--r--lib/pleroma/daemons/digest_email_daemon.ex (renamed from lib/pleroma/digest_email_worker.ex)13
-rw-r--r--lib/pleroma/daemons/scheduled_activity_daemon.ex (renamed from lib/pleroma/scheduled_activity_worker.ex)8
-rw-r--r--lib/pleroma/docs/generator.ex73
-rw-r--r--lib/pleroma/docs/json.ex20
-rw-r--r--lib/pleroma/docs/markdown.ex78
-rw-r--r--lib/pleroma/emails/mailer.ex8
-rw-r--r--lib/pleroma/instances/instance.ex8
-rw-r--r--lib/pleroma/scheduler.ex7
-rw-r--r--lib/pleroma/user.ex68
-rw-r--r--lib/pleroma/user/info.ex17
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub.ex3
-rw-r--r--lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex5
-rw-r--r--lib/pleroma/web/activity_pub/publisher.ex16
-rw-r--r--lib/pleroma/web/activity_pub/transmogrifier.ex7
-rw-r--r--lib/pleroma/web/activity_pub/utils.ex176
-rw-r--r--lib/pleroma/web/admin_api/config.ex15
-rw-r--r--lib/pleroma/web/controller_helper.ex95
-rw-r--r--lib/pleroma/web/federator/federator.ex90
-rw-r--r--lib/pleroma/web/federator/publisher.ex24
-rw-r--r--lib/pleroma/web/federator/retry_queue.ex239
-rw-r--r--lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex32
-rw-r--r--lib/pleroma/web/oauth/token/clean_worker.ex5
-rw-r--r--lib/pleroma/web/pleroma_api/pleroma_api_controller.ex27
-rw-r--r--lib/pleroma/web/push/push.ex7
-rw-r--r--lib/pleroma/web/router.ex1
-rw-r--r--lib/pleroma/web/salmon/salmon.ex11
-rw-r--r--lib/pleroma/web/twitter_api/controllers/util_controller.ex33
-rw-r--r--lib/pleroma/workers/activity_expiration_worker.ex18
-rw-r--r--lib/pleroma/workers/background_worker.ex69
-rw-r--r--lib/pleroma/workers/digest_emails_worker.ex16
-rw-r--r--lib/pleroma/workers/mailer_worker.ex15
-rw-r--r--lib/pleroma/workers/publisher_worker.ex25
-rw-r--r--lib/pleroma/workers/receiver_worker.ex18
-rw-r--r--lib/pleroma/workers/scheduled_activity_worker.ex12
-rw-r--r--lib/pleroma/workers/subscriber_worker.ex26
-rw-r--r--lib/pleroma/workers/transmogrifier_worker.ex15
-rw-r--r--lib/pleroma/workers/web_pusher_worker.ex16
-rw-r--r--lib/pleroma/workers/worker_helper.ex46
43 files changed, 897 insertions, 764 deletions
diff --git a/lib/mix/tasks/pleroma/docs.ex b/lib/mix/tasks/pleroma/docs.ex
new file mode 100644
index 000000000..0d2663648
--- /dev/null
+++ b/lib/mix/tasks/pleroma/docs.ex
@@ -0,0 +1,42 @@
+defmodule Mix.Tasks.Pleroma.Docs do
+ use Mix.Task
+ import Mix.Pleroma
+
+ @shortdoc "Generates docs from descriptions.exs"
+ @moduledoc """
+ Generates docs from `descriptions.exs`.
+
+ Supports two formats: `markdown` and `json`.
+
+ ## Generate Markdown docs
+
+ `mix pleroma.docs`
+
+ ## Generate JSON docs
+
+ `mix pleroma.docs json`
+ """
+
+ def run(["json"]) do
+ do_run(Pleroma.Docs.JSON)
+ end
+
+ def run(_) do
+ do_run(Pleroma.Docs.Markdown)
+ end
+
+ defp do_run(implementation) do
+ start_pleroma()
+
+ with {descriptions, _paths} <- Mix.Config.eval!("config/description.exs"),
+ {:ok, file_path} <-
+ Pleroma.Docs.Generator.process(
+ implementation,
+ descriptions[:pleroma][:config_description]
+ ) do
+ type = if implementation == Pleroma.Docs.Markdown, do: "Markdown", else: "JSON"
+
+ Mix.shell().info([:green, "#{type} docs successfully generated to #{file_path}."])
+ end
+ end
+end
diff --git a/lib/pleroma/activity.ex b/lib/pleroma/activity.ex
index 44f1e3011..ec558168a 100644
--- a/lib/pleroma/activity.ex
+++ b/lib/pleroma/activity.ex
@@ -6,6 +6,7 @@ defmodule Pleroma.Activity do
use Ecto.Schema
alias Pleroma.Activity
+ alias Pleroma.Activity.Queries
alias Pleroma.ActivityExpiration
alias Pleroma.Bookmark
alias Pleroma.Notification
@@ -65,8 +66,8 @@ defmodule Pleroma.Activity do
timestamps()
end
- def with_joined_object(query) do
- join(query, :inner, [activity], o in Object,
+ def with_joined_object(query, join_type \\ :inner) do
+ join(query, join_type, [activity], o in Object,
on:
fragment(
"(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
@@ -78,10 +79,10 @@ defmodule Pleroma.Activity do
)
end
- def with_preloaded_object(query) do
+ def with_preloaded_object(query, join_type \\ :inner) do
query
|> has_named_binding?(:object)
- |> if(do: query, else: with_joined_object(query))
+ |> if(do: query, else: with_joined_object(query, join_type))
|> preload([activity, object: object], object: object)
end
@@ -107,12 +108,9 @@ defmodule Pleroma.Activity do
def with_set_thread_muted_field(query, _), do: query
def get_by_ap_id(ap_id) do
- Repo.one(
- from(
- activity in Activity,
- where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id))
- )
- )
+ ap_id
+ |> Queries.by_ap_id()
+ |> Repo.one()
end
def get_bookmark(%Activity{} = activity, %User{} = user) do
@@ -133,21 +131,10 @@ defmodule Pleroma.Activity do
end
def get_by_ap_id_with_object(ap_id) do
- Repo.one(
- from(
- activity in Activity,
- where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id)),
- left_join: o in Object,
- on:
- fragment(
- "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
- o.data,
- activity.data,
- activity.data
- ),
- preload: [object: o]
- )
- )
+ ap_id
+ |> Queries.by_ap_id()
+ |> with_preloaded_object(:left)
+ |> Repo.one()
end
def get_by_id(id) do
@@ -158,18 +145,9 @@ defmodule Pleroma.Activity do
end
def get_by_id_with_object(id) do
- from(activity in Activity,
- where: activity.id == ^id,
- inner_join: o in Object,
- on:
- fragment(
- "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
- o.data,
- activity.data,
- activity.data
- ),
- preload: [object: o]
- )
+ Activity
+ |> where(id: ^id)
+ |> with_preloaded_object()
|> Repo.one()
end
@@ -180,51 +158,21 @@ defmodule Pleroma.Activity do
|> Repo.all()
end
- def by_object_ap_id(ap_id) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^to_string(ap_id)
- )
- )
- end
-
- def create_by_object_ap_id(ap_ids) when is_list(ap_ids) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
- activity.data,
- activity.data,
- ^ap_ids
- ),
- where: fragment("(?)->>'type' = 'Create'", activity.data)
- )
- end
-
- def create_by_object_ap_id(ap_id) when is_binary(ap_id) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^to_string(ap_id)
- ),
- where: fragment("(?)->>'type' = 'Create'", activity.data)
- )
+ @doc """
+ Accepts `ap_id` or list of `ap_id`.
+ Returns a query.
+ """
+ @spec create_by_object_ap_id(String.t() | [String.t()]) :: Ecto.Queryable.t()
+ def create_by_object_ap_id(ap_id) do
+ ap_id
+ |> Queries.by_object_id()
+ |> Queries.by_type("Create")
end
- def create_by_object_ap_id(_), do: nil
-
def get_all_create_by_object_ap_id(ap_id) do
- Repo.all(create_by_object_ap_id(ap_id))
+ ap_id
+ |> create_by_object_ap_id()
+ |> Repo.all()
end
def get_create_by_object_ap_id(ap_id) when is_binary(ap_id) do
@@ -235,54 +183,17 @@ defmodule Pleroma.Activity do
def get_create_by_object_ap_id(_), do: nil
- def create_by_object_ap_id_with_object(ap_ids) when is_list(ap_ids) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
- activity.data,
- activity.data,
- ^ap_ids
- ),
- where: fragment("(?)->>'type' = 'Create'", activity.data),
- inner_join: o in Object,
- on:
- fragment(
- "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
- o.data,
- activity.data,
- activity.data
- ),
- preload: [object: o]
- )
- end
-
- def create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^to_string(ap_id)
- ),
- where: fragment("(?)->>'type' = 'Create'", activity.data),
- inner_join: o in Object,
- on:
- fragment(
- "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
- o.data,
- activity.data,
- activity.data
- ),
- preload: [object: o]
- )
+ @doc """
+ Accepts `ap_id` or list of `ap_id`.
+ Returns a query.
+ """
+ @spec create_by_object_ap_id_with_object(String.t() | [String.t()]) :: Ecto.Queryable.t()
+ def create_by_object_ap_id_with_object(ap_id) do
+ ap_id
+ |> create_by_object_ap_id()
+ |> with_preloaded_object()
end
- def create_by_object_ap_id_with_object(_), do: nil
-
def get_create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
ap_id
|> create_by_object_ap_id_with_object()
@@ -306,7 +217,8 @@ defmodule Pleroma.Activity do
def normalize(_), do: nil
def delete_by_ap_id(id) when is_binary(id) do
- by_object_ap_id(id)
+ id
+ |> Queries.by_object_id()
|> select([u], u)
|> Repo.delete_all()
|> elem(1)
@@ -350,31 +262,10 @@ defmodule Pleroma.Activity do
end
def follow_requests_for_actor(%Pleroma.User{ap_id: ap_id}) do
- from(
- a in Activity,
- where:
- fragment(
- "? ->> 'type' = 'Follow'",
- a.data
- ),
- where:
- fragment(
- "? ->> 'state' = 'pending'",
- a.data
- ),
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- a.data,
- a.data,
- ^ap_id
- )
- )
- end
-
- @spec query_by_actor(actor()) :: Ecto.Query.t()
- def query_by_actor(actor) do
- from(a in Activity, where: a.actor == ^actor)
+ ap_id
+ |> Queries.by_object_id()
+ |> Queries.by_type("Follow")
+ |> where([a], fragment("? ->> 'state' = 'pending'", a.data))
end
def restrict_deactivated_users(query) do
diff --git a/lib/pleroma/activity/queries.ex b/lib/pleroma/activity/queries.ex
index aa5b29566..13fa33831 100644
--- a/lib/pleroma/activity/queries.ex
+++ b/lib/pleroma/activity/queries.ex
@@ -13,6 +13,14 @@ defmodule Pleroma.Activity.Queries do
alias Pleroma.Activity
+ @spec by_ap_id(query, String.t()) :: query
+ def by_ap_id(query \\ Activity, ap_id) do
+ from(
+ activity in query,
+ where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id))
+ )
+ end
+
@spec by_actor(query, String.t()) :: query
def by_actor(query \\ Activity, actor) do
from(
@@ -21,8 +29,23 @@ defmodule Pleroma.Activity.Queries do
)
end
- @spec by_object_id(query, String.t()) :: query
- def by_object_id(query \\ Activity, object_id) do
+ @spec by_object_id(query, String.t() | [String.t()]) :: query
+ def by_object_id(query \\ Activity, object_id)
+
+ def by_object_id(query, object_ids) when is_list(object_ids) do
+ from(
+ activity in query,
+ where:
+ fragment(
+ "coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
+ activity.data,
+ activity.data,
+ ^object_ids
+ )
+ )
+ end
+
+ def by_object_id(query, object_id) when is_binary(object_id) do
from(activity in query,
where:
fragment(
@@ -41,9 +64,4 @@ defmodule Pleroma.Activity.Queries do
where: fragment("(?)->>'type' = ?", activity.data, ^activity_type)
)
end
-
- @spec limit(query, pos_integer()) :: query
- def limit(query \\ Activity, limit) do
- from(activity in query, limit: ^limit)
- end
end
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 1d46925f8..49094704b 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -31,18 +31,19 @@ defmodule Pleroma.Application do
children =
[
Pleroma.Repo,
+ Pleroma.Scheduler,
Pleroma.Config.TransferTask,
Pleroma.Emoji,
Pleroma.Captcha,
Pleroma.FlakeId,
- Pleroma.ScheduledActivityWorker,
- Pleroma.ActivityExpirationWorker
+ Pleroma.Daemons.ScheduledActivityDaemon,
+ Pleroma.Daemons.ActivityExpirationDaemon
] ++
cachex_children() ++
hackney_pool_children() ++
[
- Pleroma.Web.Federator.RetryQueue,
Pleroma.Stats,
+ {Oban, Pleroma.Config.get(Oban)},
%{
id: :web_push_init,
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
@@ -70,9 +71,7 @@ defmodule Pleroma.Application do
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: Pleroma.Supervisor]
- result = Supervisor.start_link(children, opts)
- :ok = after_supervisor_start()
- result
+ Supervisor.start_link(children, opts)
end
defp setup_instrumenters do
@@ -164,17 +163,4 @@ defmodule Pleroma.Application do
:hackney_pool.child_spec(pool, options)
end
end
-
- defp after_supervisor_start do
- with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest],
- true <- digest_config[:active] do
- PleromaJobQueue.schedule(
- digest_config[:schedule],
- :digest_emails,
- Pleroma.DigestEmailWorker
- )
- end
-
- :ok
- end
end
diff --git a/lib/pleroma/activity_expiration_worker.ex b/lib/pleroma/daemons/activity_expiration_daemon.ex
index 0f9e715f8..cab7628c4 100644
--- a/lib/pleroma/activity_expiration_worker.ex
+++ b/lib/pleroma/daemons/activity_expiration_daemon.ex
@@ -2,13 +2,14 @@
# Copyright © 2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.ActivityExpirationWorker do
+defmodule Pleroma.Daemons.ActivityExpirationDaemon do
alias Pleroma.Activity
alias Pleroma.ActivityExpiration
alias Pleroma.Config
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.CommonAPI
+
require Logger
use GenServer
import Ecto.Query
@@ -49,7 +50,10 @@ defmodule Pleroma.ActivityExpirationWorker do
def handle_info(:perform, state) do
ActivityExpiration.due_expirations(@schedule_interval)
|> Enum.each(fn expiration ->
- PleromaJobQueue.enqueue(:activity_expiration, __MODULE__, [:execute, expiration.id])
+ Pleroma.Workers.ActivityExpirationWorker.enqueue(
+ "activity_expiration",
+ %{"activity_expiration_id" => expiration.id}
+ )
end)
schedule_next()
diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/daemons/digest_email_daemon.ex
index 5644d6a67..462ad2c55 100644
--- a/lib/pleroma/digest_email_worker.ex
+++ b/lib/pleroma/daemons/digest_email_daemon.ex
@@ -2,10 +2,11 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.DigestEmailWorker do
- import Ecto.Query
+defmodule Pleroma.Daemons.DigestEmailDaemon do
+ alias Pleroma.Repo
+ alias Pleroma.Workers.DigestEmailsWorker
- @queue_name :digest_emails
+ import Ecto.Query
def perform do
config = Pleroma.Config.get([:email_notifications, :digest])
@@ -20,8 +21,10 @@ defmodule Pleroma.DigestEmailWorker do
where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
select: u
)
- |> Pleroma.Repo.all()
- |> Enum.each(&PleromaJobQueue.enqueue(@queue_name, __MODULE__, [&1]))
+ |> Repo.all()
+ |> Enum.each(fn user ->
+ DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id})
+ end)
end
@doc """
diff --git a/lib/pleroma/scheduled_activity_worker.ex b/lib/pleroma/daemons/scheduled_activity_daemon.ex
index 8578cab5e..aee5f723a 100644
--- a/lib/pleroma/scheduled_activity_worker.ex
+++ b/lib/pleroma/daemons/scheduled_activity_daemon.ex
@@ -2,7 +2,7 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.ScheduledActivityWorker do
+defmodule Pleroma.Daemons.ScheduledActivityDaemon do
@moduledoc """
Sends scheduled activities to the job queue.
"""
@@ -11,6 +11,7 @@ defmodule Pleroma.ScheduledActivityWorker do
alias Pleroma.ScheduledActivity
alias Pleroma.User
alias Pleroma.Web.CommonAPI
+
use GenServer
require Logger
@@ -45,7 +46,10 @@ defmodule Pleroma.ScheduledActivityWorker do
def handle_info(:perform, state) do
ScheduledActivity.due_activities(@schedule_interval)
|> Enum.each(fn scheduled_activity ->
- PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id])
+ Pleroma.Workers.ScheduledActivityWorker.enqueue(
+ "execute",
+ %{"activity_id" => scheduled_activity.id}
+ )
end)
schedule_next()
diff --git a/lib/pleroma/docs/generator.ex b/lib/pleroma/docs/generator.ex
new file mode 100644
index 000000000..aa578eee2
--- /dev/null
+++ b/lib/pleroma/docs/generator.ex
@@ -0,0 +1,73 @@
+defmodule Pleroma.Docs.Generator do
+ @callback process(keyword()) :: {:ok, String.t()}
+
+ @spec process(module(), keyword()) :: {:ok, String.t()}
+ def process(implementation, descriptions) do
+ implementation.process(descriptions)
+ end
+
+ @spec uploaders_list() :: [module()]
+ def uploaders_list do
+ {:ok, modules} = :application.get_key(:pleroma, :modules)
+
+ Enum.filter(modules, fn module ->
+ name_as_list = Module.split(module)
+
+ List.starts_with?(name_as_list, ["Pleroma", "Uploaders"]) and
+ List.last(name_as_list) != "Uploader"
+ end)
+ end
+
+ @spec filters_list() :: [module()]
+ def filters_list do
+ {:ok, modules} = :application.get_key(:pleroma, :modules)
+
+ Enum.filter(modules, fn module ->
+ name_as_list = Module.split(module)
+
+ List.starts_with?(name_as_list, ["Pleroma", "Upload", "Filter"])
+ end)
+ end
+
+ @spec mrf_list() :: [module()]
+ def mrf_list do
+ {:ok, modules} = :application.get_key(:pleroma, :modules)
+
+ Enum.filter(modules, fn module ->
+ name_as_list = Module.split(module)
+
+ List.starts_with?(name_as_list, ["Pleroma", "Web", "ActivityPub", "MRF"]) and
+ length(name_as_list) > 4
+ end)
+ end
+
+ @spec richmedia_parsers() :: [module()]
+ def richmedia_parsers do
+ {:ok, modules} = :application.get_key(:pleroma, :modules)
+
+ Enum.filter(modules, fn module ->
+ name_as_list = Module.split(module)
+
+ List.starts_with?(name_as_list, ["Pleroma", "Web", "RichMedia", "Parsers"]) and
+ length(name_as_list) == 5
+ end)
+ end
+end
+
+defimpl Jason.Encoder, for: Tuple do
+ def encode(tuple, opts) do
+ Jason.Encode.list(Tuple.to_list(tuple), opts)
+ end
+end
+
+defimpl Jason.Encoder, for: [Regex, Function] do
+ def encode(term, opts) do
+ Jason.Encode.string(inspect(term), opts)
+ end
+end
+
+defimpl String.Chars, for: Regex do
+ def to_string(term) do
+ inspect(term)
+ end
+end
diff --git a/lib/pleroma/docs/json.ex b/lib/pleroma/docs/json.ex
new file mode 100644
index 000000000..18ba01d58
--- /dev/null
+++ b/lib/pleroma/docs/json.ex
@@ -0,0 +1,20 @@
+defmodule Pleroma.Docs.JSON do
+ @behaviour Pleroma.Docs.Generator
+
+ @spec process(keyword()) :: {:ok, String.t()}
+ def process(descriptions) do
+ config_path = "docs/generate_config.json"
+
+ with {:ok, file} <- File.open(config_path, [:write]),
+ json <- generate_json(descriptions),
+ :ok <- IO.write(file, json),
+ :ok <- File.close(file) do
+ {:ok, config_path}
+ end
+ end
+
+ @spec generate_json([keyword()]) :: String.t()
+ def generate_json(descriptions) do
+ Jason.encode!(descriptions)
+ end
+end
diff --git a/lib/pleroma/docs/markdown.ex b/lib/pleroma/docs/markdown.ex
new file mode 100644
index 000000000..8386dc2fb
--- /dev/null
+++ b/lib/pleroma/docs/markdown.ex
@@ -0,0 +1,78 @@
+defmodule Pleroma.Docs.Markdown do
+ @behaviour Pleroma.Docs.Generator
+
+ @spec process(keyword()) :: {:ok, String.t()}
+ def process(descriptions) do
+ config_path = "docs/generated_config.md"
+ {:ok, file} = File.open(config_path, [:utf8, :write])
+ IO.write(file, "# Generated configuration\n")
+ IO.write(file, "Date of generation: #{Date.utc_today()}\n\n")
+
+ IO.write(
+ file,
+ "This file describe the configuration, it is recommended to edit the relevant `*.secret.exs` file instead of the others founds in the ``config`` directory.\n\n" <>
+ "If you run Pleroma with ``MIX_ENV=prod`` the file is ``prod.secret.exs``, otherwise it is ``dev.secret.exs``.\n\n"
+ )
+
+ for group <- descriptions do
+ if is_nil(group[:key]) do
+ IO.write(file, "## #{inspect(group[:group])}\n")
+ else
+ IO.write(file, "## #{inspect(group[:key])}\n")
+ end
+
+ IO.write(file, "#{group[:description]}\n")
+
+ for child <- group[:children] do
+ print_child_header(file, child)
+
+ print_suggestions(file, child[:suggestions])
+
+ if child[:children] do
+ for subchild <- child[:children] do
+ print_child_header(file, subchild)
+
+ print_suggestions(file, subchild[:suggestions])
+ end
+ end
+ end
+
+ IO.write(file, "\n")
+ end
+
+ :ok = File.close(file)
+ {:ok, config_path}
+ end
+
+ defp print_suggestion(file, suggestion) when is_list(suggestion) do
+ IO.write(file, " `#{inspect(suggestion)}`\n")
+ end
+
+ defp print_suggestion(file, suggestion) when is_function(suggestion) do
+ IO.write(file, " `#{inspect(suggestion.())}`\n")
+ end
+
+ defp print_suggestion(file, suggestion, as_list \\ false) do
+ list_mark = if as_list, do: "- ", else: ""
+ IO.write(file, " #{list_mark}`#{inspect(suggestion)}`\n")
+ end
+
+ defp print_suggestions(_file, nil), do: nil
+
+ defp print_suggestions(file, suggestions) do
+ IO.write(file, "Suggestions:\n")
+
+ if length(suggestions) > 1 do
+ for suggestion <- suggestions do
+ print_suggestion(file, suggestion, true)
+ end
+ else
+ print_suggestion(file, List.first(suggestions))
+ end
+ end
+
+ defp print_child_header(file, child) do
+ IO.write(file, "- `#{inspect(child[:key])}` -`#{inspect(child[:type])}` \n")
+ IO.write(file, "#{child[:description]} \n")
+ end
+end
diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex
index 2e4657b7c..eb96f2e8b 100644
--- a/lib/pleroma/emails/mailer.ex
+++ b/lib/pleroma/emails/mailer.ex
@@ -9,6 +9,7 @@ defmodule Pleroma.Emails.Mailer do
The module contains functions to delivery email using Swoosh.Mailer.
"""
+ alias Pleroma.Workers.MailerWorker
alias Swoosh.DeliveryError
@otp_app :pleroma
@@ -19,7 +20,12 @@ defmodule Pleroma.Emails.Mailer do
@doc "add email to queue"
def deliver_async(email, config \\ []) do
- PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config])
+ encoded_email =
+ email
+ |> :erlang.term_to_binary()
+ |> Base.encode64()
+
+ MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config})
end
@doc "callback to perform send email from queue"
diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex
index 4d7ed4ca1..544c4b687 100644
--- a/lib/pleroma/instances/instance.ex
+++ b/lib/pleroma/instances/instance.ex
@@ -90,7 +90,7 @@ defmodule Pleroma.Instances.Instance do
def set_unreachable(url_or_host, unreachable_since \\ nil)
def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do
- unreachable_since = unreachable_since || DateTime.utc_now()
+ unreachable_since = parse_datetime(unreachable_since) || NaiveDateTime.utc_now()
host = host(url_or_host)
existing_record = Repo.get_by(Instance, %{host: host})
@@ -114,4 +114,10 @@ defmodule Pleroma.Instances.Instance do
end
def set_unreachable(_, _), do: {:error, nil}
+
+ defp parse_datetime(datetime) when is_binary(datetime) do
+ NaiveDateTime.from_iso8601(datetime)
+ end
+
+ defp parse_datetime(datetime), do: datetime
end
diff --git a/lib/pleroma/scheduler.ex b/lib/pleroma/scheduler.ex
new file mode 100644
index 000000000..d84cd99ad
--- /dev/null
+++ b/lib/pleroma/scheduler.ex
@@ -0,0 +1,7 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Scheduler do
+ use Quantum.Scheduler, otp_app: :pleroma
+end
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index 3aa245f2a..f0306652c 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -27,6 +27,7 @@ defmodule Pleroma.User do
alias Pleroma.Web.OStatus
alias Pleroma.Web.RelMe
alias Pleroma.Web.Websub
+ alias Pleroma.Workers.BackgroundWorker
require Logger
@@ -174,11 +175,25 @@ defmodule Pleroma.User do
|> Repo.aggregate(:count, :id)
end
+ defp truncate_if_exists(params, key, max_length) do
+ if Map.has_key?(params, key) and is_binary(params[key]) do
+ {value, _chopped} = String.split_at(params[key], max_length)
+ Map.put(params, key, value)
+ else
+ params
+ end
+ end
+
def remote_user_creation(params) do
bio_limit = Pleroma.Config.get([:instance, :user_bio_length], 5000)
name_limit = Pleroma.Config.get([:instance, :user_name_length], 100)
- params = Map.put(params, :info, params[:info] || %{})
+ params =
+ params
+ |> Map.put(:info, params[:info] || %{})
+ |> truncate_if_exists(:name, name_limit)
+ |> truncate_if_exists(:bio, bio_limit)
+
info_cng = User.Info.remote_user_creation(%User.Info{}, params[:info])
changes =
@@ -633,8 +648,9 @@ defmodule Pleroma.User do
end
@doc "Fetch some posts when the user has just been federated with"
- def fetch_initial_posts(user),
- do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user])
+ def fetch_initial_posts(user) do
+ BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id})
+ end
@spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
def get_followers_query(%User{} = user, nil) do
@@ -1064,7 +1080,7 @@ defmodule Pleroma.User do
end
def deactivate_async(user, status \\ true) do
- PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status])
+ BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status})
end
def deactivate(%User{} = user, status \\ true) do
@@ -1092,9 +1108,9 @@ defmodule Pleroma.User do
|> update_and_set_cache()
end
- @spec delete(User.t()) :: :ok
- def delete(%User{} = user),
- do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user])
+ def delete(%User{} = user) do
+ BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id})
+ end
@spec perform(atom(), User.t()) :: {:ok, User.t()}
def perform(:delete, %User{} = user) do
@@ -1201,25 +1217,24 @@ defmodule Pleroma.User do
Repo.all(query)
end
- def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers),
- do:
- PleromaJobQueue.enqueue(:background, __MODULE__, [
- :blocks_import,
- blocker,
- blocked_identifiers
- ])
+ def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
+ BackgroundWorker.enqueue("blocks_import", %{
+ "blocker_id" => blocker.id,
+ "blocked_identifiers" => blocked_identifiers
+ })
+ end
- def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers),
- do:
- PleromaJobQueue.enqueue(:background, __MODULE__, [
- :follow_import,
- follower,
- followed_identifiers
- ])
+ def follow_import(%User{} = follower, followed_identifiers)
+ when is_list(followed_identifiers) do
+ BackgroundWorker.enqueue("follow_import", %{
+ "follower_id" => follower.id,
+ "followed_identifiers" => followed_identifiers
+ })
+ end
def delete_user_activities(%User{ap_id: ap_id} = user) do
ap_id
- |> Activity.query_by_actor()
+ |> Activity.Queries.by_actor()
|> RepoStreamer.chunk_stream(50)
|> Stream.each(fn activities ->
Enum.each(activities, &delete_activity(&1))
@@ -1624,4 +1639,13 @@ defmodule Pleroma.User do
def is_internal_user?(%User{nickname: nil}), do: true
def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true
def is_internal_user?(_), do: false
+
+ def change_email(user, email) do
+ user
+ |> cast(%{email: email}, [:email])
+ |> validate_required([:email])
+ |> unique_constraint(:email)
+ |> validate_format(:email, @email_regex)
+ |> update_and_set_cache()
+ end
end
diff --git a/lib/pleroma/user/info.ex b/lib/pleroma/user/info.ex
index 779bfbc18..151e025de 100644
--- a/lib/pleroma/user/info.ex
+++ b/lib/pleroma/user/info.ex
@@ -242,6 +242,13 @@ defmodule Pleroma.User.Info do
end
def remote_user_creation(info, params) do
+ params =
+ if Map.has_key?(params, :fields) do
+ Map.put(params, :fields, Enum.map(params[:fields], &truncate_field/1))
+ else
+ params
+ end
+
info
|> cast(params, [
:ap_enabled,
@@ -326,6 +333,16 @@ defmodule Pleroma.User.Info do
defp valid_field?(_), do: false
+ defp truncate_field(%{"name" => name, "value" => value}) do
+ {name, _chopped} =
+ String.split_at(name, Pleroma.Config.get([:instance, :account_field_name_length], 255))
+
+ {value, _chopped} =
+ String.split_at(value, Pleroma.Config.get([:instance, :account_field_value_length], 255))
+
+ %{"name" => name, "value" => value}
+ end
+
@spec confirmation_changeset(Info.t(), keyword()) :: Changeset.t()
def confirmation_changeset(info, opts) do
need_confirmation? = Keyword.get(opts, :need_confirmation)
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index d23ec66ac..41f6a0f1f 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -17,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.WebFinger
+ alias Pleroma.Workers.BackgroundWorker
import Ecto.Query
import Pleroma.Web.ActivityPub.Utils
@@ -145,7 +146,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
activity
end
- PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
+ BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
Notification.create_notifications(activity)
diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
index a179dd54d..26b8539fe 100644
--- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
+++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
@@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
alias Pleroma.HTTP
alias Pleroma.Web.MediaProxy
+ alias Pleroma.Workers.BackgroundWorker
require Logger
@@ -30,7 +31,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
url
|> Enum.each(fn
%{"href" => href} ->
- PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href])
+ BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href})
x ->
Logger.debug("Unhandled attachment URL object #{inspect(x)}")
@@ -46,7 +47,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
%{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
)
when is_list(attachments) and length(attachments) > 0 do
- PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message])
+ BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message})
{:ok, message}
end
diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex
index c97405690..a6322e25a 100644
--- a/lib/pleroma/web/activity_pub/publisher.ex
+++ b/lib/pleroma/web/activity_pub/publisher.ex
@@ -84,6 +84,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
end
end
+ def publish_one(%{actor_id: actor_id} = params) do
+ actor = User.get_cached_by_id(actor_id)
+
+ params
+ |> Map.delete(:actor_id)
+ |> Map.put(:actor, actor)
+ |> publish_one()
+ end
+
defp should_federate?(inbox, public) do
if public do
true
@@ -159,7 +168,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
Publishes an activity with BCC to all relevant peers.
"""
- def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do
+ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
+ when is_list(bcc) and bcc != [] do
public = is_public?(activity)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
@@ -186,7 +196,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
inbox: inbox,
json: json,
- actor: actor,
+ actor_id: actor.id,
id: activity.data["id"],
unreachable_since: unreachable_since
})
@@ -221,7 +231,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
%{
inbox: inbox,
json: json,
- actor: actor,
+ actor_id: actor.id,
id: activity.data["id"],
unreachable_since: unreachable_since
}
diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex
index acd61bda3..2fbb16623 100644
--- a/lib/pleroma/web/activity_pub/transmogrifier.ex
+++ b/lib/pleroma/web/activity_pub/transmogrifier.ex
@@ -15,6 +15,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.Federator
+ alias Pleroma.Workers.TransmogrifierWorker
import Ecto.Query
@@ -185,12 +186,12 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
|> Map.put("context", replied_object.data["context"] || object["conversation"])
else
e ->
- Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}")
+ Logger.error("Couldn't fetch #{inspect(in_reply_to_id)}, error: #{inspect(e)}")
object
end
e ->
- Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}")
+ Logger.error("Couldn't fetch #{inspect(in_reply_to_id)}, error: #{inspect(e)}")
object
end
else
@@ -1051,7 +1052,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
already_ap <- User.ap_enabled?(user),
{:ok, user} <- upgrade_user(user, data) do
if not already_ap do
- PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user])
+ TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
end
{:ok, user}
diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex
index c9c0c3763..258e56066 100644
--- a/lib/pleroma/web/activity_pub/utils.ex
+++ b/lib/pleroma/web/activity_pub/utils.ex
@@ -85,15 +85,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
defp extract_list(_), do: []
def maybe_splice_recipient(ap_id, params) do
- need_splice =
+ need_splice? =
!recipient_in_collection(ap_id, params["to"]) &&
!recipient_in_collection(ap_id, params["cc"])
- cc_list = extract_list(params["cc"])
-
- if need_splice do
- params
- |> Map.put("cc", [ap_id | cc_list])
+ if need_splice? do
+ cc_list = extract_list(params["cc"])
+ Map.put(params, "cc", [ap_id | cc_list])
else
params
end
@@ -139,7 +137,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
"object" => object
}
- Notification.get_notified_from_activity(%Activity{data: fake_create_activity}, false)
+ get_notified_from_object(fake_create_activity)
end
def get_notified_from_object(object) do
@@ -169,14 +167,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
@spec maybe_federate(any()) :: :ok
def maybe_federate(%Activity{local: true} = activity) do
if Pleroma.Config.get!([:instance, :federating]) do
- priority =
- case activity.data["type"] do
- "Delete" -> 10
- "Create" -> 1
- _ -> 5
- end
-
- Pleroma.Web.Federator.publish(activity, priority)
+ Pleroma.Web.Federator.publish(activity)
end
:ok
@@ -188,9 +179,9 @@ defmodule Pleroma.Web.ActivityPub.Utils do
Adds an id and a published data if they aren't there,
also adds it to an included object
"""
- def lazy_put_activity_defaults(map, fake \\ false) do
+ def lazy_put_activity_defaults(map, fake? \\ false) do
map =
- unless fake do
+ if not fake? do
%{data: %{"id" => context}, id: context_id} = create_context(map["context"])
map
@@ -207,7 +198,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
end
if is_map(map["object"]) do
- object = lazy_put_object_defaults(map["object"], map, fake)
+ object = lazy_put_object_defaults(map["object"], map, fake?)
%{map | "object" => object}
else
map
@@ -217,9 +208,9 @@ defmodule Pleroma.Web.ActivityPub.Utils do
@doc """
Adds an id and published date if they aren't there.
"""
- def lazy_put_object_defaults(map, activity \\ %{}, fake)
+ def lazy_put_object_defaults(map, activity \\ %{}, fake?)
- def lazy_put_object_defaults(map, activity, true = _fake) do
+ def lazy_put_object_defaults(map, activity, true = _fake?) do
map
|> Map.put_new_lazy("published", &make_date/0)
|> Map.put_new("id", "pleroma:fake_object_id")
@@ -228,7 +219,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|> Map.put_new("context_id", activity["context_id"])
end
- def lazy_put_object_defaults(map, activity, _fake) do
+ def lazy_put_object_defaults(map, activity, _fake?) do
map
|> Map.put_new_lazy("id", &generate_object_id/0)
|> Map.put_new_lazy("published", &make_date/0)
@@ -242,9 +233,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
def insert_full_object(%{"object" => %{"type" => type} = object_data} = map)
when is_map(object_data) and type in @supported_object_types do
with {:ok, object} <- Object.create(object_data) do
- map =
- map
- |> Map.put("object", object.data["id"])
+ map = Map.put(map, "object", object.data["id"])
{:ok, map, object}
end
@@ -263,7 +252,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|> Activity.Queries.by_actor()
|> Activity.Queries.by_object_id(id)
|> Activity.Queries.by_type("Like")
- |> Activity.Queries.limit(1)
+ |> limit(1)
|> Repo.one()
end
@@ -380,12 +369,11 @@ defmodule Pleroma.Web.ActivityPub.Utils do
%Activity{data: %{"actor" => actor, "object" => object}} = activity,
state
) do
- with new_data <-
- activity.data
- |> Map.put("state", state),
- changeset <- Changeset.change(activity, data: new_data),
- {:ok, activity} <- Repo.update(changeset),
- _ <- User.set_follow_state_cache(actor, object, state) do
+ new_data = Map.put(activity.data, "state", state)
+ changeset = Changeset.change(activity, data: new_data)
+
+ with {:ok, activity} <- Repo.update(changeset) do
+ User.set_follow_state_cache(actor, object, state)
{:ok, activity}
end
end
@@ -410,28 +398,14 @@ defmodule Pleroma.Web.ActivityPub.Utils do
end
def fetch_latest_follow(%User{ap_id: follower_id}, %User{ap_id: followed_id}) do
- query =
- from(
- activity in Activity,
- where:
- fragment(
- "? ->> 'type' = 'Follow'",
- activity.data
- ),
- where: activity.actor == ^follower_id,
- # this is to use the index
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^followed_id
- ),
- order_by: [fragment("? desc nulls last", activity.id)],
- limit: 1
- )
-
- Repo.one(query)
+ "Follow"
+ |> Activity.Queries.by_type()
+ |> where(actor: ^follower_id)
+ # this is to use the index
+ |> Activity.Queries.by_object_id(followed_id)
+ |> order_by([activity], fragment("? desc nulls last", activity.id))
+ |> limit(1)
+ |> Repo.one()
end
#### Announce-related helpers
@@ -439,23 +413,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
@doc """
Retruns an existing announce activity if the notice has already been announced
"""
- def get_existing_announce(actor, %{data: %{"id" => id}}) do
- query =
- from(
- activity in Activity,
- where: activity.actor == ^actor,
- # this is to use the index
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^id
- ),
- where: fragment("(?)->>'type' = 'Announce'", activity.data)
- )
-
- Repo.one(query)
+ def get_existing_announce(actor, %{data: %{"id" => ap_id}}) do
+ "Announce"
+ |> Activity.Queries.by_type()
+ |> where(actor: ^actor)
+ # this is to use the index
+ |> Activity.Queries.by_object_id(ap_id)
+ |> Repo.one()
end
@doc """
@@ -538,11 +502,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
object
) do
announcements =
- if is_list(object.data["announcements"]), do: object.data["announcements"], else: []
+ if is_list(object.data["announcements"]) do
+ Enum.uniq([actor | object.data["announcements"]])
+ else
+ [actor]
+ end
- with announcements <- [actor | announcements] |> Enum.uniq() do
- update_element_in_object("announcement", announcements, object)
- end
+ update_element_in_object("announcement", announcements, object)
end
def add_announce_to_object(_, object), do: {:ok, object}
@@ -570,28 +536,14 @@ defmodule Pleroma.Web.ActivityPub.Utils do
#### Block-related helpers
def fetch_latest_block(%User{ap_id: blocker_id}, %User{ap_id: blocked_id}) do
- query =
- from(
- activity in Activity,
- where:
- fragment(
- "? ->> 'type' = 'Block'",
- activity.data
- ),
- where: activity.actor == ^blocker_id,
- # this is to use the index
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^blocked_id
- ),
- order_by: [fragment("? desc nulls last", activity.id)],
- limit: 1
- )
-
- Repo.one(query)
+ "Block"
+ |> Activity.Queries.by_type()
+ |> where(actor: ^blocker_id)
+ # this is to use the index
+ |> Activity.Queries.by_object_id(blocked_id)
+ |> order_by([activity], fragment("? desc nulls last", activity.id))
+ |> limit(1)
+ |> Repo.one()
end
def make_block_data(blocker, blocked, activity_id) do
@@ -695,11 +647,11 @@ defmodule Pleroma.Web.ActivityPub.Utils do
#### Report-related helpers
def update_report_state(%Activity{} = activity, state) when state in @supported_report_states do
- with new_data <- Map.put(activity.data, "state", state),
- changeset <- Changeset.change(activity, data: new_data),
- {:ok, activity} <- Repo.update(changeset) do
- {:ok, activity}
- end
+ new_data = Map.put(activity.data, "state", state)
+
+ activity
+ |> Changeset.change(data: new_data)
+ |> Repo.update()
end
def update_report_state(_, _), do: {:error, "Unsupported state"}
@@ -766,21 +718,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
end
def get_existing_votes(actor, %{data: %{"id" => id}}) do
- query =
- from(
- [activity, object: object] in Activity.with_preloaded_object(Activity),
- where: fragment("(?)->>'type' = 'Create'", activity.data),
- where: fragment("(?)->>'actor' = ?", activity.data, ^actor),
- where:
- fragment(
- "(?)->>'inReplyTo' = ?",
- object.data,
- ^to_string(id)
- ),
- where: fragment("(?)->>'type' = 'Answer'", object.data)
- )
-
- Repo.all(query)
+ actor
+ |> Activity.Queries.by_actor()
+ |> Activity.Queries.by_type("Create")
+ |> Activity.with_preloaded_object()
+ |> where([a, object: o], fragment("(?)->>'inReplyTo' = ?", o.data, ^to_string(id)))
+ |> where([a, object: o], fragment("(?)->>'type' = 'Answer'", o.data))
+ |> Repo.all()
end
defp maybe_put(map, _key, nil), do: map
diff --git a/lib/pleroma/web/admin_api/config.ex b/lib/pleroma/web/admin_api/config.ex
index a10cc779b..1917a5580 100644
--- a/lib/pleroma/web/admin_api/config.ex
+++ b/lib/pleroma/web/admin_api/config.ex
@@ -90,6 +90,8 @@ defmodule Pleroma.Web.AdminAPI.Config do
for v <- entity, into: [], do: do_convert(v)
end
+ defp do_convert(%Regex{} = entity), do: inspect(entity)
+
defp do_convert(entity) when is_map(entity) do
for {k, v} <- entity, into: %{}, do: {do_convert(k), do_convert(v)}
end
@@ -122,7 +124,7 @@ defmodule Pleroma.Web.AdminAPI.Config do
def transform(entity), do: :erlang.term_to_binary(entity)
- defp do_transform(%Regex{} = entity) when is_map(entity), do: entity
+ defp do_transform(%Regex{} = entity), do: entity
defp do_transform(%{"tuple" => [":dispatch", [entity]]}) do
{dispatch_settings, []} = do_eval(entity)
@@ -154,8 +156,15 @@ defmodule Pleroma.Web.AdminAPI.Config do
defp do_transform(entity), do: entity
defp do_transform_string("~r/" <> pattern) do
- pattern = String.trim_trailing(pattern, "/")
- ~r/#{pattern}/
+ modificator = String.split(pattern, "/") |> List.last()
+ pattern = String.trim_trailing(pattern, "/" <> modificator)
+
+ case modificator do
+ "" -> ~r/#{pattern}/
+ "i" -> ~r/#{pattern}/i
+ "u" -> ~r/#{pattern}/u
+ "s" -> ~r/#{pattern}/s
+ end
end
defp do_transform_string(":" <> atom), do: String.to_atom(atom)
diff --git a/lib/pleroma/web/controller_helper.ex b/lib/pleroma/web/controller_helper.ex
index eeac9f503..b53a01955 100644
--- a/lib/pleroma/web/controller_helper.ex
+++ b/lib/pleroma/web/controller_helper.ex
@@ -34,79 +34,38 @@ defmodule Pleroma.Web.ControllerHelper do
defp param_to_integer(_, default), do: default
- def add_link_headers(
- conn,
- method,
- activities,
- param \\ nil,
- params \\ %{},
- func3 \\ nil,
- func4 \\ nil
- ) do
- params =
- conn.params
- |> Map.drop(["since_id", "max_id", "min_id"])
- |> Map.merge(params)
+ def add_link_headers(conn, activities, extra_params \\ %{}) do
+ case List.last(activities) do
+ %{id: max_id} ->
+ params =
+ conn.params
+ |> Map.drop(Map.keys(conn.path_params))
+ |> Map.drop(["since_id", "max_id", "min_id"])
+ |> Map.merge(extra_params)
- last = List.last(activities)
+ limit =
+ params
+ |> Map.get("limit", "20")
+ |> String.to_integer()
- func3 = func3 || (&mastodon_api_url/3)
- func4 = func4 || (&mastodon_api_url/4)
+ min_id =
+ if length(activities) <= limit do
+ activities
+ |> List.first()
+ |> Map.get(:id)
+ else
+ activities
+ |> Enum.at(limit * -1)
+ |> Map.get(:id)
+ end
- if last do
- max_id = last.id
+ next_url = current_url(conn, Map.merge(params, %{max_id: max_id}))
+ prev_url = current_url(conn, Map.merge(params, %{min_id: min_id}))
- limit =
- params
- |> Map.get("limit", "20")
- |> String.to_integer()
+ put_resp_header(conn, "link", "<#{next_url}>; rel=\"next\", <#{prev_url}>; rel=\"prev\"")
- min_id =
- if length(activities) <= limit do
- activities
- |> List.first()
- |> Map.get(:id)
- else
- activities
- |> Enum.at(limit * -1)
- |> Map.get(:id)
- end
-
- {next_url, prev_url} =
- if param do
- {
- func4.(
- Pleroma.Web.Endpoint,
- method,
- param,
- Map.merge(params, %{max_id: max_id})
- ),
- func4.(
- Pleroma.Web.Endpoint,
- method,
- param,
- Map.merge(params, %{min_id: min_id})
- )
- }
- else
- {
- func3.(
- Pleroma.Web.Endpoint,
- method,
- Map.merge(params, %{max_id: max_id})
- ),
- func3.(
- Pleroma.Web.Endpoint,
- method,
- Map.merge(params, %{min_id: min_id})
- )
- }
- end
-
- conn
- |> put_resp_header("link", "<#{next_url}>; rel=\"next\", <#{prev_url}>; rel=\"prev\"")
- else
- conn
+ _ ->
+ conn
end
end
end
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex
index f4f9e83e0..1a2da014a 100644
--- a/lib/pleroma/web/federator/federator.ex
+++ b/lib/pleroma/web/federator/federator.ex
@@ -10,16 +10,17 @@ defmodule Pleroma.Web.Federator do
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Federator.Publisher
- alias Pleroma.Web.Federator.RetryQueue
alias Pleroma.Web.OStatus
alias Pleroma.Web.Websub
+ alias Pleroma.Workers.PublisherWorker
+ alias Pleroma.Workers.ReceiverWorker
+ alias Pleroma.Workers.SubscriberWorker
require Logger
def init do
- # 1 minute
- Process.sleep(1000 * 60)
- refresh_subscriptions()
+ # To do: consider removing this call in favor of scheduled execution (`quantum`-based)
+ refresh_subscriptions(schedule_in: 60)
end
@doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
@@ -37,50 +38,38 @@ defmodule Pleroma.Web.Federator do
# Client API
def incoming_doc(doc) do
- PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
+ ReceiverWorker.enqueue("incoming_doc", %{"body" => doc})
end
def incoming_ap_doc(params) do
- PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
+ ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
end
- def publish(activity, priority \\ 1) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
+ def publish(%{id: "pleroma:fakeid"} = activity) do
+ perform(:publish, activity)
end
- def verify_websub(websub) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
+ def publish(activity) do
+ PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
end
- def request_subscription(sub) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
+ def verify_websub(websub) do
+ SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id})
end
- def refresh_subscriptions do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
+ def request_subscription(websub) do
+ SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id})
end
- # Job Worker Callbacks
-
- def perform(:refresh_subscriptions) do
- Logger.debug("Federator running refresh subscriptions")
- Websub.refresh_subscriptions()
-
- spawn(fn ->
- # 6 hours
- Process.sleep(1000 * 60 * 60 * 6)
- refresh_subscriptions()
- end)
+ def refresh_subscriptions(worker_args \\ []) do
+ SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1])
end
- def perform(:request_subscription, websub) do
- Logger.debug("Refreshing #{websub.topic}")
+ # Job Worker Callbacks
- with {:ok, websub} <- Websub.request_subscription(websub) do
- Logger.debug("Successfully refreshed #{websub.topic}")
- else
- _e -> Logger.debug("Couldn't refresh #{websub.topic}")
- end
+ @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
+ def perform(:publish_one, module, params) do
+ apply(module, :publish_one, [params])
end
def perform(:publish, activity) do
@@ -92,14 +81,6 @@ defmodule Pleroma.Web.Federator do
end
end
- def perform(:verify_websub, websub) do
- Logger.debug(fn ->
- "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
- end)
-
- Websub.verify(websub)
- end
-
def perform(:incoming_doc, doc) do
Logger.info("Got document, trying to parse")
OStatus.handle_incoming(doc)
@@ -130,22 +111,27 @@ defmodule Pleroma.Web.Federator do
end
end
- def perform(
- :publish_single_websub,
- %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
- ) do
- case Websub.publish_one(params) do
- {:ok, _} ->
- :ok
+ def perform(:request_subscription, websub) do
+ Logger.debug("Refreshing #{websub.topic}")
- {:error, _} ->
- RetryQueue.enqueue(params, Websub)
+ with {:ok, websub} <- Websub.request_subscription(websub) do
+ Logger.debug("Successfully refreshed #{websub.topic}")
+ else
+ _e -> Logger.debug("Couldn't refresh #{websub.topic}")
end
end
- def perform(type, _) do
- Logger.debug(fn -> "Unknown task: #{type}" end)
- {:error, "Don't know what to do with this"}
+ def perform(:verify_websub, websub) do
+ Logger.debug(fn ->
+ "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
+ end)
+
+ Websub.verify(websub)
+ end
+
+ def perform(:refresh_subscriptions) do
+ Logger.debug("Federator running refresh subscriptions")
+ Websub.refresh_subscriptions()
end
def ap_enabled_actor(id) do
diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex
index 70f870244..937064638 100644
--- a/lib/pleroma/web/federator/publisher.ex
+++ b/lib/pleroma/web/federator/publisher.ex
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.User
- alias Pleroma.Web.Federator.RetryQueue
+ alias Pleroma.Workers.PublisherWorker
require Logger
@@ -30,23 +30,11 @@ defmodule Pleroma.Web.Federator.Publisher do
Enqueue publishing a single activity.
"""
@spec enqueue_one(module(), Map.t()) :: :ok
- def enqueue_one(module, %{} = params),
- do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params])
-
- @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
- def perform(:publish_one, module, params) do
- case apply(module, :publish_one, [params]) do
- {:ok, _} ->
- :ok
-
- {:error, _e} ->
- RetryQueue.enqueue(params, module)
- end
- end
-
- def perform(type, _, _) do
- Logger.debug("Unknown task: #{type}")
- {:error, "Don't know what to do with this"}
+ def enqueue_one(module, %{} = params) do
+ PublisherWorker.enqueue(
+ "publish_one",
+ %{"module" => to_string(module), "params" => params}
+ )
end
@doc """
diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex
deleted file mode 100644
index 9eab8c218..000000000
--- a/lib/pleroma/web/federator/retry_queue.ex
+++ /dev/null
@@ -1,239 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Federator.RetryQueue do
- use GenServer
-
- require Logger
-
- def init(args) do
- queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
-
- {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
- end
-
- def start_link(_) do
- enabled =
- if Pleroma.Config.get(:env) == :test,
- do: true,
- else: Pleroma.Config.get([__MODULE__, :enabled], false)
-
- if enabled do
- Logger.info("Starting retry queue")
-
- linkres =
- GenServer.start_link(
- __MODULE__,
- %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
- name: __MODULE__
- )
-
- maybe_kickoff_timer()
- linkres
- else
- Logger.info("Retry queue disabled")
- :ignore
- end
- end
-
- def enqueue(data, transport, retries \\ 0) do
- GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
- end
-
- def get_stats do
- GenServer.call(__MODULE__, :get_stats)
- end
-
- def reset_stats do
- GenServer.call(__MODULE__, :reset_stats)
- end
-
- def get_retry_params(retries) do
- if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
- {:drop, "Max retries reached"}
- else
- {:retry, growth_function(retries)}
- end
- end
-
- def get_retry_timer_interval do
- Pleroma.Config.get([:retry_queue, :interval], 1000)
- end
-
- defp ets_count_expires(table, current_time) do
- :ets.select_count(
- table,
- [
- {
- {:"$1", :"$2"},
- [{:"=<", :"$1", {:const, current_time}}],
- [true]
- }
- ]
- )
- end
-
- defp ets_pop_n_expired(table, current_time, desired) do
- {popped, _continuation} =
- :ets.select(
- table,
- [
- {
- {:"$1", :"$2"},
- [{:"=<", :"$1", {:const, current_time}}],
- [:"$_"]
- }
- ],
- desired
- )
-
- popped
- |> Enum.each(fn e ->
- :ets.delete_object(table, e)
- end)
-
- popped
- end
-
- def maybe_start_job(running_jobs, queue_table) do
- # we don't want to hit the ets or the DateTime more times than we have to
- # could optimize slightly further by not using the count, and instead grabbing
- # up to N objects early...
- current_time = DateTime.to_unix(DateTime.utc_now())
- n_running_jobs = :sets.size(running_jobs)
-
- if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
- n_ready_jobs = ets_count_expires(queue_table, current_time)
-
- if n_ready_jobs > 0 do
- # figure out how many we could start
- available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
- start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
- else
- running_jobs
- end
- else
- running_jobs
- end
- end
-
- defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
- running_jobs
- end
-
- defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
- when available_job_slots > 0 do
- candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
-
- candidates
- |> List.foldl(running_jobs, fn {_, e}, rj ->
- {:ok, pid} = Task.start(fn -> worker(e) end)
- mref = Process.monitor(pid)
- :sets.add_element(mref, rj)
- end)
- end
-
- def worker({:send, data, transport, retries}) do
- case transport.publish_one(data) do
- {:ok, _} ->
- GenServer.cast(__MODULE__, :inc_delivered)
- :delivered
-
- {:error, _reason} ->
- enqueue(data, transport, retries)
- :retry
- end
- end
-
- def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
- {:reply, %{delivered: delivery_count, dropped: drop_count}, state}
- end
-
- def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
- {:reply, %{delivered: delivery_count, dropped: drop_count},
- %{state | delivered: 0, dropped: 0}}
- end
-
- def handle_cast(:reset_stats, state) do
- {:noreply, %{state | delivered: 0, dropped: 0}}
- end
-
- def handle_cast(
- {:maybe_enqueue, data, transport, retries},
- %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
- ) do
- case get_retry_params(retries) do
- {:retry, timeout} ->
- :ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
-
- {:drop, message} ->
- Logger.debug(message)
- {:noreply, %{state | dropped: drop_count + 1}}
- end
- end
-
- def handle_cast(:kickoff_timer, state) do
- retry_interval = get_retry_timer_interval()
- Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
- {:noreply, state}
- end
-
- def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
- {:noreply, %{state | delivered: delivery_count + 1}}
- end
-
- def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
- {:noreply, %{state | dropped: drop_count + 1}}
- end
-
- def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
- case transport.publish_one(data) do
- {:ok, _} ->
- {:noreply, %{state | delivered: delivery_count + 1}}
-
- {:error, _reason} ->
- enqueue(data, transport, retries)
- {:noreply, state}
- end
- end
-
- def handle_info(
- :retry_timer_run,
- %{queue_table: queue_table, running_jobs: running_jobs} = state
- ) do
- maybe_kickoff_timer()
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
- end
-
- def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
- %{running_jobs: running_jobs, queue_table: queue_table} = state
- running_jobs = :sets.del_element(ref, running_jobs)
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
- end
-
- def handle_info(unknown, state) do
- Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
- {:noreply, state}
- end
-
- if Pleroma.Config.get(:env) == :test do
- defp growth_function(_retries) do
- _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
- DateTime.to_unix(DateTime.utc_now()) - 1
- end
- else
- defp growth_function(retries) do
- round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
- DateTime.to_unix(DateTime.utc_now())
- end
- end
-
- defp maybe_kickoff_timer do
- GenServer.cast(__MODULE__, :kickoff_timer)
- end
-end
diff --git a/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex b/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex
index c54462bb3..060137b80 100644
--- a/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex
+++ b/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
use Pleroma.Web, :controller
import Pleroma.Web.ControllerHelper,
- only: [json_response: 3, add_link_headers: 5, add_link_headers: 4, add_link_headers: 3]
+ only: [json_response: 3, add_link_headers: 2, add_link_headers: 3]
alias Ecto.Changeset
alias Pleroma.Activity
@@ -365,7 +365,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:home_timeline, activities)
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -384,7 +384,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:public_timeline, activities, false, %{"local" => local_only})
+ |> add_link_headers(activities, %{"local" => local_only})
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -398,7 +398,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
activities = ActivityPub.fetch_user_activities(user, reading_user, params)
conn
- |> add_link_headers(:user_statuses, activities, params["id"])
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{
activities: activities,
@@ -422,7 +422,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Pagination.fetch_paginated(params)
conn
- |> add_link_headers(:dm_timeline, activities)
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -537,7 +537,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
def scheduled_statuses(%{assigns: %{user: user}} = conn, params) do
with scheduled_activities <- MastodonAPI.get_scheduled_activities(user, params) do
conn
- |> add_link_headers(:scheduled_statuses, scheduled_activities)
+ |> add_link_headers(scheduled_activities)
|> put_view(ScheduledActivityView)
|> render("index.json", %{scheduled_activities: scheduled_activities})
end
@@ -720,7 +720,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
notifications = MastodonAPI.get_notifications(user, params)
conn
- |> add_link_headers(:notifications, notifications)
+ |> add_link_headers(notifications)
|> put_view(NotificationView)
|> render("index.json", %{notifications: notifications, for: user})
end
@@ -842,6 +842,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
def favourited_by(%{assigns: %{user: user}} = conn, %{"id" => id}) do
with %Activity{} = activity <- Activity.get_by_id_with_object(id),
+ {:visible, true} <- {:visible, Visibility.visible_for_user?(activity, user)},
%Object{data: %{"likes" => likes}} <- Object.normalize(activity) do
q = from(u in User, where: u.ap_id in ^likes)
@@ -853,12 +854,14 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> put_view(AccountView)
|> render("accounts.json", %{for: user, users: users, as: :user})
else
+ {:visible, false} -> {:error, :not_found}
_ -> json(conn, [])
end
end
def reblogged_by(%{assigns: %{user: user}} = conn, %{"id" => id}) do
with %Activity{} = activity <- Activity.get_by_id_with_object(id),
+ {:visible, true} <- {:visible, Visibility.visible_for_user?(activity, user)},
%Object{data: %{"announcements" => announces}} <- Object.normalize(activity) do
q = from(u in User, where: u.ap_id in ^announces)
@@ -870,6 +873,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> put_view(AccountView)
|> render("accounts.json", %{for: user, users: users, as: :user})
else
+ {:visible, false} -> {:error, :not_found}
_ -> json(conn, [])
end
end
@@ -908,7 +912,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:hashtag_timeline, activities, params["tag"], %{"local" => local_only})
+ |> add_link_headers(activities, %{"local" => local_only})
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -924,7 +928,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
end
conn
- |> add_link_headers(:followers, followers, user)
+ |> add_link_headers(followers)
|> put_view(AccountView)
|> render("accounts.json", %{for: for_user, users: followers, as: :user})
end
@@ -941,7 +945,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
end
conn
- |> add_link_headers(:following, followers, user)
+ |> add_link_headers(followers)
|> put_view(AccountView)
|> render("accounts.json", %{for: for_user, users: followers, as: :user})
end
@@ -1166,7 +1170,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:favourites, activities)
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -1193,7 +1197,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:favourites, activities)
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: for_user, as: :activity})
else
@@ -1214,7 +1218,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.map(fn b -> Map.put(b.activity, :bookmark, Map.delete(b, :activity)) end)
conn
- |> add_link_headers(:bookmarks, bookmarks)
+ |> add_link_headers(bookmarks)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -1654,7 +1658,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
end)
conn
- |> add_link_headers(:conversations, participations)
+ |> add_link_headers(participations)
|> json(conversations)
end
diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex
index f50098302..eb94bf86f 100644
--- a/lib/pleroma/web/oauth/token/clean_worker.ex
+++ b/lib/pleroma/web/oauth/token/clean_worker.ex
@@ -17,6 +17,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
)
alias Pleroma.Web.OAuth.Token
+ alias Pleroma.Workers.BackgroundWorker
def start_link(_), do: GenServer.start_link(__MODULE__, %{})
@@ -27,9 +28,11 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
@doc false
def handle_info(:perform, state) do
- Token.delete_expired_tokens()
+ BackgroundWorker.enqueue("clean_expired_tokens", %{})
Process.send_after(self(), :perform, @interval)
{:noreply, state}
end
+
+ def perform(:clean), do: Token.delete_expired_tokens()
end
diff --git a/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex b/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex
index f4df3b024..d17ccf84d 100644
--- a/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex
+++ b/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex
@@ -5,7 +5,7 @@
defmodule Pleroma.Web.PleromaAPI.PleromaAPIController do
use Pleroma.Web, :controller
- import Pleroma.Web.ControllerHelper, only: [add_link_headers: 7]
+ import Pleroma.Web.ControllerHelper, only: [add_link_headers: 2]
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
@@ -27,31 +27,22 @@ defmodule Pleroma.Web.PleromaAPI.PleromaAPIController do
%{assigns: %{user: user}} = conn,
%{"id" => participation_id} = params
) do
- params =
- params
- |> Map.put("blocking_user", user)
- |> Map.put("muting_user", user)
- |> Map.put("user", user)
-
- participation =
- participation_id
- |> Participation.get(preload: [:conversation])
+ participation = Participation.get(participation_id, preload: [:conversation])
if user.id == participation.user_id do
+ params =
+ params
+ |> Map.put("blocking_user", user)
+ |> Map.put("muting_user", user)
+ |> Map.put("user", user)
+
activities =
participation.conversation.ap_id
|> ActivityPub.fetch_activities_for_context(params)
|> Enum.reverse()
conn
- |> add_link_headers(
- :conversation_statuses,
- activities,
- participation_id,
- params,
- nil,
- &pleroma_api_url/4
- )
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
diff --git a/lib/pleroma/web/push/push.ex b/lib/pleroma/web/push/push.ex
index 729dad02a..7ef1532ac 100644
--- a/lib/pleroma/web/push/push.ex
+++ b/lib/pleroma/web/push/push.ex
@@ -3,7 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Push do
- alias Pleroma.Web.Push.Impl
+ alias Pleroma.Workers.WebPusherWorker
require Logger
@@ -31,6 +31,7 @@ defmodule Pleroma.Web.Push do
end
end
- def send(notification),
- do: PleromaJobQueue.enqueue(:web_push, Impl, [notification])
+ def send(notification) do
+ WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id})
+ end
end
diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex
index 7cd59acb2..b0464037e 100644
--- a/lib/pleroma/web/router.ex
+++ b/lib/pleroma/web/router.ex
@@ -224,6 +224,7 @@ defmodule Pleroma.Web.Router do
scope [] do
pipe_through(:oauth_write)
+ post("/change_email", UtilController, :change_email)
post("/change_password", UtilController, :change_password)
post("/delete_account", UtilController, :delete_account)
put("/notification_settings", UtilController, :update_notificaton_settings)
diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex
index 9b01ebcc6..8ba7380c0 100644
--- a/lib/pleroma/web/salmon/salmon.ex
+++ b/lib/pleroma/web/salmon/salmon.ex
@@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do
end
end
+ def publish_one(%{recipient_id: recipient_id} = params) do
+ recipient = User.get_cached_by_id(recipient_id)
+
+ params
+ |> Map.delete(:recipient_id)
+ |> Map.put(:recipient, recipient)
+ |> publish_one()
+ end
+
def publish_one(_), do: :noop
@supported_activities [
@@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
Publisher.enqueue_one(__MODULE__, %{
- recipient: remote_user,
+ recipient_id: remote_user.id,
feed: feed,
unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
})
diff --git a/lib/pleroma/web/twitter_api/controllers/util_controller.ex b/lib/pleroma/web/twitter_api/controllers/util_controller.ex
index 3405bd3b7..d7745ae7a 100644
--- a/lib/pleroma/web/twitter_api/controllers/util_controller.ex
+++ b/lib/pleroma/web/twitter_api/controllers/util_controller.ex
@@ -265,12 +265,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
String.split(line, ",") |> List.first()
end)
|> List.delete("Account address") do
- PleromaJobQueue.enqueue(:background, User, [
- :follow_import,
- follower,
- followed_identifiers
- ])
-
+ User.follow_import(follower, followed_identifiers)
json(conn, "job started")
end
end
@@ -281,12 +276,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do
with blocked_identifiers <- String.split(list) do
- PleromaJobQueue.enqueue(:background, User, [
- :blocks_import,
- blocker,
- blocked_identifiers
- ])
-
+ User.blocks_import(blocker, blocked_identifiers)
json(conn, "job started")
end
end
@@ -314,6 +304,25 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
end
end
+ def change_email(%{assigns: %{user: user}} = conn, params) do
+ case CommonAPI.Utils.confirm_current_password(user, params["password"]) do
+ {:ok, user} ->
+ with {:ok, _user} <- User.change_email(user, params["email"]) do
+ json(conn, %{status: "success"})
+ else
+ {:error, changeset} ->
+ {_, {error, _}} = Enum.at(changeset.errors, 0)
+ json(conn, %{error: "Email #{error}."})
+
+ _ ->
+ json(conn, %{error: "Unable to change email."})
+ end
+
+ {:error, msg} ->
+ json(conn, %{error: msg})
+ end
+ end
+
def delete_account(%{assigns: %{user: user}} = conn, params) do
case CommonAPI.Utils.confirm_current_password(user, params["password"]) do
{:ok, user} ->
diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex
new file mode 100644
index 000000000..4e3e4195f
--- /dev/null
+++ b/lib/pleroma/workers/activity_expiration_worker.ex
@@ -0,0 +1,18 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ActivityExpirationWorker do
+ use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
+
+ @impl Oban.Worker
+ def perform(
+ %{
+ "op" => "activity_expiration",
+ "activity_expiration_id" => activity_expiration_id
+ },
+ _job
+ ) do
+ Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, activity_expiration_id)
+ end
+end
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
new file mode 100644
index 000000000..082f20ab7
--- /dev/null
+++ b/lib/pleroma/workers/background_worker.ex
@@ -0,0 +1,69 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.BackgroundWorker do
+ alias Pleroma.Activity
+ alias Pleroma.User
+ alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
+ alias Pleroma.Web.OAuth.Token.CleanWorker
+
+ use Pleroma.Workers.WorkerHelper, queue: "background"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
+ user = User.get_cached_by_id(user_id)
+ User.perform(:fetch_initial_posts, user)
+ end
+
+ def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do
+ user = User.get_cached_by_id(user_id)
+ User.perform(:deactivate_async, user, status)
+ end
+
+ def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do
+ user = User.get_cached_by_id(user_id)
+ User.perform(:delete, user)
+ end
+
+ def perform(
+ %{
+ "op" => "blocks_import",
+ "blocker_id" => blocker_id,
+ "blocked_identifiers" => blocked_identifiers
+ },
+ _job
+ ) do
+ blocker = User.get_cached_by_id(blocker_id)
+ User.perform(:blocks_import, blocker, blocked_identifiers)
+ end
+
+ def perform(
+ %{
+ "op" => "follow_import",
+ "follower_id" => follower_id,
+ "followed_identifiers" => followed_identifiers
+ },
+ _job
+ ) do
+ follower = User.get_cached_by_id(follower_id)
+ User.perform(:follow_import, follower, followed_identifiers)
+ end
+
+ def perform(%{"op" => "clean_expired_tokens"}, _job) do
+ CleanWorker.perform(:clean)
+ end
+
+ def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do
+ MediaProxyWarmingPolicy.perform(:preload, message)
+ end
+
+ def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do
+ MediaProxyWarmingPolicy.perform(:prefetch, url)
+ end
+
+ def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do
+ activity = Activity.get_by_id(activity_id)
+ Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
+ end
+end
diff --git a/lib/pleroma/workers/digest_emails_worker.ex b/lib/pleroma/workers/digest_emails_worker.ex
new file mode 100644
index 000000000..3e5a836d0
--- /dev/null
+++ b/lib/pleroma/workers/digest_emails_worker.ex
@@ -0,0 +1,16 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.DigestEmailsWorker do
+ alias Pleroma.User
+
+ use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
+ user_id
+ |> User.get_cached_by_id()
+ |> Pleroma.Daemons.DigestEmailDaemon.perform()
+ end
+end
diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex
new file mode 100644
index 000000000..1b7a0eb3e
--- /dev/null
+++ b/lib/pleroma/workers/mailer_worker.ex
@@ -0,0 +1,15 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.MailerWorker do
+ use Pleroma.Workers.WorkerHelper, queue: "mailer"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
+ encoded_email
+ |> Base.decode64!()
+ |> :erlang.binary_to_term()
+ |> Pleroma.Emails.Mailer.deliver(config)
+ end
+end
diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex
new file mode 100644
index 000000000..455f7fc7e
--- /dev/null
+++ b/lib/pleroma/workers/publisher_worker.ex
@@ -0,0 +1,25 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PublisherWorker do
+ alias Pleroma.Activity
+ alias Pleroma.Web.Federator
+
+ use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
+ def backoff(attempt) when is_integer(attempt) do
+ Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
+ end
+
+ @impl Oban.Worker
+ def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do
+ activity = Activity.get_by_id(activity_id)
+ Federator.perform(:publish, activity)
+ end
+
+ def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do
+ params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end)
+ Federator.perform(:publish_one, String.to_atom(module_name), params)
+ end
+end
diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex
new file mode 100644
index 000000000..83d528a66
--- /dev/null
+++ b/lib/pleroma/workers/receiver_worker.ex
@@ -0,0 +1,18 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ReceiverWorker do
+ alias Pleroma.Web.Federator
+
+ use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do
+ Federator.perform(:incoming_doc, doc)
+ end
+
+ def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do
+ Federator.perform(:incoming_ap_doc, params)
+ end
+end
diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex
new file mode 100644
index 000000000..ca7d53af1
--- /dev/null
+++ b/lib/pleroma/workers/scheduled_activity_worker.ex
@@ -0,0 +1,12 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ScheduledActivityWorker do
+ use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
+ Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, activity_id)
+ end
+end
diff --git a/lib/pleroma/workers/subscriber_worker.ex b/lib/pleroma/workers/subscriber_worker.ex
new file mode 100644
index 000000000..fc490e300
--- /dev/null
+++ b/lib/pleroma/workers/subscriber_worker.ex
@@ -0,0 +1,26 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.SubscriberWorker do
+ alias Pleroma.Repo
+ alias Pleroma.Web.Federator
+ alias Pleroma.Web.Websub
+
+ use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "refresh_subscriptions"}, _job) do
+ Federator.perform(:refresh_subscriptions)
+ end
+
+ def perform(%{"op" => "request_subscription", "websub_id" => websub_id}, _job) do
+ websub = Repo.get(Websub.WebsubClientSubscription, websub_id)
+ Federator.perform(:request_subscription, websub)
+ end
+
+ def perform(%{"op" => "verify_websub", "websub_id" => websub_id}, _job) do
+ websub = Repo.get(Websub.WebsubServerSubscription, websub_id)
+ Federator.perform(:verify_websub, websub)
+ end
+end
diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex
new file mode 100644
index 000000000..b581a2f86
--- /dev/null
+++ b/lib/pleroma/workers/transmogrifier_worker.ex
@@ -0,0 +1,15 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.TransmogrifierWorker do
+ alias Pleroma.User
+
+ use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
+ user = User.get_cached_by_id(user_id)
+ Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
+ end
+end
diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex
new file mode 100644
index 000000000..bea2baffb
--- /dev/null
+++ b/lib/pleroma/workers/web_pusher_worker.ex
@@ -0,0 +1,16 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.WebPusherWorker do
+ alias Pleroma.Notification
+ alias Pleroma.Repo
+
+ use Pleroma.Workers.WorkerHelper, queue: "web_push"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
+ notification = Repo.get(Notification, notification_id)
+ Pleroma.Web.Push.Impl.perform(notification)
+ end
+end
diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex
new file mode 100644
index 000000000..358efa14a
--- /dev/null
+++ b/lib/pleroma/workers/worker_helper.ex
@@ -0,0 +1,46 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.WorkerHelper do
+ alias Pleroma.Config
+ alias Pleroma.Workers.WorkerHelper
+
+ def worker_args(queue) do
+ case Config.get([:workers, :retries, queue]) do
+ nil -> []
+ max_attempts -> [max_attempts: max_attempts]
+ end
+ end
+
+ def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
+ backoff =
+ :math.pow(attempt, pow) +
+ base_backoff +
+ :rand.uniform(2 * base_backoff) * attempt
+
+ trunc(backoff)
+ end
+
+ defmacro __using__(opts) do
+ caller_module = __CALLER__.module
+ queue = Keyword.fetch!(opts, :queue)
+
+ quote do
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
+ use Oban.Worker,
+ queue: unquote(queue),
+ max_attempts: 1
+
+ def enqueue(op, params, worker_args \\ []) do
+ params = Map.merge(%{"op" => op}, params)
+ queue_atom = String.to_atom(unquote(queue))
+ worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
+
+ unquote(caller_module)
+ |> apply(:new, [params, worker_args])
+ |> Pleroma.Repo.insert()
+ end
+ end
+ end
+end