diff options
author | Maksim Pechnikov <parallel588@gmail.com> | 2019-09-16 07:34:22 +0300 |
---|---|---|
committer | Maksim Pechnikov <parallel588@gmail.com> | 2019-09-16 07:34:22 +0300 |
commit | d75bc728e7e309174373e234079aa4825c81ab3e (patch) | |
tree | 1ac6b44f5758ff028103c37d3712520f45a0695e /lib | |
parent | 4f548cb2b7b4a16a956a4f4a0ff64d279777925e (diff) | |
parent | 2990c0a53b14646eab19b57d068ac8aa7e17ea4e (diff) | |
download | pleroma-d75bc728e7e309174373e234079aa4825c81ab3e.tar.gz |
Merge branch 'develop' into issue/733
Diffstat (limited to 'lib')
43 files changed, 897 insertions, 764 deletions
diff --git a/lib/mix/tasks/pleroma/docs.ex b/lib/mix/tasks/pleroma/docs.ex new file mode 100644 index 000000000..0d2663648 --- /dev/null +++ b/lib/mix/tasks/pleroma/docs.ex @@ -0,0 +1,42 @@ +defmodule Mix.Tasks.Pleroma.Docs do + use Mix.Task + import Mix.Pleroma + + @shortdoc "Generates docs from descriptions.exs" + @moduledoc """ + Generates docs from `descriptions.exs`. + + Supports two formats: `markdown` and `json`. + + ## Generate Markdown docs + + `mix pleroma.docs` + + ## Generate JSON docs + + `mix pleroma.docs json` + """ + + def run(["json"]) do + do_run(Pleroma.Docs.JSON) + end + + def run(_) do + do_run(Pleroma.Docs.Markdown) + end + + defp do_run(implementation) do + start_pleroma() + + with {descriptions, _paths} <- Mix.Config.eval!("config/description.exs"), + {:ok, file_path} <- + Pleroma.Docs.Generator.process( + implementation, + descriptions[:pleroma][:config_description] + ) do + type = if implementation == Pleroma.Docs.Markdown, do: "Markdown", else: "JSON" + + Mix.shell().info([:green, "#{type} docs successfully generated to #{file_path}."]) + end + end +end diff --git a/lib/pleroma/activity.ex b/lib/pleroma/activity.ex index 44f1e3011..ec558168a 100644 --- a/lib/pleroma/activity.ex +++ b/lib/pleroma/activity.ex @@ -6,6 +6,7 @@ defmodule Pleroma.Activity do use Ecto.Schema alias Pleroma.Activity + alias Pleroma.Activity.Queries alias Pleroma.ActivityExpiration alias Pleroma.Bookmark alias Pleroma.Notification @@ -65,8 +66,8 @@ defmodule Pleroma.Activity do timestamps() end - def with_joined_object(query) do - join(query, :inner, [activity], o in Object, + def with_joined_object(query, join_type \\ :inner) do + join(query, join_type, [activity], o in Object, on: fragment( "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')", @@ -78,10 +79,10 @@ defmodule Pleroma.Activity do ) end - def with_preloaded_object(query) do + def with_preloaded_object(query, join_type \\ :inner) do query |> has_named_binding?(:object) - |> if(do: query, else: with_joined_object(query)) + |> if(do: query, else: with_joined_object(query, join_type)) |> preload([activity, object: object], object: object) end @@ -107,12 +108,9 @@ defmodule Pleroma.Activity do def with_set_thread_muted_field(query, _), do: query def get_by_ap_id(ap_id) do - Repo.one( - from( - activity in Activity, - where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id)) - ) - ) + ap_id + |> Queries.by_ap_id() + |> Repo.one() end def get_bookmark(%Activity{} = activity, %User{} = user) do @@ -133,21 +131,10 @@ defmodule Pleroma.Activity do end def get_by_ap_id_with_object(ap_id) do - Repo.one( - from( - activity in Activity, - where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id)), - left_join: o in Object, - on: - fragment( - "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')", - o.data, - activity.data, - activity.data - ), - preload: [object: o] - ) - ) + ap_id + |> Queries.by_ap_id() + |> with_preloaded_object(:left) + |> Repo.one() end def get_by_id(id) do @@ -158,18 +145,9 @@ defmodule Pleroma.Activity do end def get_by_id_with_object(id) do - from(activity in Activity, - where: activity.id == ^id, - inner_join: o in Object, - on: - fragment( - "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')", - o.data, - activity.data, - activity.data - ), - preload: [object: o] - ) + Activity + |> where(id: ^id) + |> with_preloaded_object() |> Repo.one() end @@ -180,51 +158,21 @@ defmodule Pleroma.Activity do |> Repo.all() end - def by_object_ap_id(ap_id) do - from( - activity in Activity, - where: - fragment( - "coalesce((?)->'object'->>'id', (?)->>'object') = ?", - activity.data, - activity.data, - ^to_string(ap_id) - ) - ) - end - - def create_by_object_ap_id(ap_ids) when is_list(ap_ids) do - from( - activity in Activity, - where: - fragment( - "coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)", - activity.data, - activity.data, - ^ap_ids - ), - where: fragment("(?)->>'type' = 'Create'", activity.data) - ) - end - - def create_by_object_ap_id(ap_id) when is_binary(ap_id) do - from( - activity in Activity, - where: - fragment( - "coalesce((?)->'object'->>'id', (?)->>'object') = ?", - activity.data, - activity.data, - ^to_string(ap_id) - ), - where: fragment("(?)->>'type' = 'Create'", activity.data) - ) + @doc """ + Accepts `ap_id` or list of `ap_id`. + Returns a query. + """ + @spec create_by_object_ap_id(String.t() | [String.t()]) :: Ecto.Queryable.t() + def create_by_object_ap_id(ap_id) do + ap_id + |> Queries.by_object_id() + |> Queries.by_type("Create") end - def create_by_object_ap_id(_), do: nil - def get_all_create_by_object_ap_id(ap_id) do - Repo.all(create_by_object_ap_id(ap_id)) + ap_id + |> create_by_object_ap_id() + |> Repo.all() end def get_create_by_object_ap_id(ap_id) when is_binary(ap_id) do @@ -235,54 +183,17 @@ defmodule Pleroma.Activity do def get_create_by_object_ap_id(_), do: nil - def create_by_object_ap_id_with_object(ap_ids) when is_list(ap_ids) do - from( - activity in Activity, - where: - fragment( - "coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)", - activity.data, - activity.data, - ^ap_ids - ), - where: fragment("(?)->>'type' = 'Create'", activity.data), - inner_join: o in Object, - on: - fragment( - "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')", - o.data, - activity.data, - activity.data - ), - preload: [object: o] - ) - end - - def create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do - from( - activity in Activity, - where: - fragment( - "coalesce((?)->'object'->>'id', (?)->>'object') = ?", - activity.data, - activity.data, - ^to_string(ap_id) - ), - where: fragment("(?)->>'type' = 'Create'", activity.data), - inner_join: o in Object, - on: - fragment( - "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')", - o.data, - activity.data, - activity.data - ), - preload: [object: o] - ) + @doc """ + Accepts `ap_id` or list of `ap_id`. + Returns a query. + """ + @spec create_by_object_ap_id_with_object(String.t() | [String.t()]) :: Ecto.Queryable.t() + def create_by_object_ap_id_with_object(ap_id) do + ap_id + |> create_by_object_ap_id() + |> with_preloaded_object() end - def create_by_object_ap_id_with_object(_), do: nil - def get_create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do ap_id |> create_by_object_ap_id_with_object() @@ -306,7 +217,8 @@ defmodule Pleroma.Activity do def normalize(_), do: nil def delete_by_ap_id(id) when is_binary(id) do - by_object_ap_id(id) + id + |> Queries.by_object_id() |> select([u], u) |> Repo.delete_all() |> elem(1) @@ -350,31 +262,10 @@ defmodule Pleroma.Activity do end def follow_requests_for_actor(%Pleroma.User{ap_id: ap_id}) do - from( - a in Activity, - where: - fragment( - "? ->> 'type' = 'Follow'", - a.data - ), - where: - fragment( - "? ->> 'state' = 'pending'", - a.data - ), - where: - fragment( - "coalesce((?)->'object'->>'id', (?)->>'object') = ?", - a.data, - a.data, - ^ap_id - ) - ) - end - - @spec query_by_actor(actor()) :: Ecto.Query.t() - def query_by_actor(actor) do - from(a in Activity, where: a.actor == ^actor) + ap_id + |> Queries.by_object_id() + |> Queries.by_type("Follow") + |> where([a], fragment("? ->> 'state' = 'pending'", a.data)) end def restrict_deactivated_users(query) do diff --git a/lib/pleroma/activity/queries.ex b/lib/pleroma/activity/queries.ex index aa5b29566..13fa33831 100644 --- a/lib/pleroma/activity/queries.ex +++ b/lib/pleroma/activity/queries.ex @@ -13,6 +13,14 @@ defmodule Pleroma.Activity.Queries do alias Pleroma.Activity + @spec by_ap_id(query, String.t()) :: query + def by_ap_id(query \\ Activity, ap_id) do + from( + activity in query, + where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id)) + ) + end + @spec by_actor(query, String.t()) :: query def by_actor(query \\ Activity, actor) do from( @@ -21,8 +29,23 @@ defmodule Pleroma.Activity.Queries do ) end - @spec by_object_id(query, String.t()) :: query - def by_object_id(query \\ Activity, object_id) do + @spec by_object_id(query, String.t() | [String.t()]) :: query + def by_object_id(query \\ Activity, object_id) + + def by_object_id(query, object_ids) when is_list(object_ids) do + from( + activity in query, + where: + fragment( + "coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)", + activity.data, + activity.data, + ^object_ids + ) + ) + end + + def by_object_id(query, object_id) when is_binary(object_id) do from(activity in query, where: fragment( @@ -41,9 +64,4 @@ defmodule Pleroma.Activity.Queries do where: fragment("(?)->>'type' = ?", activity.data, ^activity_type) ) end - - @spec limit(query, pos_integer()) :: query - def limit(query \\ Activity, limit) do - from(activity in query, limit: ^limit) - end end diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 1d46925f8..49094704b 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -31,18 +31,19 @@ defmodule Pleroma.Application do children = [ Pleroma.Repo, + Pleroma.Scheduler, Pleroma.Config.TransferTask, Pleroma.Emoji, Pleroma.Captcha, Pleroma.FlakeId, - Pleroma.ScheduledActivityWorker, - Pleroma.ActivityExpirationWorker + Pleroma.Daemons.ScheduledActivityDaemon, + Pleroma.Daemons.ActivityExpirationDaemon ] ++ cachex_children() ++ hackney_pool_children() ++ [ - Pleroma.Web.Federator.RetryQueue, Pleroma.Stats, + {Oban, Pleroma.Config.get(Oban)}, %{ id: :web_push_init, start: {Task, :start_link, [&Pleroma.Web.Push.init/0]}, @@ -70,9 +71,7 @@ defmodule Pleroma.Application do # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # for other strategies and supported options opts = [strategy: :one_for_one, name: Pleroma.Supervisor] - result = Supervisor.start_link(children, opts) - :ok = after_supervisor_start() - result + Supervisor.start_link(children, opts) end defp setup_instrumenters do @@ -164,17 +163,4 @@ defmodule Pleroma.Application do :hackney_pool.child_spec(pool, options) end end - - defp after_supervisor_start do - with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest], - true <- digest_config[:active] do - PleromaJobQueue.schedule( - digest_config[:schedule], - :digest_emails, - Pleroma.DigestEmailWorker - ) - end - - :ok - end end diff --git a/lib/pleroma/activity_expiration_worker.ex b/lib/pleroma/daemons/activity_expiration_daemon.ex index 0f9e715f8..cab7628c4 100644 --- a/lib/pleroma/activity_expiration_worker.ex +++ b/lib/pleroma/daemons/activity_expiration_daemon.ex @@ -2,13 +2,14 @@ # Copyright © 2019 Pleroma Authors <https://pleroma.social/> # SPDX-License-Identifier: AGPL-3.0-only -defmodule Pleroma.ActivityExpirationWorker do +defmodule Pleroma.Daemons.ActivityExpirationDaemon do alias Pleroma.Activity alias Pleroma.ActivityExpiration alias Pleroma.Config alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.CommonAPI + require Logger use GenServer import Ecto.Query @@ -49,7 +50,10 @@ defmodule Pleroma.ActivityExpirationWorker do def handle_info(:perform, state) do ActivityExpiration.due_expirations(@schedule_interval) |> Enum.each(fn expiration -> - PleromaJobQueue.enqueue(:activity_expiration, __MODULE__, [:execute, expiration.id]) + Pleroma.Workers.ActivityExpirationWorker.enqueue( + "activity_expiration", + %{"activity_expiration_id" => expiration.id} + ) end) schedule_next() diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/daemons/digest_email_daemon.ex index 5644d6a67..462ad2c55 100644 --- a/lib/pleroma/digest_email_worker.ex +++ b/lib/pleroma/daemons/digest_email_daemon.ex @@ -2,10 +2,11 @@ # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # SPDX-License-Identifier: AGPL-3.0-only -defmodule Pleroma.DigestEmailWorker do - import Ecto.Query +defmodule Pleroma.Daemons.DigestEmailDaemon do + alias Pleroma.Repo + alias Pleroma.Workers.DigestEmailsWorker - @queue_name :digest_emails + import Ecto.Query def perform do config = Pleroma.Config.get([:email_notifications, :digest]) @@ -20,8 +21,10 @@ defmodule Pleroma.DigestEmailWorker do where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"), select: u ) - |> Pleroma.Repo.all() - |> Enum.each(&PleromaJobQueue.enqueue(@queue_name, __MODULE__, [&1])) + |> Repo.all() + |> Enum.each(fn user -> + DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id}) + end) end @doc """ diff --git a/lib/pleroma/scheduled_activity_worker.ex b/lib/pleroma/daemons/scheduled_activity_daemon.ex index 8578cab5e..aee5f723a 100644 --- a/lib/pleroma/scheduled_activity_worker.ex +++ b/lib/pleroma/daemons/scheduled_activity_daemon.ex @@ -2,7 +2,7 @@ # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # SPDX-License-Identifier: AGPL-3.0-only -defmodule Pleroma.ScheduledActivityWorker do +defmodule Pleroma.Daemons.ScheduledActivityDaemon do @moduledoc """ Sends scheduled activities to the job queue. """ @@ -11,6 +11,7 @@ defmodule Pleroma.ScheduledActivityWorker do alias Pleroma.ScheduledActivity alias Pleroma.User alias Pleroma.Web.CommonAPI + use GenServer require Logger @@ -45,7 +46,10 @@ defmodule Pleroma.ScheduledActivityWorker do def handle_info(:perform, state) do ScheduledActivity.due_activities(@schedule_interval) |> Enum.each(fn scheduled_activity -> - PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id]) + Pleroma.Workers.ScheduledActivityWorker.enqueue( + "execute", + %{"activity_id" => scheduled_activity.id} + ) end) schedule_next() diff --git a/lib/pleroma/docs/generator.ex b/lib/pleroma/docs/generator.ex new file mode 100644 index 000000000..aa578eee2 --- /dev/null +++ b/lib/pleroma/docs/generator.ex @@ -0,0 +1,73 @@ +defmodule Pleroma.Docs.Generator do + @callback process(keyword()) :: {:ok, String.t()} + + @spec process(module(), keyword()) :: {:ok, String.t()} + def process(implementation, descriptions) do + implementation.process(descriptions) + end + + @spec uploaders_list() :: [module()] + def uploaders_list do + {:ok, modules} = :application.get_key(:pleroma, :modules) + + Enum.filter(modules, fn module -> + name_as_list = Module.split(module) + + List.starts_with?(name_as_list, ["Pleroma", "Uploaders"]) and + List.last(name_as_list) != "Uploader" + end) + end + + @spec filters_list() :: [module()] + def filters_list do + {:ok, modules} = :application.get_key(:pleroma, :modules) + + Enum.filter(modules, fn module -> + name_as_list = Module.split(module) + + List.starts_with?(name_as_list, ["Pleroma", "Upload", "Filter"]) + end) + end + + @spec mrf_list() :: [module()] + def mrf_list do + {:ok, modules} = :application.get_key(:pleroma, :modules) + + Enum.filter(modules, fn module -> + name_as_list = Module.split(module) + + List.starts_with?(name_as_list, ["Pleroma", "Web", "ActivityPub", "MRF"]) and + length(name_as_list) > 4 + end) + end + + @spec richmedia_parsers() :: [module()] + def richmedia_parsers do + {:ok, modules} = :application.get_key(:pleroma, :modules) + + Enum.filter(modules, fn module -> + name_as_list = Module.split(module) + + List.starts_with?(name_as_list, ["Pleroma", "Web", "RichMedia", "Parsers"]) and + length(name_as_list) == 5 + end) + end +end + +defimpl Jason.Encoder, for: Tuple do + def encode(tuple, opts) do + Jason.Encode.list(Tuple.to_list(tuple), opts) + end +end + +defimpl Jason.Encoder, for: [Regex, Function] do + def encode(term, opts) do + Jason.Encode.string(inspect(term), opts) + end +end + +defimpl String.Chars, for: Regex do + def to_string(term) do + inspect(term) + end +end diff --git a/lib/pleroma/docs/json.ex b/lib/pleroma/docs/json.ex new file mode 100644 index 000000000..18ba01d58 --- /dev/null +++ b/lib/pleroma/docs/json.ex @@ -0,0 +1,20 @@ +defmodule Pleroma.Docs.JSON do + @behaviour Pleroma.Docs.Generator + + @spec process(keyword()) :: {:ok, String.t()} + def process(descriptions) do + config_path = "docs/generate_config.json" + + with {:ok, file} <- File.open(config_path, [:write]), + json <- generate_json(descriptions), + :ok <- IO.write(file, json), + :ok <- File.close(file) do + {:ok, config_path} + end + end + + @spec generate_json([keyword()]) :: String.t() + def generate_json(descriptions) do + Jason.encode!(descriptions) + end +end diff --git a/lib/pleroma/docs/markdown.ex b/lib/pleroma/docs/markdown.ex new file mode 100644 index 000000000..8386dc2fb --- /dev/null +++ b/lib/pleroma/docs/markdown.ex @@ -0,0 +1,78 @@ +defmodule Pleroma.Docs.Markdown do + @behaviour Pleroma.Docs.Generator + + @spec process(keyword()) :: {:ok, String.t()} + def process(descriptions) do + config_path = "docs/generated_config.md" + {:ok, file} = File.open(config_path, [:utf8, :write]) + IO.write(file, "# Generated configuration\n") + IO.write(file, "Date of generation: #{Date.utc_today()}\n\n") + + IO.write( + file, + "This file describe the configuration, it is recommended to edit the relevant `*.secret.exs` file instead of the others founds in the ``config`` directory.\n\n" <> + "If you run Pleroma with ``MIX_ENV=prod`` the file is ``prod.secret.exs``, otherwise it is ``dev.secret.exs``.\n\n" + ) + + for group <- descriptions do + if is_nil(group[:key]) do + IO.write(file, "## #{inspect(group[:group])}\n") + else + IO.write(file, "## #{inspect(group[:key])}\n") + end + + IO.write(file, "#{group[:description]}\n") + + for child <- group[:children] do + print_child_header(file, child) + + print_suggestions(file, child[:suggestions]) + + if child[:children] do + for subchild <- child[:children] do + print_child_header(file, subchild) + + print_suggestions(file, subchild[:suggestions]) + end + end + end + + IO.write(file, "\n") + end + + :ok = File.close(file) + {:ok, config_path} + end + + defp print_suggestion(file, suggestion) when is_list(suggestion) do + IO.write(file, " `#{inspect(suggestion)}`\n") + end + + defp print_suggestion(file, suggestion) when is_function(suggestion) do + IO.write(file, " `#{inspect(suggestion.())}`\n") + end + + defp print_suggestion(file, suggestion, as_list \\ false) do + list_mark = if as_list, do: "- ", else: "" + IO.write(file, " #{list_mark}`#{inspect(suggestion)}`\n") + end + + defp print_suggestions(_file, nil), do: nil + + defp print_suggestions(file, suggestions) do + IO.write(file, "Suggestions:\n") + + if length(suggestions) > 1 do + for suggestion <- suggestions do + print_suggestion(file, suggestion, true) + end + else + print_suggestion(file, List.first(suggestions)) + end + end + + defp print_child_header(file, child) do + IO.write(file, "- `#{inspect(child[:key])}` -`#{inspect(child[:type])}` \n") + IO.write(file, "#{child[:description]} \n") + end +end diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex index 2e4657b7c..eb96f2e8b 100644 --- a/lib/pleroma/emails/mailer.ex +++ b/lib/pleroma/emails/mailer.ex @@ -9,6 +9,7 @@ defmodule Pleroma.Emails.Mailer do The module contains functions to delivery email using Swoosh.Mailer. """ + alias Pleroma.Workers.MailerWorker alias Swoosh.DeliveryError @otp_app :pleroma @@ -19,7 +20,12 @@ defmodule Pleroma.Emails.Mailer do @doc "add email to queue" def deliver_async(email, config \\ []) do - PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config]) + encoded_email = + email + |> :erlang.term_to_binary() + |> Base.encode64() + + MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config}) end @doc "callback to perform send email from queue" diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex index 4d7ed4ca1..544c4b687 100644 --- a/lib/pleroma/instances/instance.ex +++ b/lib/pleroma/instances/instance.ex @@ -90,7 +90,7 @@ defmodule Pleroma.Instances.Instance do def set_unreachable(url_or_host, unreachable_since \\ nil) def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do - unreachable_since = unreachable_since || DateTime.utc_now() + unreachable_since = parse_datetime(unreachable_since) || NaiveDateTime.utc_now() host = host(url_or_host) existing_record = Repo.get_by(Instance, %{host: host}) @@ -114,4 +114,10 @@ defmodule Pleroma.Instances.Instance do end def set_unreachable(_, _), do: {:error, nil} + + defp parse_datetime(datetime) when is_binary(datetime) do + NaiveDateTime.from_iso8601(datetime) + end + + defp parse_datetime(datetime), do: datetime end diff --git a/lib/pleroma/scheduler.ex b/lib/pleroma/scheduler.ex new file mode 100644 index 000000000..d84cd99ad --- /dev/null +++ b/lib/pleroma/scheduler.ex @@ -0,0 +1,7 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Scheduler do + use Quantum.Scheduler, otp_app: :pleroma +end diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 3aa245f2a..f0306652c 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -27,6 +27,7 @@ defmodule Pleroma.User do alias Pleroma.Web.OStatus alias Pleroma.Web.RelMe alias Pleroma.Web.Websub + alias Pleroma.Workers.BackgroundWorker require Logger @@ -174,11 +175,25 @@ defmodule Pleroma.User do |> Repo.aggregate(:count, :id) end + defp truncate_if_exists(params, key, max_length) do + if Map.has_key?(params, key) and is_binary(params[key]) do + {value, _chopped} = String.split_at(params[key], max_length) + Map.put(params, key, value) + else + params + end + end + def remote_user_creation(params) do bio_limit = Pleroma.Config.get([:instance, :user_bio_length], 5000) name_limit = Pleroma.Config.get([:instance, :user_name_length], 100) - params = Map.put(params, :info, params[:info] || %{}) + params = + params + |> Map.put(:info, params[:info] || %{}) + |> truncate_if_exists(:name, name_limit) + |> truncate_if_exists(:bio, bio_limit) + info_cng = User.Info.remote_user_creation(%User.Info{}, params[:info]) changes = @@ -633,8 +648,9 @@ defmodule Pleroma.User do end @doc "Fetch some posts when the user has just been federated with" - def fetch_initial_posts(user), - do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user]) + def fetch_initial_posts(user) do + BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id}) + end @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t() def get_followers_query(%User{} = user, nil) do @@ -1064,7 +1080,7 @@ defmodule Pleroma.User do end def deactivate_async(user, status \\ true) do - PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status]) + BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status}) end def deactivate(%User{} = user, status \\ true) do @@ -1092,9 +1108,9 @@ defmodule Pleroma.User do |> update_and_set_cache() end - @spec delete(User.t()) :: :ok - def delete(%User{} = user), - do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user]) + def delete(%User{} = user) do + BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id}) + end @spec perform(atom(), User.t()) :: {:ok, User.t()} def perform(:delete, %User{} = user) do @@ -1201,25 +1217,24 @@ defmodule Pleroma.User do Repo.all(query) end - def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers), - do: - PleromaJobQueue.enqueue(:background, __MODULE__, [ - :blocks_import, - blocker, - blocked_identifiers - ]) + def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do + BackgroundWorker.enqueue("blocks_import", %{ + "blocker_id" => blocker.id, + "blocked_identifiers" => blocked_identifiers + }) + end - def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers), - do: - PleromaJobQueue.enqueue(:background, __MODULE__, [ - :follow_import, - follower, - followed_identifiers - ]) + def follow_import(%User{} = follower, followed_identifiers) + when is_list(followed_identifiers) do + BackgroundWorker.enqueue("follow_import", %{ + "follower_id" => follower.id, + "followed_identifiers" => followed_identifiers + }) + end def delete_user_activities(%User{ap_id: ap_id} = user) do ap_id - |> Activity.query_by_actor() + |> Activity.Queries.by_actor() |> RepoStreamer.chunk_stream(50) |> Stream.each(fn activities -> Enum.each(activities, &delete_activity(&1)) @@ -1624,4 +1639,13 @@ defmodule Pleroma.User do def is_internal_user?(%User{nickname: nil}), do: true def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true def is_internal_user?(_), do: false + + def change_email(user, email) do + user + |> cast(%{email: email}, [:email]) + |> validate_required([:email]) + |> unique_constraint(:email) + |> validate_format(:email, @email_regex) + |> update_and_set_cache() + end end diff --git a/lib/pleroma/user/info.ex b/lib/pleroma/user/info.ex index 779bfbc18..151e025de 100644 --- a/lib/pleroma/user/info.ex +++ b/lib/pleroma/user/info.ex @@ -242,6 +242,13 @@ defmodule Pleroma.User.Info do end def remote_user_creation(info, params) do + params = + if Map.has_key?(params, :fields) do + Map.put(params, :fields, Enum.map(params[:fields], &truncate_field/1)) + else + params + end + info |> cast(params, [ :ap_enabled, @@ -326,6 +333,16 @@ defmodule Pleroma.User.Info do defp valid_field?(_), do: false + defp truncate_field(%{"name" => name, "value" => value}) do + {name, _chopped} = + String.split_at(name, Pleroma.Config.get([:instance, :account_field_name_length], 255)) + + {value, _chopped} = + String.split_at(value, Pleroma.Config.get([:instance, :account_field_value_length], 255)) + + %{"name" => name, "value" => value} + end + @spec confirmation_changeset(Info.t(), keyword()) :: Changeset.t() def confirmation_changeset(info, opts) do need_confirmation? = Keyword.get(opts, :need_confirmation) diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index d23ec66ac..41f6a0f1f 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -17,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.WebFinger + alias Pleroma.Workers.BackgroundWorker import Ecto.Query import Pleroma.Web.ActivityPub.Utils @@ -145,7 +146,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do activity end - PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity]) + BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) Notification.create_notifications(activity) diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex index a179dd54d..26b8539fe 100644 --- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex @@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do alias Pleroma.HTTP alias Pleroma.Web.MediaProxy + alias Pleroma.Workers.BackgroundWorker require Logger @@ -30,7 +31,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do url |> Enum.each(fn %{"href" => href} -> - PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href]) + BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href}) x -> Logger.debug("Unhandled attachment URL object #{inspect(x)}") @@ -46,7 +47,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message ) when is_list(attachments) and length(attachments) > 0 do - PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message]) + BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message}) {:ok, message} end diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index c97405690..a6322e25a 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -84,6 +84,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do end end + def publish_one(%{actor_id: actor_id} = params) do + actor = User.get_cached_by_id(actor_id) + + params + |> Map.delete(:actor_id) + |> Map.put(:actor, actor) + |> publish_one() + end + defp should_federate?(inbox, public) do if public do true @@ -159,7 +168,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do Publishes an activity with BCC to all relevant peers. """ - def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do + def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) + when is_list(bcc) and bcc != [] do public = is_public?(activity) {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) @@ -186,7 +196,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{ inbox: inbox, json: json, - actor: actor, + actor_id: actor.id, id: activity.data["id"], unreachable_since: unreachable_since }) @@ -221,7 +231,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do %{ inbox: inbox, json: json, - actor: actor, + actor_id: actor.id, id: activity.data["id"], unreachable_since: unreachable_since } diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index acd61bda3..2fbb16623 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -15,6 +15,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.Federator + alias Pleroma.Workers.TransmogrifierWorker import Ecto.Query @@ -185,12 +186,12 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do |> Map.put("context", replied_object.data["context"] || object["conversation"]) else e -> - Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}") + Logger.error("Couldn't fetch #{inspect(in_reply_to_id)}, error: #{inspect(e)}") object end e -> - Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}") + Logger.error("Couldn't fetch #{inspect(in_reply_to_id)}, error: #{inspect(e)}") object end else @@ -1051,7 +1052,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do already_ap <- User.ap_enabled?(user), {:ok, user} <- upgrade_user(user, data) do if not already_ap do - PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user]) + TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id}) end {:ok, user} diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index c9c0c3763..258e56066 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -85,15 +85,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do defp extract_list(_), do: [] def maybe_splice_recipient(ap_id, params) do - need_splice = + need_splice? = !recipient_in_collection(ap_id, params["to"]) && !recipient_in_collection(ap_id, params["cc"]) - cc_list = extract_list(params["cc"]) - - if need_splice do - params - |> Map.put("cc", [ap_id | cc_list]) + if need_splice? do + cc_list = extract_list(params["cc"]) + Map.put(params, "cc", [ap_id | cc_list]) else params end @@ -139,7 +137,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do "object" => object } - Notification.get_notified_from_activity(%Activity{data: fake_create_activity}, false) + get_notified_from_object(fake_create_activity) end def get_notified_from_object(object) do @@ -169,14 +167,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do @spec maybe_federate(any()) :: :ok def maybe_federate(%Activity{local: true} = activity) do if Pleroma.Config.get!([:instance, :federating]) do - priority = - case activity.data["type"] do - "Delete" -> 10 - "Create" -> 1 - _ -> 5 - end - - Pleroma.Web.Federator.publish(activity, priority) + Pleroma.Web.Federator.publish(activity) end :ok @@ -188,9 +179,9 @@ defmodule Pleroma.Web.ActivityPub.Utils do Adds an id and a published data if they aren't there, also adds it to an included object """ - def lazy_put_activity_defaults(map, fake \\ false) do + def lazy_put_activity_defaults(map, fake? \\ false) do map = - unless fake do + if not fake? do %{data: %{"id" => context}, id: context_id} = create_context(map["context"]) map @@ -207,7 +198,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do end if is_map(map["object"]) do - object = lazy_put_object_defaults(map["object"], map, fake) + object = lazy_put_object_defaults(map["object"], map, fake?) %{map | "object" => object} else map @@ -217,9 +208,9 @@ defmodule Pleroma.Web.ActivityPub.Utils do @doc """ Adds an id and published date if they aren't there. """ - def lazy_put_object_defaults(map, activity \\ %{}, fake) + def lazy_put_object_defaults(map, activity \\ %{}, fake?) - def lazy_put_object_defaults(map, activity, true = _fake) do + def lazy_put_object_defaults(map, activity, true = _fake?) do map |> Map.put_new_lazy("published", &make_date/0) |> Map.put_new("id", "pleroma:fake_object_id") @@ -228,7 +219,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do |> Map.put_new("context_id", activity["context_id"]) end - def lazy_put_object_defaults(map, activity, _fake) do + def lazy_put_object_defaults(map, activity, _fake?) do map |> Map.put_new_lazy("id", &generate_object_id/0) |> Map.put_new_lazy("published", &make_date/0) @@ -242,9 +233,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do def insert_full_object(%{"object" => %{"type" => type} = object_data} = map) when is_map(object_data) and type in @supported_object_types do with {:ok, object} <- Object.create(object_data) do - map = - map - |> Map.put("object", object.data["id"]) + map = Map.put(map, "object", object.data["id"]) {:ok, map, object} end @@ -263,7 +252,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do |> Activity.Queries.by_actor() |> Activity.Queries.by_object_id(id) |> Activity.Queries.by_type("Like") - |> Activity.Queries.limit(1) + |> limit(1) |> Repo.one() end @@ -380,12 +369,11 @@ defmodule Pleroma.Web.ActivityPub.Utils do %Activity{data: %{"actor" => actor, "object" => object}} = activity, state ) do - with new_data <- - activity.data - |> Map.put("state", state), - changeset <- Changeset.change(activity, data: new_data), - {:ok, activity} <- Repo.update(changeset), - _ <- User.set_follow_state_cache(actor, object, state) do + new_data = Map.put(activity.data, "state", state) + changeset = Changeset.change(activity, data: new_data) + + with {:ok, activity} <- Repo.update(changeset) do + User.set_follow_state_cache(actor, object, state) {:ok, activity} end end @@ -410,28 +398,14 @@ defmodule Pleroma.Web.ActivityPub.Utils do end def fetch_latest_follow(%User{ap_id: follower_id}, %User{ap_id: followed_id}) do - query = - from( - activity in Activity, - where: - fragment( - "? ->> 'type' = 'Follow'", - activity.data - ), - where: activity.actor == ^follower_id, - # this is to use the index - where: - fragment( - "coalesce((?)->'object'->>'id', (?)->>'object') = ?", - activity.data, - activity.data, - ^followed_id - ), - order_by: [fragment("? desc nulls last", activity.id)], - limit: 1 - ) - - Repo.one(query) + "Follow" + |> Activity.Queries.by_type() + |> where(actor: ^follower_id) + # this is to use the index + |> Activity.Queries.by_object_id(followed_id) + |> order_by([activity], fragment("? desc nulls last", activity.id)) + |> limit(1) + |> Repo.one() end #### Announce-related helpers @@ -439,23 +413,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do @doc """ Retruns an existing announce activity if the notice has already been announced """ - def get_existing_announce(actor, %{data: %{"id" => id}}) do - query = - from( - activity in Activity, - where: activity.actor == ^actor, - # this is to use the index - where: - fragment( - "coalesce((?)->'object'->>'id', (?)->>'object') = ?", - activity.data, - activity.data, - ^id - ), - where: fragment("(?)->>'type' = 'Announce'", activity.data) - ) - - Repo.one(query) + def get_existing_announce(actor, %{data: %{"id" => ap_id}}) do + "Announce" + |> Activity.Queries.by_type() + |> where(actor: ^actor) + # this is to use the index + |> Activity.Queries.by_object_id(ap_id) + |> Repo.one() end @doc """ @@ -538,11 +502,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do object ) do announcements = - if is_list(object.data["announcements"]), do: object.data["announcements"], else: [] + if is_list(object.data["announcements"]) do + Enum.uniq([actor | object.data["announcements"]]) + else + [actor] + end - with announcements <- [actor | announcements] |> Enum.uniq() do - update_element_in_object("announcement", announcements, object) - end + update_element_in_object("announcement", announcements, object) end def add_announce_to_object(_, object), do: {:ok, object} @@ -570,28 +536,14 @@ defmodule Pleroma.Web.ActivityPub.Utils do #### Block-related helpers def fetch_latest_block(%User{ap_id: blocker_id}, %User{ap_id: blocked_id}) do - query = - from( - activity in Activity, - where: - fragment( - "? ->> 'type' = 'Block'", - activity.data - ), - where: activity.actor == ^blocker_id, - # this is to use the index - where: - fragment( - "coalesce((?)->'object'->>'id', (?)->>'object') = ?", - activity.data, - activity.data, - ^blocked_id - ), - order_by: [fragment("? desc nulls last", activity.id)], - limit: 1 - ) - - Repo.one(query) + "Block" + |> Activity.Queries.by_type() + |> where(actor: ^blocker_id) + # this is to use the index + |> Activity.Queries.by_object_id(blocked_id) + |> order_by([activity], fragment("? desc nulls last", activity.id)) + |> limit(1) + |> Repo.one() end def make_block_data(blocker, blocked, activity_id) do @@ -695,11 +647,11 @@ defmodule Pleroma.Web.ActivityPub.Utils do #### Report-related helpers def update_report_state(%Activity{} = activity, state) when state in @supported_report_states do - with new_data <- Map.put(activity.data, "state", state), - changeset <- Changeset.change(activity, data: new_data), - {:ok, activity} <- Repo.update(changeset) do - {:ok, activity} - end + new_data = Map.put(activity.data, "state", state) + + activity + |> Changeset.change(data: new_data) + |> Repo.update() end def update_report_state(_, _), do: {:error, "Unsupported state"} @@ -766,21 +718,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do end def get_existing_votes(actor, %{data: %{"id" => id}}) do - query = - from( - [activity, object: object] in Activity.with_preloaded_object(Activity), - where: fragment("(?)->>'type' = 'Create'", activity.data), - where: fragment("(?)->>'actor' = ?", activity.data, ^actor), - where: - fragment( - "(?)->>'inReplyTo' = ?", - object.data, - ^to_string(id) - ), - where: fragment("(?)->>'type' = 'Answer'", object.data) - ) - - Repo.all(query) + actor + |> Activity.Queries.by_actor() + |> Activity.Queries.by_type("Create") + |> Activity.with_preloaded_object() + |> where([a, object: o], fragment("(?)->>'inReplyTo' = ?", o.data, ^to_string(id))) + |> where([a, object: o], fragment("(?)->>'type' = 'Answer'", o.data)) + |> Repo.all() end defp maybe_put(map, _key, nil), do: map diff --git a/lib/pleroma/web/admin_api/config.ex b/lib/pleroma/web/admin_api/config.ex index a10cc779b..1917a5580 100644 --- a/lib/pleroma/web/admin_api/config.ex +++ b/lib/pleroma/web/admin_api/config.ex @@ -90,6 +90,8 @@ defmodule Pleroma.Web.AdminAPI.Config do for v <- entity, into: [], do: do_convert(v) end + defp do_convert(%Regex{} = entity), do: inspect(entity) + defp do_convert(entity) when is_map(entity) do for {k, v} <- entity, into: %{}, do: {do_convert(k), do_convert(v)} end @@ -122,7 +124,7 @@ defmodule Pleroma.Web.AdminAPI.Config do def transform(entity), do: :erlang.term_to_binary(entity) - defp do_transform(%Regex{} = entity) when is_map(entity), do: entity + defp do_transform(%Regex{} = entity), do: entity defp do_transform(%{"tuple" => [":dispatch", [entity]]}) do {dispatch_settings, []} = do_eval(entity) @@ -154,8 +156,15 @@ defmodule Pleroma.Web.AdminAPI.Config do defp do_transform(entity), do: entity defp do_transform_string("~r/" <> pattern) do - pattern = String.trim_trailing(pattern, "/") - ~r/#{pattern}/ + modificator = String.split(pattern, "/") |> List.last() + pattern = String.trim_trailing(pattern, "/" <> modificator) + + case modificator do + "" -> ~r/#{pattern}/ + "i" -> ~r/#{pattern}/i + "u" -> ~r/#{pattern}/u + "s" -> ~r/#{pattern}/s + end end defp do_transform_string(":" <> atom), do: String.to_atom(atom) diff --git a/lib/pleroma/web/controller_helper.ex b/lib/pleroma/web/controller_helper.ex index eeac9f503..b53a01955 100644 --- a/lib/pleroma/web/controller_helper.ex +++ b/lib/pleroma/web/controller_helper.ex @@ -34,79 +34,38 @@ defmodule Pleroma.Web.ControllerHelper do defp param_to_integer(_, default), do: default - def add_link_headers( - conn, - method, - activities, - param \\ nil, - params \\ %{}, - func3 \\ nil, - func4 \\ nil - ) do - params = - conn.params - |> Map.drop(["since_id", "max_id", "min_id"]) - |> Map.merge(params) + def add_link_headers(conn, activities, extra_params \\ %{}) do + case List.last(activities) do + %{id: max_id} -> + params = + conn.params + |> Map.drop(Map.keys(conn.path_params)) + |> Map.drop(["since_id", "max_id", "min_id"]) + |> Map.merge(extra_params) - last = List.last(activities) + limit = + params + |> Map.get("limit", "20") + |> String.to_integer() - func3 = func3 || (&mastodon_api_url/3) - func4 = func4 || (&mastodon_api_url/4) + min_id = + if length(activities) <= limit do + activities + |> List.first() + |> Map.get(:id) + else + activities + |> Enum.at(limit * -1) + |> Map.get(:id) + end - if last do - max_id = last.id + next_url = current_url(conn, Map.merge(params, %{max_id: max_id})) + prev_url = current_url(conn, Map.merge(params, %{min_id: min_id})) - limit = - params - |> Map.get("limit", "20") - |> String.to_integer() + put_resp_header(conn, "link", "<#{next_url}>; rel=\"next\", <#{prev_url}>; rel=\"prev\"") - min_id = - if length(activities) <= limit do - activities - |> List.first() - |> Map.get(:id) - else - activities - |> Enum.at(limit * -1) - |> Map.get(:id) - end - - {next_url, prev_url} = - if param do - { - func4.( - Pleroma.Web.Endpoint, - method, - param, - Map.merge(params, %{max_id: max_id}) - ), - func4.( - Pleroma.Web.Endpoint, - method, - param, - Map.merge(params, %{min_id: min_id}) - ) - } - else - { - func3.( - Pleroma.Web.Endpoint, - method, - Map.merge(params, %{max_id: max_id}) - ), - func3.( - Pleroma.Web.Endpoint, - method, - Map.merge(params, %{min_id: min_id}) - ) - } - end - - conn - |> put_resp_header("link", "<#{next_url}>; rel=\"next\", <#{prev_url}>; rel=\"prev\"") - else - conn + _ -> + conn end end end diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index f4f9e83e0..1a2da014a 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -10,16 +10,17 @@ defmodule Pleroma.Web.Federator do alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Federator.Publisher - alias Pleroma.Web.Federator.RetryQueue alias Pleroma.Web.OStatus alias Pleroma.Web.Websub + alias Pleroma.Workers.PublisherWorker + alias Pleroma.Workers.ReceiverWorker + alias Pleroma.Workers.SubscriberWorker require Logger def init do - # 1 minute - Process.sleep(1000 * 60) - refresh_subscriptions() + # To do: consider removing this call in favor of scheduled execution (`quantum`-based) + refresh_subscriptions(schedule_in: 60) end @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" @@ -37,50 +38,38 @@ defmodule Pleroma.Web.Federator do # Client API def incoming_doc(doc) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) + ReceiverWorker.enqueue("incoming_doc", %{"body" => doc}) end def incoming_ap_doc(params) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) + ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) end - def publish(activity, priority \\ 1) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) + def publish(%{id: "pleroma:fakeid"} = activity) do + perform(:publish, activity) end - def verify_websub(websub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) + def publish(activity) do + PublisherWorker.enqueue("publish", %{"activity_id" => activity.id}) end - def request_subscription(sub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) + def verify_websub(websub) do + SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id}) end - def refresh_subscriptions do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) + def request_subscription(websub) do + SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id}) end - # Job Worker Callbacks - - def perform(:refresh_subscriptions) do - Logger.debug("Federator running refresh subscriptions") - Websub.refresh_subscriptions() - - spawn(fn -> - # 6 hours - Process.sleep(1000 * 60 * 60 * 6) - refresh_subscriptions() - end) + def refresh_subscriptions(worker_args \\ []) do + SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1]) end - def perform(:request_subscription, websub) do - Logger.debug("Refreshing #{websub.topic}") + # Job Worker Callbacks - with {:ok, websub} <- Websub.request_subscription(websub) do - Logger.debug("Successfully refreshed #{websub.topic}") - else - _e -> Logger.debug("Couldn't refresh #{websub.topic}") - end + @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} + def perform(:publish_one, module, params) do + apply(module, :publish_one, [params]) end def perform(:publish, activity) do @@ -92,14 +81,6 @@ defmodule Pleroma.Web.Federator do end end - def perform(:verify_websub, websub) do - Logger.debug(fn -> - "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" - end) - - Websub.verify(websub) - end - def perform(:incoming_doc, doc) do Logger.info("Got document, trying to parse") OStatus.handle_incoming(doc) @@ -130,22 +111,27 @@ defmodule Pleroma.Web.Federator do end end - def perform( - :publish_single_websub, - %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params - ) do - case Websub.publish_one(params) do - {:ok, _} -> - :ok + def perform(:request_subscription, websub) do + Logger.debug("Refreshing #{websub.topic}") - {:error, _} -> - RetryQueue.enqueue(params, Websub) + with {:ok, websub} <- Websub.request_subscription(websub) do + Logger.debug("Successfully refreshed #{websub.topic}") + else + _e -> Logger.debug("Couldn't refresh #{websub.topic}") end end - def perform(type, _) do - Logger.debug(fn -> "Unknown task: #{type}" end) - {:error, "Don't know what to do with this"} + def perform(:verify_websub, websub) do + Logger.debug(fn -> + "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" + end) + + Websub.verify(websub) + end + + def perform(:refresh_subscriptions) do + Logger.debug("Federator running refresh subscriptions") + Websub.refresh_subscriptions() end def ap_enabled_actor(id) do diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex index 70f870244..937064638 100644 --- a/lib/pleroma/web/federator/publisher.ex +++ b/lib/pleroma/web/federator/publisher.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do alias Pleroma.Activity alias Pleroma.Config alias Pleroma.User - alias Pleroma.Web.Federator.RetryQueue + alias Pleroma.Workers.PublisherWorker require Logger @@ -30,23 +30,11 @@ defmodule Pleroma.Web.Federator.Publisher do Enqueue publishing a single activity. """ @spec enqueue_one(module(), Map.t()) :: :ok - def enqueue_one(module, %{} = params), - do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params]) - - @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} - def perform(:publish_one, module, params) do - case apply(module, :publish_one, [params]) do - {:ok, _} -> - :ok - - {:error, _e} -> - RetryQueue.enqueue(params, module) - end - end - - def perform(type, _, _) do - Logger.debug("Unknown task: #{type}") - {:error, "Don't know what to do with this"} + def enqueue_one(module, %{} = params) do + PublisherWorker.enqueue( + "publish_one", + %{"module" => to_string(module), "params" => params} + ) end @doc """ diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex deleted file mode 100644 index 9eab8c218..000000000 --- a/lib/pleroma/web/federator/retry_queue.ex +++ /dev/null @@ -1,239 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Federator.RetryQueue do - use GenServer - - require Logger - - def init(args) do - queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected]) - - {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}} - end - - def start_link(_) do - enabled = - if Pleroma.Config.get(:env) == :test, - do: true, - else: Pleroma.Config.get([__MODULE__, :enabled], false) - - if enabled do - Logger.info("Starting retry queue") - - linkres = - GenServer.start_link( - __MODULE__, - %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil}, - name: __MODULE__ - ) - - maybe_kickoff_timer() - linkres - else - Logger.info("Retry queue disabled") - :ignore - end - end - - def enqueue(data, transport, retries \\ 0) do - GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1}) - end - - def get_stats do - GenServer.call(__MODULE__, :get_stats) - end - - def reset_stats do - GenServer.call(__MODULE__, :reset_stats) - end - - def get_retry_params(retries) do - if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do - {:drop, "Max retries reached"} - else - {:retry, growth_function(retries)} - end - end - - def get_retry_timer_interval do - Pleroma.Config.get([:retry_queue, :interval], 1000) - end - - defp ets_count_expires(table, current_time) do - :ets.select_count( - table, - [ - { - {:"$1", :"$2"}, - [{:"=<", :"$1", {:const, current_time}}], - [true] - } - ] - ) - end - - defp ets_pop_n_expired(table, current_time, desired) do - {popped, _continuation} = - :ets.select( - table, - [ - { - {:"$1", :"$2"}, - [{:"=<", :"$1", {:const, current_time}}], - [:"$_"] - } - ], - desired - ) - - popped - |> Enum.each(fn e -> - :ets.delete_object(table, e) - end) - - popped - end - - def maybe_start_job(running_jobs, queue_table) do - # we don't want to hit the ets or the DateTime more times than we have to - # could optimize slightly further by not using the count, and instead grabbing - # up to N objects early... - current_time = DateTime.to_unix(DateTime.utc_now()) - n_running_jobs = :sets.size(running_jobs) - - if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do - n_ready_jobs = ets_count_expires(queue_table, current_time) - - if n_ready_jobs > 0 do - # figure out how many we could start - available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs - start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) - else - running_jobs - end - else - running_jobs - end - end - - defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do - running_jobs - end - - defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) - when available_job_slots > 0 do - candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots) - - candidates - |> List.foldl(running_jobs, fn {_, e}, rj -> - {:ok, pid} = Task.start(fn -> worker(e) end) - mref = Process.monitor(pid) - :sets.add_element(mref, rj) - end) - end - - def worker({:send, data, transport, retries}) do - case transport.publish_one(data) do - {:ok, _} -> - GenServer.cast(__MODULE__, :inc_delivered) - :delivered - - {:error, _reason} -> - enqueue(data, transport, retries) - :retry - end - end - - def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do - {:reply, %{delivered: delivery_count, dropped: drop_count}, state} - end - - def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do - {:reply, %{delivered: delivery_count, dropped: drop_count}, - %{state | delivered: 0, dropped: 0}} - end - - def handle_cast(:reset_stats, state) do - {:noreply, %{state | delivered: 0, dropped: 0}} - end - - def handle_cast( - {:maybe_enqueue, data, transport, retries}, - %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state - ) do - case get_retry_params(retries) do - {:retry, timeout} -> - :ets.insert(queue_table, {timeout, {:send, data, transport, retries}}) - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - - {:drop, message} -> - Logger.debug(message) - {:noreply, %{state | dropped: drop_count + 1}} - end - end - - def handle_cast(:kickoff_timer, state) do - retry_interval = get_retry_timer_interval() - Process.send_after(__MODULE__, :retry_timer_run, retry_interval) - {:noreply, state} - end - - def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do - {:noreply, %{state | delivered: delivery_count + 1}} - end - - def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do - {:noreply, %{state | dropped: drop_count + 1}} - end - - def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do - case transport.publish_one(data) do - {:ok, _} -> - {:noreply, %{state | delivered: delivery_count + 1}} - - {:error, _reason} -> - enqueue(data, transport, retries) - {:noreply, state} - end - end - - def handle_info( - :retry_timer_run, - %{queue_table: queue_table, running_jobs: running_jobs} = state - ) do - maybe_kickoff_timer() - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - end - - def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do - %{running_jobs: running_jobs, queue_table: queue_table} = state - running_jobs = :sets.del_element(ref, running_jobs) - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - end - - def handle_info(unknown, state) do - Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring") - {:noreply, state} - end - - if Pleroma.Config.get(:env) == :test do - defp growth_function(_retries) do - _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout]) - DateTime.to_unix(DateTime.utc_now()) - 1 - end - else - defp growth_function(retries) do - round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) + - DateTime.to_unix(DateTime.utc_now()) - end - end - - defp maybe_kickoff_timer do - GenServer.cast(__MODULE__, :kickoff_timer) - end -end diff --git a/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex b/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex index c54462bb3..060137b80 100644 --- a/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do use Pleroma.Web, :controller import Pleroma.Web.ControllerHelper, - only: [json_response: 3, add_link_headers: 5, add_link_headers: 4, add_link_headers: 3] + only: [json_response: 3, add_link_headers: 2, add_link_headers: 3] alias Ecto.Changeset alias Pleroma.Activity @@ -365,7 +365,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Enum.reverse() conn - |> add_link_headers(:home_timeline, activities) + |> add_link_headers(activities) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end @@ -384,7 +384,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Enum.reverse() conn - |> add_link_headers(:public_timeline, activities, false, %{"local" => local_only}) + |> add_link_headers(activities, %{"local" => local_only}) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end @@ -398,7 +398,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do activities = ActivityPub.fetch_user_activities(user, reading_user, params) conn - |> add_link_headers(:user_statuses, activities, params["id"]) + |> add_link_headers(activities) |> put_view(StatusView) |> render("index.json", %{ activities: activities, @@ -422,7 +422,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Pagination.fetch_paginated(params) conn - |> add_link_headers(:dm_timeline, activities) + |> add_link_headers(activities) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end @@ -537,7 +537,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do def scheduled_statuses(%{assigns: %{user: user}} = conn, params) do with scheduled_activities <- MastodonAPI.get_scheduled_activities(user, params) do conn - |> add_link_headers(:scheduled_statuses, scheduled_activities) + |> add_link_headers(scheduled_activities) |> put_view(ScheduledActivityView) |> render("index.json", %{scheduled_activities: scheduled_activities}) end @@ -720,7 +720,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do notifications = MastodonAPI.get_notifications(user, params) conn - |> add_link_headers(:notifications, notifications) + |> add_link_headers(notifications) |> put_view(NotificationView) |> render("index.json", %{notifications: notifications, for: user}) end @@ -842,6 +842,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do def favourited_by(%{assigns: %{user: user}} = conn, %{"id" => id}) do with %Activity{} = activity <- Activity.get_by_id_with_object(id), + {:visible, true} <- {:visible, Visibility.visible_for_user?(activity, user)}, %Object{data: %{"likes" => likes}} <- Object.normalize(activity) do q = from(u in User, where: u.ap_id in ^likes) @@ -853,12 +854,14 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> put_view(AccountView) |> render("accounts.json", %{for: user, users: users, as: :user}) else + {:visible, false} -> {:error, :not_found} _ -> json(conn, []) end end def reblogged_by(%{assigns: %{user: user}} = conn, %{"id" => id}) do with %Activity{} = activity <- Activity.get_by_id_with_object(id), + {:visible, true} <- {:visible, Visibility.visible_for_user?(activity, user)}, %Object{data: %{"announcements" => announces}} <- Object.normalize(activity) do q = from(u in User, where: u.ap_id in ^announces) @@ -870,6 +873,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> put_view(AccountView) |> render("accounts.json", %{for: user, users: users, as: :user}) else + {:visible, false} -> {:error, :not_found} _ -> json(conn, []) end end @@ -908,7 +912,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Enum.reverse() conn - |> add_link_headers(:hashtag_timeline, activities, params["tag"], %{"local" => local_only}) + |> add_link_headers(activities, %{"local" => local_only}) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end @@ -924,7 +928,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do end conn - |> add_link_headers(:followers, followers, user) + |> add_link_headers(followers) |> put_view(AccountView) |> render("accounts.json", %{for: for_user, users: followers, as: :user}) end @@ -941,7 +945,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do end conn - |> add_link_headers(:following, followers, user) + |> add_link_headers(followers) |> put_view(AccountView) |> render("accounts.json", %{for: for_user, users: followers, as: :user}) end @@ -1166,7 +1170,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Enum.reverse() conn - |> add_link_headers(:favourites, activities) + |> add_link_headers(activities) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end @@ -1193,7 +1197,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Enum.reverse() conn - |> add_link_headers(:favourites, activities) + |> add_link_headers(activities) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: for_user, as: :activity}) else @@ -1214,7 +1218,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Enum.map(fn b -> Map.put(b.activity, :bookmark, Map.delete(b, :activity)) end) conn - |> add_link_headers(:bookmarks, bookmarks) + |> add_link_headers(bookmarks) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end @@ -1654,7 +1658,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do end) conn - |> add_link_headers(:conversations, participations) + |> add_link_headers(participations) |> json(conversations) end diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex index f50098302..eb94bf86f 100644 --- a/lib/pleroma/web/oauth/token/clean_worker.ex +++ b/lib/pleroma/web/oauth/token/clean_worker.ex @@ -17,6 +17,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do ) alias Pleroma.Web.OAuth.Token + alias Pleroma.Workers.BackgroundWorker def start_link(_), do: GenServer.start_link(__MODULE__, %{}) @@ -27,9 +28,11 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do @doc false def handle_info(:perform, state) do - Token.delete_expired_tokens() + BackgroundWorker.enqueue("clean_expired_tokens", %{}) Process.send_after(self(), :perform, @interval) {:noreply, state} end + + def perform(:clean), do: Token.delete_expired_tokens() end diff --git a/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex b/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex index f4df3b024..d17ccf84d 100644 --- a/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex +++ b/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex @@ -5,7 +5,7 @@ defmodule Pleroma.Web.PleromaAPI.PleromaAPIController do use Pleroma.Web, :controller - import Pleroma.Web.ControllerHelper, only: [add_link_headers: 7] + import Pleroma.Web.ControllerHelper, only: [add_link_headers: 2] alias Pleroma.Conversation.Participation alias Pleroma.Notification @@ -27,31 +27,22 @@ defmodule Pleroma.Web.PleromaAPI.PleromaAPIController do %{assigns: %{user: user}} = conn, %{"id" => participation_id} = params ) do - params = - params - |> Map.put("blocking_user", user) - |> Map.put("muting_user", user) - |> Map.put("user", user) - - participation = - participation_id - |> Participation.get(preload: [:conversation]) + participation = Participation.get(participation_id, preload: [:conversation]) if user.id == participation.user_id do + params = + params + |> Map.put("blocking_user", user) + |> Map.put("muting_user", user) + |> Map.put("user", user) + activities = participation.conversation.ap_id |> ActivityPub.fetch_activities_for_context(params) |> Enum.reverse() conn - |> add_link_headers( - :conversation_statuses, - activities, - participation_id, - params, - nil, - &pleroma_api_url/4 - ) + |> add_link_headers(activities) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end diff --git a/lib/pleroma/web/push/push.ex b/lib/pleroma/web/push/push.ex index 729dad02a..7ef1532ac 100644 --- a/lib/pleroma/web/push/push.ex +++ b/lib/pleroma/web/push/push.ex @@ -3,7 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Push do - alias Pleroma.Web.Push.Impl + alias Pleroma.Workers.WebPusherWorker require Logger @@ -31,6 +31,7 @@ defmodule Pleroma.Web.Push do end end - def send(notification), - do: PleromaJobQueue.enqueue(:web_push, Impl, [notification]) + def send(notification) do + WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id}) + end end diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex index 7cd59acb2..b0464037e 100644 --- a/lib/pleroma/web/router.ex +++ b/lib/pleroma/web/router.ex @@ -224,6 +224,7 @@ defmodule Pleroma.Web.Router do scope [] do pipe_through(:oauth_write) + post("/change_email", UtilController, :change_email) post("/change_password", UtilController, :change_password) post("/delete_account", UtilController, :delete_account) put("/notification_settings", UtilController, :update_notificaton_settings) diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index 9b01ebcc6..8ba7380c0 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do end end + def publish_one(%{recipient_id: recipient_id} = params) do + recipient = User.get_cached_by_id(recipient_id) + + params + |> Map.delete(:recipient_id) + |> Map.put(:recipient, recipient) + |> publish_one() + end + def publish_one(_), do: :noop @supported_activities [ @@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) Publisher.enqueue_one(__MODULE__, %{ - recipient: remote_user, + recipient_id: remote_user.id, feed: feed, unreachable_since: reachable_urls_metadata[remote_user.info.salmon] }) diff --git a/lib/pleroma/web/twitter_api/controllers/util_controller.ex b/lib/pleroma/web/twitter_api/controllers/util_controller.ex index 3405bd3b7..d7745ae7a 100644 --- a/lib/pleroma/web/twitter_api/controllers/util_controller.ex +++ b/lib/pleroma/web/twitter_api/controllers/util_controller.ex @@ -265,12 +265,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do String.split(line, ",") |> List.first() end) |> List.delete("Account address") do - PleromaJobQueue.enqueue(:background, User, [ - :follow_import, - follower, - followed_identifiers - ]) - + User.follow_import(follower, followed_identifiers) json(conn, "job started") end end @@ -281,12 +276,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do with blocked_identifiers <- String.split(list) do - PleromaJobQueue.enqueue(:background, User, [ - :blocks_import, - blocker, - blocked_identifiers - ]) - + User.blocks_import(blocker, blocked_identifiers) json(conn, "job started") end end @@ -314,6 +304,25 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do end end + def change_email(%{assigns: %{user: user}} = conn, params) do + case CommonAPI.Utils.confirm_current_password(user, params["password"]) do + {:ok, user} -> + with {:ok, _user} <- User.change_email(user, params["email"]) do + json(conn, %{status: "success"}) + else + {:error, changeset} -> + {_, {error, _}} = Enum.at(changeset.errors, 0) + json(conn, %{error: "Email #{error}."}) + + _ -> + json(conn, %{error: "Unable to change email."}) + end + + {:error, msg} -> + json(conn, %{error: msg}) + end + end + def delete_account(%{assigns: %{user: user}} = conn, params) do case CommonAPI.Utils.confirm_current_password(user, params["password"]) do {:ok, user} -> diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex new file mode 100644 index 000000000..4e3e4195f --- /dev/null +++ b/lib/pleroma/workers/activity_expiration_worker.ex @@ -0,0 +1,18 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ActivityExpirationWorker do + use Pleroma.Workers.WorkerHelper, queue: "activity_expiration" + + @impl Oban.Worker + def perform( + %{ + "op" => "activity_expiration", + "activity_expiration_id" => activity_expiration_id + }, + _job + ) do + Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, activity_expiration_id) + end +end diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex new file mode 100644 index 000000000..082f20ab7 --- /dev/null +++ b/lib/pleroma/workers/background_worker.ex @@ -0,0 +1,69 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.BackgroundWorker do + alias Pleroma.Activity + alias Pleroma.User + alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy + alias Pleroma.Web.OAuth.Token.CleanWorker + + use Pleroma.Workers.WorkerHelper, queue: "background" + + @impl Oban.Worker + def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do + user = User.get_cached_by_id(user_id) + User.perform(:fetch_initial_posts, user) + end + + def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do + user = User.get_cached_by_id(user_id) + User.perform(:deactivate_async, user, status) + end + + def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do + user = User.get_cached_by_id(user_id) + User.perform(:delete, user) + end + + def perform( + %{ + "op" => "blocks_import", + "blocker_id" => blocker_id, + "blocked_identifiers" => blocked_identifiers + }, + _job + ) do + blocker = User.get_cached_by_id(blocker_id) + User.perform(:blocks_import, blocker, blocked_identifiers) + end + + def perform( + %{ + "op" => "follow_import", + "follower_id" => follower_id, + "followed_identifiers" => followed_identifiers + }, + _job + ) do + follower = User.get_cached_by_id(follower_id) + User.perform(:follow_import, follower, followed_identifiers) + end + + def perform(%{"op" => "clean_expired_tokens"}, _job) do + CleanWorker.perform(:clean) + end + + def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do + MediaProxyWarmingPolicy.perform(:preload, message) + end + + def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do + MediaProxyWarmingPolicy.perform(:prefetch, url) + end + + def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do + activity = Activity.get_by_id(activity_id) + Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity) + end +end diff --git a/lib/pleroma/workers/digest_emails_worker.ex b/lib/pleroma/workers/digest_emails_worker.ex new file mode 100644 index 000000000..3e5a836d0 --- /dev/null +++ b/lib/pleroma/workers/digest_emails_worker.ex @@ -0,0 +1,16 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.DigestEmailsWorker do + alias Pleroma.User + + use Pleroma.Workers.WorkerHelper, queue: "digest_emails" + + @impl Oban.Worker + def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do + user_id + |> User.get_cached_by_id() + |> Pleroma.Daemons.DigestEmailDaemon.perform() + end +end diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex new file mode 100644 index 000000000..1b7a0eb3e --- /dev/null +++ b/lib/pleroma/workers/mailer_worker.ex @@ -0,0 +1,15 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.MailerWorker do + use Pleroma.Workers.WorkerHelper, queue: "mailer" + + @impl Oban.Worker + def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do + encoded_email + |> Base.decode64!() + |> :erlang.binary_to_term() + |> Pleroma.Emails.Mailer.deliver(config) + end +end diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex new file mode 100644 index 000000000..455f7fc7e --- /dev/null +++ b/lib/pleroma/workers/publisher_worker.ex @@ -0,0 +1,25 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.PublisherWorker do + alias Pleroma.Activity + alias Pleroma.Web.Federator + + use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" + + def backoff(attempt) when is_integer(attempt) do + Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5) + end + + @impl Oban.Worker + def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do + activity = Activity.get_by_id(activity_id) + Federator.perform(:publish, activity) + end + + def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do + params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end) + Federator.perform(:publish_one, String.to_atom(module_name), params) + end +end diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex new file mode 100644 index 000000000..83d528a66 --- /dev/null +++ b/lib/pleroma/workers/receiver_worker.ex @@ -0,0 +1,18 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ReceiverWorker do + alias Pleroma.Web.Federator + + use Pleroma.Workers.WorkerHelper, queue: "federator_incoming" + + @impl Oban.Worker + def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do + Federator.perform(:incoming_doc, doc) + end + + def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do + Federator.perform(:incoming_ap_doc, params) + end +end diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex new file mode 100644 index 000000000..ca7d53af1 --- /dev/null +++ b/lib/pleroma/workers/scheduled_activity_worker.ex @@ -0,0 +1,12 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ScheduledActivityWorker do + use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities" + + @impl Oban.Worker + def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do + Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, activity_id) + end +end diff --git a/lib/pleroma/workers/subscriber_worker.ex b/lib/pleroma/workers/subscriber_worker.ex new file mode 100644 index 000000000..fc490e300 --- /dev/null +++ b/lib/pleroma/workers/subscriber_worker.ex @@ -0,0 +1,26 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.SubscriberWorker do + alias Pleroma.Repo + alias Pleroma.Web.Federator + alias Pleroma.Web.Websub + + use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" + + @impl Oban.Worker + def perform(%{"op" => "refresh_subscriptions"}, _job) do + Federator.perform(:refresh_subscriptions) + end + + def perform(%{"op" => "request_subscription", "websub_id" => websub_id}, _job) do + websub = Repo.get(Websub.WebsubClientSubscription, websub_id) + Federator.perform(:request_subscription, websub) + end + + def perform(%{"op" => "verify_websub", "websub_id" => websub_id}, _job) do + websub = Repo.get(Websub.WebsubServerSubscription, websub_id) + Federator.perform(:verify_websub, websub) + end +end diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex new file mode 100644 index 000000000..b581a2f86 --- /dev/null +++ b/lib/pleroma/workers/transmogrifier_worker.ex @@ -0,0 +1,15 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.TransmogrifierWorker do + alias Pleroma.User + + use Pleroma.Workers.WorkerHelper, queue: "transmogrifier" + + @impl Oban.Worker + def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do + user = User.get_cached_by_id(user_id) + Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user) + end +end diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex new file mode 100644 index 000000000..bea2baffb --- /dev/null +++ b/lib/pleroma/workers/web_pusher_worker.ex @@ -0,0 +1,16 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.WebPusherWorker do + alias Pleroma.Notification + alias Pleroma.Repo + + use Pleroma.Workers.WorkerHelper, queue: "web_push" + + @impl Oban.Worker + def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do + notification = Repo.get(Notification, notification_id) + Pleroma.Web.Push.Impl.perform(notification) + end +end diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex new file mode 100644 index 000000000..358efa14a --- /dev/null +++ b/lib/pleroma/workers/worker_helper.ex @@ -0,0 +1,46 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.WorkerHelper do + alias Pleroma.Config + alias Pleroma.Workers.WorkerHelper + + def worker_args(queue) do + case Config.get([:workers, :retries, queue]) do + nil -> [] + max_attempts -> [max_attempts: max_attempts] + end + end + + def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do + backoff = + :math.pow(attempt, pow) + + base_backoff + + :rand.uniform(2 * base_backoff) * attempt + + trunc(backoff) + end + + defmacro __using__(opts) do + caller_module = __CALLER__.module + queue = Keyword.fetch!(opts, :queue) + + quote do + # Note: `max_attempts` is intended to be overridden in `new/2` call + use Oban.Worker, + queue: unquote(queue), + max_attempts: 1 + + def enqueue(op, params, worker_args \\ []) do + params = Map.merge(%{"op" => op}, params) + queue_atom = String.to_atom(unquote(queue)) + worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom) + + unquote(caller_module) + |> apply(:new, [params, worker_args]) + |> Pleroma.Repo.insert() + end + end + end +end |