diff options
56 files changed, 727 insertions, 551 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 880fba58a..fd81b3087 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Mastodon API: Unsubscribe followers when they unfollow a user - AdminAPI: Add "godmode" while fetching user statuses (i.e. admin can see private statuses) - Improve digest email template +- Replaced [pleroma_job_queue](https://git.pleroma.social/pleroma/pleroma_job_queue) with [Oban](https://github.com/sorentwo/oban) +- Introduced [quantum](https://github.com/quantum-elixir/quantum-core) job scheduler ### Fixed - Not being able to pin unlisted posts diff --git a/config/config.exs b/config/config.exs index 118f697b3..9a8c69448 100644 --- a/config/config.exs +++ b/config/config.exs @@ -51,6 +51,24 @@ config :pleroma, Pleroma.Repo, telemetry_event: [Pleroma.Repo.Instrumenter], migration_lock: nil +scheduled_jobs = + with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest], + true <- digest_config[:active] do + [{digest_config[:schedule], {Pleroma.DigestEmailWorker, :perform, []}}] + else + _ -> [] + end + +scheduled_jobs = + scheduled_jobs ++ + [{"0 */6 * * * *", {Pleroma.Web.Websub, :refresh_subscriptions, []}}] + +config :pleroma, Pleroma.Scheduler, + global: true, + overlap: true, + timezone: :utc, + jobs: scheduled_jobs + config :pleroma, Pleroma.Captcha, enabled: false, seconds_valid: 60, @@ -449,20 +467,25 @@ config :pleroma, Pleroma.User, "web" ] -config :pleroma, Pleroma.Web.Federator.RetryQueue, - enabled: false, - max_jobs: 20, - initial_timeout: 30, - max_retries: 5 - -config :pleroma_job_queue, :queues, - federator_incoming: 50, - federator_outgoing: 50, - web_push: 50, - mailer: 10, - transmogrifier: 20, - scheduled_activities: 10, - background: 5 +config :pleroma, Oban, + repo: Pleroma.Repo, + verbose: false, + prune: {:maxage, 60 * 60 * 24 * 7}, + queues: [ + federator_incoming: 50, + federator_outgoing: 50, + web_push: 50, + mailer: 10, + transmogrifier: 20, + scheduled_activities: 10, + background: 5 + ] + +config :pleroma, :workers, + retries: [ + federator_incoming: 5, + federator_outgoing: 5 + ] config :pleroma, :fetch_initial_posts, enabled: false, diff --git a/config/test.exs b/config/test.exs index 30a51f734..62f2a04d2 100644 --- a/config/test.exs +++ b/config/test.exs @@ -61,7 +61,9 @@ config :web_push_encryption, :vapid_details, config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock -config :pleroma_job_queue, disabled: true +config :pleroma, Oban, + queues: false, + prune: :disabled config :pleroma, Pleroma.ScheduledActivity, daily_user_limit: 2, diff --git a/docs/config.md b/docs/config.md index c246e2f6c..72e36db83 100644 --- a/docs/config.md +++ b/docs/config.md @@ -400,9 +400,9 @@ You can then do curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken" ``` -## :pleroma_job_queue +## Oban -[Pleroma Job Queue](https://git.pleroma.social/pleroma/pleroma_job_queue) configuration: a list of queues with maximum concurrent jobs. +[Oban](https://github.com/sorentwo/oban) asynchronous job processor configuration. Pleroma has the following queues: @@ -416,19 +416,21 @@ Pleroma has the following queues: Example: ```elixir -config :pleroma_job_queue, :queues, - federator_incoming: 50, - federator_outgoing: 50 +config :pleroma, Oban, + repo: Pleroma.Repo, + queues: [ + federator_incoming: 50, + federator_outgoing: 50 + ] ``` -This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`. +This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the number of max concurrent jobs set to `50`. + +## :workers -## Pleroma.Web.Federator.RetryQueue +Includes custom worker options not interpretable directly by `Oban`. -* `enabled`: If set to `true`, failed federation jobs will be retried -* `max_jobs`: The maximum amount of parallel federation jobs running at the same time. -* `initial_timeout`: The initial timeout in seconds -* `max_retries`: The maximum number of times a federation job is retried +* `retries` — keyword lists where keys are `Oban` queues (see above) and values are numbers of max attempts for failed jobs. ## Pleroma.Web.Metadata * `providers`: a list of metadata providers to enable. Providers available: diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 25e56b9e2..ce2d3ab59 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -31,6 +31,7 @@ defmodule Pleroma.Application do children = [ Pleroma.Repo, + Pleroma.Scheduler, Pleroma.Config.TransferTask, Pleroma.Emoji, Pleroma.Captcha, @@ -40,8 +41,8 @@ defmodule Pleroma.Application do cachex_children() ++ hackney_pool_children() ++ [ - Pleroma.Web.Federator.RetryQueue, Pleroma.Stats, + {Oban, Application.get_env(:pleroma, Oban)}, %{ id: :web_push_init, start: {Task, :start_link, [&Pleroma.Web.Push.init/0]}, @@ -69,9 +70,7 @@ defmodule Pleroma.Application do # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # for other strategies and supported options opts = [strategy: :one_for_one, name: Pleroma.Supervisor] - result = Supervisor.start_link(children, opts) - :ok = after_supervisor_start() - result + Supervisor.start_link(children, opts) end defp setup_instrumenters do @@ -162,17 +161,4 @@ defmodule Pleroma.Application do :hackney_pool.child_spec(pool, options) end end - - defp after_supervisor_start do - with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest], - true <- digest_config[:active] do - PleromaJobQueue.schedule( - digest_config[:schedule], - :digest_emails, - Pleroma.DigestEmailWorker - ) - end - - :ok - end end diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/digest_email_worker.ex index 5644d6a67..ffc48bfab 100644 --- a/lib/pleroma/digest_email_worker.ex +++ b/lib/pleroma/digest_email_worker.ex @@ -3,9 +3,12 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.DigestEmailWorker do + alias Pleroma.Repo + alias Pleroma.Workers.Mailer, as: MailerWorker + import Ecto.Query - @queue_name :digest_emails + defdelegate worker_args(queue), to: Pleroma.Workers.Helper def perform do config = Pleroma.Config.get([:email_notifications, :digest]) @@ -21,7 +24,11 @@ defmodule Pleroma.DigestEmailWorker do select: u ) |> Pleroma.Repo.all() - |> Enum.each(&PleromaJobQueue.enqueue(@queue_name, __MODULE__, [&1])) + |> Enum.each(fn user -> + %{"op" => "digest_email", "user_id" => user.id} + |> MailerWorker.new([queue: "digest_emails"] ++ worker_args(:digest_emails)) + |> Repo.insert() + end) end @doc """ diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex index 2e4657b7c..bb534f602 100644 --- a/lib/pleroma/emails/mailer.ex +++ b/lib/pleroma/emails/mailer.ex @@ -9,6 +9,8 @@ defmodule Pleroma.Emails.Mailer do The module contains functions to delivery email using Swoosh.Mailer. """ + alias Pleroma.Repo + alias Pleroma.Workers.Mailer, as: MailerWorker alias Swoosh.DeliveryError @otp_app :pleroma @@ -17,9 +19,18 @@ defmodule Pleroma.Emails.Mailer do @spec enabled?() :: boolean() def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled]) + defdelegate worker_args(queue), to: Pleroma.Workers.Helper + @doc "add email to queue" def deliver_async(email, config \\ []) do - PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config]) + encoded_email = + email + |> :erlang.term_to_binary() + |> Base.encode64() + + %{"op" => "email", "encoded_email" => encoded_email, "config" => config} + |> MailerWorker.new(worker_args(:mailer)) + |> Repo.insert() end @doc "callback to perform send email from queue" diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex index 4d7ed4ca1..544c4b687 100644 --- a/lib/pleroma/instances/instance.ex +++ b/lib/pleroma/instances/instance.ex @@ -90,7 +90,7 @@ defmodule Pleroma.Instances.Instance do def set_unreachable(url_or_host, unreachable_since \\ nil) def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do - unreachable_since = unreachable_since || DateTime.utc_now() + unreachable_since = parse_datetime(unreachable_since) || NaiveDateTime.utc_now() host = host(url_or_host) existing_record = Repo.get_by(Instance, %{host: host}) @@ -114,4 +114,10 @@ defmodule Pleroma.Instances.Instance do end def set_unreachable(_, _), do: {:error, nil} + + defp parse_datetime(datetime) when is_binary(datetime) do + NaiveDateTime.from_iso8601(datetime) + end + + defp parse_datetime(datetime), do: datetime end diff --git a/lib/pleroma/scheduled_activity_worker.ex b/lib/pleroma/scheduled_activity_worker.ex index 8578cab5e..a01fb4fcb 100644 --- a/lib/pleroma/scheduled_activity_worker.ex +++ b/lib/pleroma/scheduled_activity_worker.ex @@ -8,14 +8,18 @@ defmodule Pleroma.ScheduledActivityWorker do """ alias Pleroma.Config + alias Pleroma.Repo alias Pleroma.ScheduledActivity alias Pleroma.User alias Pleroma.Web.CommonAPI + use GenServer require Logger @schedule_interval :timer.minutes(1) + defdelegate worker_args(queue), to: Pleroma.Workers.Helper + def start_link(_) do GenServer.start_link(__MODULE__, nil) end @@ -45,7 +49,9 @@ defmodule Pleroma.ScheduledActivityWorker do def handle_info(:perform, state) do ScheduledActivity.due_activities(@schedule_interval) |> Enum.each(fn scheduled_activity -> - PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id]) + %{"op" => "execute", "activity_id" => scheduled_activity.id} + |> Pleroma.Workers.ScheduledActivityWorker.new(worker_args(:scheduled_activities)) + |> Repo.insert() end) schedule_next() diff --git a/lib/pleroma/scheduler.ex b/lib/pleroma/scheduler.ex new file mode 100644 index 000000000..d84cd99ad --- /dev/null +++ b/lib/pleroma/scheduler.ex @@ -0,0 +1,7 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Scheduler do + use Quantum.Scheduler, otp_app: :pleroma +end diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 134b8bb6c..61a3d5911 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -27,6 +27,7 @@ defmodule Pleroma.User do alias Pleroma.Web.OStatus alias Pleroma.Web.RelMe alias Pleroma.Web.Websub + alias Pleroma.Workers.BackgroundWorker require Logger @@ -40,6 +41,8 @@ defmodule Pleroma.User do @strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/ @extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/ + defdelegate worker_args(queue), to: Pleroma.Workers.Helper + schema "users" do field(:bio, :string) field(:email, :string) @@ -613,8 +616,11 @@ defmodule Pleroma.User do end @doc "Fetch some posts when the user has just been federated with" - def fetch_initial_posts(user), - do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user]) + def fetch_initial_posts(user) do + %{"op" => "fetch_initial_posts", "user_id" => user.id} + |> BackgroundWorker.new(worker_args(:background)) + |> Repo.insert() + end @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t() def get_followers_query(%User{} = user, nil) do @@ -1044,7 +1050,9 @@ defmodule Pleroma.User do end def deactivate_async(user, status \\ true) do - PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status]) + %{"op" => "deactivate_user", "user_id" => user.id, "status" => status} + |> BackgroundWorker.new(worker_args(:background)) + |> Repo.insert() end def deactivate(%User{} = user, status \\ true) do @@ -1072,9 +1080,11 @@ defmodule Pleroma.User do |> update_and_set_cache() end - @spec delete(User.t()) :: :ok - def delete(%User{} = user), - do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user]) + def delete(%User{} = user) do + %{"op" => "delete_user", "user_id" => user.id} + |> BackgroundWorker.new(worker_args(:background)) + |> Repo.insert() + end @spec perform(atom(), User.t()) :: {:ok, User.t()} def perform(:delete, %User{} = user) do @@ -1181,21 +1191,26 @@ defmodule Pleroma.User do Repo.all(query) end - def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers), - do: - PleromaJobQueue.enqueue(:background, __MODULE__, [ - :blocks_import, - blocker, - blocked_identifiers - ]) - - def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers), - do: - PleromaJobQueue.enqueue(:background, __MODULE__, [ - :follow_import, - follower, - followed_identifiers - ]) + def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do + %{ + "op" => "blocks_import", + "blocker_id" => blocker.id, + "blocked_identifiers" => blocked_identifiers + } + |> BackgroundWorker.new(worker_args(:background)) + |> Repo.insert() + end + + def follow_import(%User{} = follower, followed_identifiers) + when is_list(followed_identifiers) do + %{ + "op" => "follow_import", + "follower_id" => follower.id, + "followed_identifiers" => followed_identifiers + } + |> BackgroundWorker.new(worker_args(:background)) + |> Repo.insert() + end def delete_user_activities(%User{ap_id: ap_id} = user) do ap_id diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 172c952d4..07652fae4 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -17,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.WebFinger + alias Pleroma.Workers.BackgroundWorker import Ecto.Query import Pleroma.Web.ActivityPub.Utils @@ -25,6 +26,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do require Logger require Pleroma.Constants + defdelegate worker_args(queue), to: Pleroma.Workers.Helper + # For Announce activities, we filter the recipients based on following status for any actors # that match actual users. See issue #164 for more information about why this is necessary. defp get_recipients(%{"type" => "Announce"} = data) do @@ -145,7 +148,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do activity end - PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity]) + %{"op" => "fetch_data_for_activity", "activity_id" => activity.id} + |> BackgroundWorker.new(worker_args(:background)) + |> Repo.insert() Notification.create_notifications(activity) diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex index a179dd54d..b188164ee 100644 --- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex @@ -7,7 +7,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do @behaviour Pleroma.Web.ActivityPub.MRF alias Pleroma.HTTP + alias Pleroma.Repo alias Pleroma.Web.MediaProxy + alias Pleroma.Workers.BackgroundWorker require Logger @@ -16,6 +18,8 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do recv_timeout: 10_000 ] + defdelegate worker_args(queue), to: Pleroma.Workers.Helper + def perform(:prefetch, url) do Logger.info("Prefetching #{inspect(url)}") @@ -30,7 +34,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do url |> Enum.each(fn %{"href" => href} -> - PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href]) + %{"op" => "media_proxy_prefetch", "url" => href} + |> BackgroundWorker.new(worker_args(:background)) + |> Repo.insert() x -> Logger.debug("Unhandled attachment URL object #{inspect(x)}") @@ -46,7 +52,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message ) when is_list(attachments) and length(attachments) > 0 do - PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message]) + %{"op" => "media_proxy_preload", "message" => message} + |> BackgroundWorker.new(worker_args(:background)) + |> Repo.insert() {:ok, message} end diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index 262529b84..03deec5f4 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -86,6 +86,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do end end + def publish_one(%{actor_id: actor_id} = params) do + actor = User.get_by_id(actor_id) + + params + |> Map.delete(:actor_id) + |> Map.put(:actor, actor) + |> publish_one() + end + defp should_federate?(inbox, public) do if public do true @@ -161,7 +170,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do Publishes an activity with BCC to all relevant peers. """ - def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do + def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) + when is_list(bcc) and bcc != [] do public = is_public?(activity) {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) @@ -188,7 +198,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{ inbox: inbox, json: json, - actor: actor, + actor_id: actor.id, id: activity.data["id"], unreachable_since: unreachable_since }) @@ -223,7 +233,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do %{ inbox: inbox, json: json, - actor: actor, + actor_id: actor.id, id: activity.data["id"], unreachable_since: unreachable_since } diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index 36340a3a1..7a03914bb 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -15,12 +15,15 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.Federator + alias Pleroma.Workers.Transmogrifier, as: TransmogrifierWorker import Ecto.Query require Logger require Pleroma.Constants + defdelegate worker_args(queue), to: Pleroma.Workers.Helper + @doc """ Modifies an incoming AP object (mastodon format) to our internal format. """ @@ -1049,7 +1052,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do already_ap <- User.ap_enabled?(user), {:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do unless already_ap do - PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user]) + %{"op" => "user_upgrade", "user_id" => user.id} + |> TransmogrifierWorker.new(worker_args(:transmogrifier)) + |> Repo.insert() end {:ok, user} diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index 1c3058658..8502ca3be 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -168,14 +168,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do """ def maybe_federate(%Activity{local: true} = activity) do if Pleroma.Config.get!([:instance, :federating]) do - priority = - case activity.data["type"] do - "Delete" -> 10 - "Create" -> 1 - _ -> 5 - end - - Pleroma.Web.Federator.publish(activity, priority) + Pleroma.Web.Federator.publish(activity) end :ok diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index f4f9e83e0..cf7e50fee 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -10,16 +10,19 @@ defmodule Pleroma.Web.Federator do alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Federator.Publisher - alias Pleroma.Web.Federator.RetryQueue alias Pleroma.Web.OStatus alias Pleroma.Web.Websub + alias Pleroma.Workers.Publisher, as: PublisherWorker + alias Pleroma.Workers.Receiver, as: ReceiverWorker + alias Pleroma.Workers.Subscriber, as: SubscriberWorker require Logger + defdelegate worker_args(queue), to: Pleroma.Workers.Helper + def init do - # 1 minute - Process.sleep(1000 * 60) - refresh_subscriptions() + # To do: consider removing this call in favor of scheduled execution (`quantum`-based) + refresh_subscriptions(schedule_in: 60) end @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" @@ -37,50 +40,50 @@ defmodule Pleroma.Web.Federator do # Client API def incoming_doc(doc) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) + %{"op" => "incoming_doc", "body" => doc} + |> ReceiverWorker.new(worker_args(:federator_incoming)) + |> Pleroma.Repo.insert() end def incoming_ap_doc(params) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) + %{"op" => "incoming_ap_doc", "params" => params} + |> ReceiverWorker.new(worker_args(:federator_incoming)) + |> Pleroma.Repo.insert() end - def publish(activity, priority \\ 1) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) + def publish(%{id: "pleroma:fakeid"} = activity) do + perform(:publish, activity) end - def verify_websub(websub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) + def publish(activity) do + %{"op" => "publish", "activity_id" => activity.id} + |> PublisherWorker.new(worker_args(:federator_outgoing)) + |> Pleroma.Repo.insert() end - def request_subscription(sub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) + def verify_websub(websub) do + %{"op" => "verify_websub", "websub_id" => websub.id} + |> SubscriberWorker.new(worker_args(:federator_outgoing)) + |> Pleroma.Repo.insert() end - def refresh_subscriptions do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) + def request_subscription(websub) do + %{"op" => "request_subscription", "websub_id" => websub.id} + |> SubscriberWorker.new(worker_args(:federator_outgoing)) + |> Pleroma.Repo.insert() end - # Job Worker Callbacks - - def perform(:refresh_subscriptions) do - Logger.debug("Federator running refresh subscriptions") - Websub.refresh_subscriptions() - - spawn(fn -> - # 6 hours - Process.sleep(1000 * 60 * 60 * 6) - refresh_subscriptions() - end) + def refresh_subscriptions(worker_args \\ []) do + %{"op" => "refresh_subscriptions"} + |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing)) + |> Pleroma.Repo.insert() end - def perform(:request_subscription, websub) do - Logger.debug("Refreshing #{websub.topic}") + # Job Worker Callbacks - with {:ok, websub} <- Websub.request_subscription(websub) do - Logger.debug("Successfully refreshed #{websub.topic}") - else - _e -> Logger.debug("Couldn't refresh #{websub.topic}") - end + @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} + def perform(:publish_one, module, params) do + apply(module, :publish_one, [params]) end def perform(:publish, activity) do @@ -92,14 +95,6 @@ defmodule Pleroma.Web.Federator do end end - def perform(:verify_websub, websub) do - Logger.debug(fn -> - "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" - end) - - Websub.verify(websub) - end - def perform(:incoming_doc, doc) do Logger.info("Got document, trying to parse") OStatus.handle_incoming(doc) @@ -130,22 +125,27 @@ defmodule Pleroma.Web.Federator do end end - def perform( - :publish_single_websub, - %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params - ) do - case Websub.publish_one(params) do - {:ok, _} -> - :ok + def perform(:request_subscription, websub) do + Logger.debug("Refreshing #{websub.topic}") - {:error, _} -> - RetryQueue.enqueue(params, Websub) + with {:ok, websub} <- Websub.request_subscription(websub) do + Logger.debug("Successfully refreshed #{websub.topic}") + else + _e -> Logger.debug("Couldn't refresh #{websub.topic}") end end - def perform(type, _) do - Logger.debug(fn -> "Unknown task: #{type}" end) - {:error, "Don't know what to do with this"} + def perform(:verify_websub, websub) do + Logger.debug(fn -> + "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" + end) + + Websub.verify(websub) + end + + def perform(:refresh_subscriptions) do + Logger.debug("Federator running refresh subscriptions") + Websub.refresh_subscriptions() end def ap_enabled_actor(id) do diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex index 70f870244..05d2be615 100644 --- a/lib/pleroma/web/federator/publisher.ex +++ b/lib/pleroma/web/federator/publisher.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do alias Pleroma.Activity alias Pleroma.Config alias Pleroma.User - alias Pleroma.Web.Federator.RetryQueue + alias Pleroma.Workers.Publisher, as: PublisherWorker require Logger @@ -30,23 +30,17 @@ defmodule Pleroma.Web.Federator.Publisher do Enqueue publishing a single activity. """ @spec enqueue_one(module(), Map.t()) :: :ok - def enqueue_one(module, %{} = params), - do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params]) - - @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} - def perform(:publish_one, module, params) do - case apply(module, :publish_one, [params]) do - {:ok, _} -> - :ok - - {:error, _e} -> - RetryQueue.enqueue(params, module) - end - end + def enqueue_one(module, %{} = params) do + worker_args = + if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do + [max_attempts: max_attempts] + else + [] + end - def perform(type, _, _) do - Logger.debug("Unknown task: #{type}") - {:error, "Don't know what to do with this"} + %{"op" => "publish_one", "module" => to_string(module), "params" => params} + |> PublisherWorker.new(worker_args) + |> Pleroma.Repo.insert() end @doc """ diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex deleted file mode 100644 index 9eab8c218..000000000 --- a/lib/pleroma/web/federator/retry_queue.ex +++ /dev/null @@ -1,239 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Federator.RetryQueue do - use GenServer - - require Logger - - def init(args) do - queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected]) - - {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}} - end - - def start_link(_) do - enabled = - if Pleroma.Config.get(:env) == :test, - do: true, - else: Pleroma.Config.get([__MODULE__, :enabled], false) - - if enabled do - Logger.info("Starting retry queue") - - linkres = - GenServer.start_link( - __MODULE__, - %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil}, - name: __MODULE__ - ) - - maybe_kickoff_timer() - linkres - else - Logger.info("Retry queue disabled") - :ignore - end - end - - def enqueue(data, transport, retries \\ 0) do - GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1}) - end - - def get_stats do - GenServer.call(__MODULE__, :get_stats) - end - - def reset_stats do - GenServer.call(__MODULE__, :reset_stats) - end - - def get_retry_params(retries) do - if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do - {:drop, "Max retries reached"} - else - {:retry, growth_function(retries)} - end - end - - def get_retry_timer_interval do - Pleroma.Config.get([:retry_queue, :interval], 1000) - end - - defp ets_count_expires(table, current_time) do - :ets.select_count( - table, - [ - { - {:"$1", :"$2"}, - [{:"=<", :"$1", {:const, current_time}}], - [true] - } - ] - ) - end - - defp ets_pop_n_expired(table, current_time, desired) do - {popped, _continuation} = - :ets.select( - table, - [ - { - {:"$1", :"$2"}, - [{:"=<", :"$1", {:const, current_time}}], - [:"$_"] - } - ], - desired - ) - - popped - |> Enum.each(fn e -> - :ets.delete_object(table, e) - end) - - popped - end - - def maybe_start_job(running_jobs, queue_table) do - # we don't want to hit the ets or the DateTime more times than we have to - # could optimize slightly further by not using the count, and instead grabbing - # up to N objects early... - current_time = DateTime.to_unix(DateTime.utc_now()) - n_running_jobs = :sets.size(running_jobs) - - if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do - n_ready_jobs = ets_count_expires(queue_table, current_time) - - if n_ready_jobs > 0 do - # figure out how many we could start - available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs - start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) - else - running_jobs - end - else - running_jobs - end - end - - defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do - running_jobs - end - - defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) - when available_job_slots > 0 do - candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots) - - candidates - |> List.foldl(running_jobs, fn {_, e}, rj -> - {:ok, pid} = Task.start(fn -> worker(e) end) - mref = Process.monitor(pid) - :sets.add_element(mref, rj) - end) - end - - def worker({:send, data, transport, retries}) do - case transport.publish_one(data) do - {:ok, _} -> - GenServer.cast(__MODULE__, :inc_delivered) - :delivered - - {:error, _reason} -> - enqueue(data, transport, retries) - :retry - end - end - - def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do - {:reply, %{delivered: delivery_count, dropped: drop_count}, state} - end - - def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do - {:reply, %{delivered: delivery_count, dropped: drop_count}, - %{state | delivered: 0, dropped: 0}} - end - - def handle_cast(:reset_stats, state) do - {:noreply, %{state | delivered: 0, dropped: 0}} - end - - def handle_cast( - {:maybe_enqueue, data, transport, retries}, - %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state - ) do - case get_retry_params(retries) do - {:retry, timeout} -> - :ets.insert(queue_table, {timeout, {:send, data, transport, retries}}) - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - - {:drop, message} -> - Logger.debug(message) - {:noreply, %{state | dropped: drop_count + 1}} - end - end - - def handle_cast(:kickoff_timer, state) do - retry_interval = get_retry_timer_interval() - Process.send_after(__MODULE__, :retry_timer_run, retry_interval) - {:noreply, state} - end - - def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do - {:noreply, %{state | delivered: delivery_count + 1}} - end - - def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do - {:noreply, %{state | dropped: drop_count + 1}} - end - - def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do - case transport.publish_one(data) do - {:ok, _} -> - {:noreply, %{state | delivered: delivery_count + 1}} - - {:error, _reason} -> - enqueue(data, transport, retries) - {:noreply, state} - end - end - - def handle_info( - :retry_timer_run, - %{queue_table: queue_table, running_jobs: running_jobs} = state - ) do - maybe_kickoff_timer() - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - end - - def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do - %{running_jobs: running_jobs, queue_table: queue_table} = state - running_jobs = :sets.del_element(ref, running_jobs) - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - end - - def handle_info(unknown, state) do - Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring") - {:noreply, state} - end - - if Pleroma.Config.get(:env) == :test do - defp growth_function(_retries) do - _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout]) - DateTime.to_unix(DateTime.utc_now()) - 1 - end - else - defp growth_function(retries) do - round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) + - DateTime.to_unix(DateTime.utc_now()) - end - end - - defp maybe_kickoff_timer do - GenServer.cast(__MODULE__, :kickoff_timer) - end -end diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex index f50098302..943e73289 100644 --- a/lib/pleroma/web/oauth/token/clean_worker.ex +++ b/lib/pleroma/web/oauth/token/clean_worker.ex @@ -16,7 +16,11 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do @one_day ) + alias Pleroma.Repo alias Pleroma.Web.OAuth.Token + alias Pleroma.Workers.BackgroundWorker + + defdelegate worker_args(queue), to: Pleroma.Workers.Helper def start_link(_), do: GenServer.start_link(__MODULE__, %{}) @@ -27,9 +31,13 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do @doc false def handle_info(:perform, state) do - Token.delete_expired_tokens() + %{"op" => "clean_expired_tokens"} + |> BackgroundWorker.new(worker_args(:background)) + |> Repo.insert() Process.send_after(self(), :perform, @interval) {:noreply, state} end + + def perform(:clean), do: Token.delete_expired_tokens() end diff --git a/lib/pleroma/web/push/push.ex b/lib/pleroma/web/push/push.ex index 729dad02a..b4f0e5127 100644 --- a/lib/pleroma/web/push/push.ex +++ b/lib/pleroma/web/push/push.ex @@ -3,10 +3,13 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Push do - alias Pleroma.Web.Push.Impl + alias Pleroma.Repo + alias Pleroma.Workers.WebPusher require Logger + defdelegate worker_args(queue), to: Pleroma.Workers.Helper + def init do unless enabled() do Logger.warn(""" @@ -31,6 +34,9 @@ defmodule Pleroma.Web.Push do end end - def send(notification), - do: PleromaJobQueue.enqueue(:web_push, Impl, [notification]) + def send(notification) do + %{"op" => "web_push", "notification_id" => notification.id} + |> WebPusher.new(worker_args(:web_push)) + |> Repo.insert() + end end diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index 9b01ebcc6..bbaa293fd 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do end end + def publish_one(%{recipient_id: recipient_id} = params) do + recipient = User.get_by_id(recipient_id) + + params + |> Map.delete(:recipient_id) + |> Map.put(:recipient, recipient) + |> publish_one() + end + def publish_one(_), do: :noop @supported_activities [ @@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) Publisher.enqueue_one(__MODULE__, %{ - recipient: remote_user, + recipient_id: remote_user.id, feed: feed, unreachable_since: reachable_urls_metadata[remote_user.info.salmon] }) diff --git a/lib/pleroma/web/twitter_api/controllers/util_controller.ex b/lib/pleroma/web/twitter_api/controllers/util_controller.ex index 3405bd3b7..7ba4ad305 100644 --- a/lib/pleroma/web/twitter_api/controllers/util_controller.ex +++ b/lib/pleroma/web/twitter_api/controllers/util_controller.ex @@ -265,12 +265,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do String.split(line, ",") |> List.first() end) |> List.delete("Account address") do - PleromaJobQueue.enqueue(:background, User, [ - :follow_import, - follower, - followed_identifiers - ]) - + User.follow_import(follower, followed_identifiers) json(conn, "job started") end end @@ -281,12 +276,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do with blocked_identifiers <- String.split(list) do - PleromaJobQueue.enqueue(:background, User, [ - :blocks_import, - blocker, - blocked_identifiers - ]) - + User.blocks_import(blocker, blocked_identifiers) json(conn, "job started") end end diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex new file mode 100644 index 000000000..3c021b9b4 --- /dev/null +++ b/lib/pleroma/workers/background_worker.ex @@ -0,0 +1,72 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.BackgroundWorker do + alias Pleroma.Activity + alias Pleroma.User + alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy + alias Pleroma.Web.OAuth.Token.CleanWorker + + # Note: `max_attempts` is intended to be overridden in `new/1` call + use Oban.Worker, + queue: "background", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do + user = User.get_by_id(user_id) + User.perform(:fetch_initial_posts, user) + end + + def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do + user = User.get_by_id(user_id) + User.perform(:deactivate_async, user, status) + end + + def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do + user = User.get_by_id(user_id) + User.perform(:delete, user) + end + + def perform( + %{ + "op" => "blocks_import", + "blocker_id" => blocker_id, + "blocked_identifiers" => blocked_identifiers + }, + _job + ) do + blocker = User.get_by_id(blocker_id) + User.perform(:blocks_import, blocker, blocked_identifiers) + end + + def perform( + %{ + "op" => "follow_import", + "follower_id" => follower_id, + "followed_identifiers" => followed_identifiers + }, + _job + ) do + follower = User.get_by_id(follower_id) + User.perform(:follow_import, follower, followed_identifiers) + end + + def perform(%{"op" => "clean_expired_tokens"}, _job) do + CleanWorker.perform(:clean) + end + + def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do + MediaProxyWarmingPolicy.perform(:preload, message) + end + + def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do + MediaProxyWarmingPolicy.perform(:prefetch, url) + end + + def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do + activity = Activity.get_by_id(activity_id) + Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity) + end +end diff --git a/lib/pleroma/workers/helper.ex b/lib/pleroma/workers/helper.ex new file mode 100644 index 000000000..3286ce0e8 --- /dev/null +++ b/lib/pleroma/workers/helper.ex @@ -0,0 +1,13 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Helper do + def worker_args(queue) do + if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do + [max_attempts: max_attempts] + else + [] + end + end +end diff --git a/lib/pleroma/workers/mailer.ex b/lib/pleroma/workers/mailer.ex new file mode 100644 index 000000000..1cce2ea03 --- /dev/null +++ b/lib/pleroma/workers/mailer.ex @@ -0,0 +1,27 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Mailer do + alias Pleroma.User + + # Note: `max_attempts` is intended to be overridden in `new/1` call + use Oban.Worker, + queue: "mailer", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do + email = + encoded_email + |> Base.decode64!() + |> :erlang.binary_to_term() + + Pleroma.Emails.Mailer.deliver(email, config) + end + + def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do + user = User.get_by_id(user_id) + Pleroma.DigestEmailWorker.perform(user) + end +end diff --git a/lib/pleroma/workers/publisher.ex b/lib/pleroma/workers/publisher.ex new file mode 100644 index 000000000..00fae99c7 --- /dev/null +++ b/lib/pleroma/workers/publisher.ex @@ -0,0 +1,24 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Publisher do + alias Pleroma.Activity + alias Pleroma.Web.Federator + + # Note: `max_attempts` is intended to be overridden in `new/1` call + use Oban.Worker, + queue: "federator_outgoing", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do + activity = Activity.get_by_id(activity_id) + Federator.perform(:publish, activity) + end + + def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do + params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end) + Federator.perform(:publish_one, String.to_atom(module_name), params) + end +end diff --git a/lib/pleroma/workers/receiver.ex b/lib/pleroma/workers/receiver.ex new file mode 100644 index 000000000..4ee270d74 --- /dev/null +++ b/lib/pleroma/workers/receiver.ex @@ -0,0 +1,21 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Receiver do + alias Pleroma.Web.Federator + + # Note: `max_attempts` is intended to be overridden in `new/1` call + use Oban.Worker, + queue: "federator_incoming", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do + Federator.perform(:incoming_doc, doc) + end + + def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do + Federator.perform(:incoming_ap_doc, params) + end +end diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex new file mode 100644 index 000000000..d9724c78a --- /dev/null +++ b/lib/pleroma/workers/scheduled_activity_worker.ex @@ -0,0 +1,15 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ScheduledActivityWorker do + # Note: `max_attempts` is intended to be overridden in `new/1` call + use Oban.Worker, + queue: "scheduled_activities", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do + Pleroma.ScheduledActivityWorker.perform(:execute, activity_id) + end +end diff --git a/lib/pleroma/workers/subscriber.ex b/lib/pleroma/workers/subscriber.ex new file mode 100644 index 000000000..783c44173 --- /dev/null +++ b/lib/pleroma/workers/subscriber.ex @@ -0,0 +1,29 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Subscriber do + alias Pleroma.Repo + alias Pleroma.Web.Federator + alias Pleroma.Web.Websub.WebsubClientSubscription + + # Note: `max_attempts` is intended to be overridden in `new/1` call + use Oban.Worker, + queue: "federator_outgoing", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "refresh_subscriptions"}, _job) do + Federator.perform(:refresh_subscriptions) + end + + def perform(%{"op" => "request_subscription", "websub_id" => websub_id}, _job) do + websub = Repo.get(WebsubClientSubscription, websub_id) + Federator.perform(:request_subscription, websub) + end + + def perform(%{"op" => "verify_websub", "websub_id" => websub_id}, _job) do + websub = Repo.get(WebsubClientSubscription, websub_id) + Federator.perform(:verify_websub, websub) + end +end diff --git a/lib/pleroma/workers/transmogrifier.ex b/lib/pleroma/workers/transmogrifier.ex new file mode 100644 index 000000000..e13202c06 --- /dev/null +++ b/lib/pleroma/workers/transmogrifier.ex @@ -0,0 +1,18 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.Transmogrifier do + alias Pleroma.User + + # Note: `max_attempts` is intended to be overridden in `new/1` call + use Oban.Worker, + queue: "transmogrifier", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do + user = User.get_by_id(user_id) + Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user) + end +end diff --git a/lib/pleroma/workers/web_pusher.ex b/lib/pleroma/workers/web_pusher.ex new file mode 100644 index 000000000..7b78bb3ea --- /dev/null +++ b/lib/pleroma/workers/web_pusher.ex @@ -0,0 +1,19 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.WebPusher do + alias Pleroma.Notification + alias Pleroma.Repo + + # Note: `max_attempts` is intended to be overridden in `new/1` call + use Oban.Worker, + queue: "web_push", + max_attempts: 1 + + @impl Oban.Worker + def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do + notification = Repo.get(Notification, notification_id) + Pleroma.Web.Push.Impl.perform(notification) + end +end @@ -101,6 +101,8 @@ defmodule Pleroma.Mixfile do {:phoenix_ecto, "~> 4.0"}, {:ecto_sql, "~> 3.1"}, {:postgrex, ">= 0.13.5"}, + {:oban, "~> 0.7"}, + {:quantum, "~> 2.3"}, {:gettext, "~> 0.15"}, {:comeonin, "~> 4.1.1"}, {:pbkdf2_elixir, "~> 0.12.3"}, @@ -141,7 +143,6 @@ defmodule Pleroma.Mixfile do {:http_signatures, git: "https://git.pleroma.social/pleroma/http_signatures.git", ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"}, - {:pleroma_job_queue, "~> 0.3"}, {:telemetry, "~> 0.3"}, {:prometheus_ex, "~> 3.0"}, {:prometheus_plugs, "~> 1.1"}, @@ -17,12 +17,12 @@ "credo": {:hex, :credo, "0.9.3", "76fa3e9e497ab282e0cf64b98a624aa11da702854c52c82db1bf24e54ab7c97a", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:poison, ">= 0.0.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"}, "crontab": {:hex, :crontab, "1.1.7", "b9219f0bdc8678b94143655a8f229716c5810c0636a4489f98c0956137e53985", [:mix], [{:ecto, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm"}, "crypt": {:git, "https://github.com/msantos/crypt", "1f2b58927ab57e72910191a7ebaeff984382a1d3", [ref: "1f2b58927ab57e72910191a7ebaeff984382a1d3"]}, - "db_connection": {:hex, :db_connection, "2.0.6", "bde2f85d047969c5b5800cb8f4b3ed6316c8cb11487afedac4aa5f93fd39abfa", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"}, + "db_connection": {:hex, :db_connection, "2.1.1", "a51e8a2ee54ef2ae6ec41a668c85787ed40cb8944928c191280fe34c15b76ae5", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"}, "decimal": {:hex, :decimal, "1.8.0", "ca462e0d885f09a1c5a342dbd7c1dcf27ea63548c65a65e67334f4b61803822e", [:mix], [], "hexpm"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm"}, "earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm"}, - "ecto": {:hex, :ecto, "3.1.4", "69d852da7a9f04ede725855a35ede48d158ca11a404fe94f8b2fb3b2162cd3c9", [:mix], [{:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"}, - "ecto_sql": {:hex, :ecto_sql, "3.1.3", "2c536139190492d9de33c5fefac7323c5eaaa82e1b9bf93482a14649042f7cd9", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.1.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.9.1", [hex: :mariaex, repo: "hexpm", optional: true]}, {:myxql, "~> 0.2.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.14.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, + "ecto": {:hex, :ecto, "3.1.7", "fa21d06ef56cdc2fdaa62574e8c3ba34a2751d44ea34c30bc65f0728421043e5", [:mix], [{:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"}, + "ecto_sql": {:hex, :ecto_sql, "3.1.6", "1e80e30d16138a729c717f73dcb938590bcdb3a4502f3012414d0cbb261045d8", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.1.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.9.1", [hex: :mariaex, repo: "hexpm", optional: true]}, {:myxql, "~> 0.2.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.14.0 or ~> 0.15.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, "esshd": {:hex, :esshd, "0.1.0", "6f93a2062adb43637edad0ea7357db2702a4b80dd9683482fe00f5134e97f4c1", [:mix], [], "hexpm"}, "eternal": {:hex, :eternal, "1.2.0", "e2a6b6ce3b8c248f7dc31451aefca57e3bdf0e48d73ae5043229380a67614c41", [:mix], [], "hexpm"}, "ex2ms": {:hex, :ex2ms, "1.5.0", "19e27f9212be9a96093fed8cdfbef0a2b56c21237196d26760f11dfcfae58e97", [:mix], [], "hexpm"}, @@ -36,6 +36,8 @@ "excoveralls": {:hex, :excoveralls, "0.11.1", "dd677fbdd49114fdbdbf445540ec735808250d56b011077798316505064edb2c", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"}, "floki": {:hex, :floki, "0.20.4", "be42ac911fece24b4c72f3b5846774b6e61b83fe685c2fc9d62093277fb3bc86", [:mix], [{:html_entities, "~> 0.4.0", [hex: :html_entities, repo: "hexpm", optional: false]}, {:mochiweb, "~> 2.15", [hex: :mochiweb, repo: "hexpm", optional: false]}], "hexpm"}, "gen_smtp": {:hex, :gen_smtp, "0.14.0", "39846a03522456077c6429b4badfd1d55e5e7d0fdfb65e935b7c5e38549d9202", [:rebar3], [], "hexpm"}, + "gen_stage": {:hex, :gen_stage, "0.14.2", "6a2a578a510c5bfca8a45e6b27552f613b41cf584b58210f017088d3d17d0b14", [:mix], [], "hexpm"}, + "gen_state_machine": {:hex, :gen_state_machine, "2.0.5", "9ac15ec6e66acac994cc442dcc2c6f9796cf380ec4b08267223014be1c728a95", [:mix], [], "hexpm"}, "gettext": {:hex, :gettext, "0.17.0", "abe21542c831887a2b16f4c94556db9c421ab301aee417b7c4fbde7fbdbe01ec", [:mix], [], "hexpm"}, "hackney": {:hex, :hackney, "1.15.1", "9f8f471c844b8ce395f7b6d8398139e26ddca9ebc171a8b91342ee15a19963f4", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.4", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, "html_entities": {:hex, :html_entities, "0.4.0", "f2fee876858cf6aaa9db608820a3209e45a087c5177332799592142b50e89a6b", [:mix], [], "hexpm"}, @@ -46,6 +48,7 @@ "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, "joken": {:hex, :joken, "2.0.1", "ec9ab31bf660f343380da033b3316855197c8d4c6ef597fa3fcb451b326beb14", [:mix], [{:jose, "~> 1.9", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm"}, "jose": {:hex, :jose, "1.9.0", "4167c5f6d06ffaebffd15cdb8da61a108445ef5e85ab8f5a7ad926fdf3ada154", [:mix, :rebar3], [{:base64url, "~> 0.0.1", [hex: :base64url, repo: "hexpm", optional: false]}], "hexpm"}, + "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm"}, "makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, "makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, "meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm"}, @@ -57,6 +60,7 @@ "mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"}, "mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"}, "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"}, + "oban": {:hex, :oban, "0.7.1", "171bdd1b69c1a4a839f8c768f5e962fc22d1de1513d459fb6b8e0cbd34817a9a", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"}, "pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"}, "phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, @@ -64,22 +68,23 @@ "phoenix_html": {:hex, :phoenix_html, "2.13.1", "fa8f034b5328e2dfa0e4131b5569379003f34bc1fafdaa84985b0b9d2f12e68b", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.2", "496c303bdf1b2e98a9d26e89af5bba3ab487ba3a3735f74bf1f4064d2a845a3e", [:mix], [], "hexpm"}, "phoenix_swoosh": {:hex, :phoenix_swoosh, "0.2.0", "a7e0b32077cd6d2323ae15198839b05d9caddfa20663fd85787479e81f89520e", [:mix], [{:phoenix, "~> 1.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.2", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:swoosh, "~> 0.1", [hex: :swoosh, repo: "hexpm", optional: false]}], "hexpm"}, - "pleroma_job_queue": {:hex, :pleroma_job_queue, "0.3.0", "b84538d621f0c3d6fcc1cff9d5648d3faaf873b8b21b94e6503428a07a48ec47", [:mix], [{:crontab, "~> 1.1", [hex: :crontab, repo: "hexpm", optional: false]}], "hexpm"}, "plug": {:hex, :plug, "1.8.2", "0bcce1daa420f189a6491f3940cc77ea7fb1919761175c9c3b59800d897440fc", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm"}, "plug_cowboy": {:hex, :plug_cowboy, "2.1.0", "b75768153c3a8a9e8039d4b25bb9b14efbc58e9c4a6e6a270abff1cd30cbe320", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"}, "plug_static_index_html": {:hex, :plug_static_index_html, "1.0.0", "840123d4d3975585133485ea86af73cb2600afd7f2a976f9f5fd8b3808e636a0", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, - "postgrex": {:hex, :postgrex, "0.14.3", "5754dee2fdf6e9e508cbf49ab138df964278700b764177e8f3871e658b345a1e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"}, + "postgrex": {:hex, :postgrex, "0.15.0", "dd5349161019caeea93efa42f9b22f9d79995c3a86bdffb796427b4c9863b0f0", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"}, "prometheus": {:hex, :prometheus, "4.4.1", "1e96073b3ed7788053768fea779cbc896ddc3bdd9ba60687f2ad50b252ac87d6", [:mix, :rebar3], [], "hexpm"}, "prometheus_ecto": {:hex, :prometheus_ecto, "1.4.1", "6c768ea9654de871e5b32fab2eac348467b3021604ebebbcbd8bcbe806a65ed5", [:mix], [{:ecto, "~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"}, "prometheus_ex": {:hex, :prometheus_ex, "3.0.5", "fa58cfd983487fc5ead331e9a3e0aa622c67232b3ec71710ced122c4c453a02f", [:mix], [{:prometheus, "~> 4.0", [hex: :prometheus, repo: "hexpm", optional: false]}], "hexpm"}, "prometheus_phoenix": {:hex, :prometheus_phoenix, "1.3.0", "c4b527e0b3a9ef1af26bdcfbfad3998f37795b9185d475ca610fe4388fdd3bb5", [:mix], [{:phoenix, "~> 1.4", [hex: :phoenix, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.3 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"}, "prometheus_plugs": {:hex, :prometheus_plugs, "1.1.5", "25933d48f8af3a5941dd7b621c889749894d8a1082a6ff7c67cc99dec26377c5", [:mix], [{:accept, "~> 0.1", [hex: :accept, repo: "hexpm", optional: false]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}, {:prometheus_process_collector, "~> 1.1", [hex: :prometheus_process_collector, repo: "hexpm", optional: true]}], "hexpm"}, "quack": {:hex, :quack, "0.1.1", "cca7b4da1a233757fdb44b3334fce80c94785b3ad5a602053b7a002b5a8967bf", [:mix], [{:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: false]}, {:tesla, "~> 1.2.0", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm"}, + "quantum": {:hex, :quantum, "2.3.4", "72a0e8855e2adc101459eac8454787cb74ab4169de6ca50f670e72142d4960e9", [:mix], [{:calendar, "~> 0.17", [hex: :calendar, repo: "hexpm", optional: true]}, {:crontab, "~> 1.1", [hex: :crontab, repo: "hexpm", optional: false]}, {:gen_stage, "~> 0.12", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:swarm, "~> 3.3", [hex: :swarm, repo: "hexpm", optional: false]}, {:timex, "~> 3.1", [hex: :timex, repo: "hexpm", optional: true]}], "hexpm"}, "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm"}, "recon": {:git, "https://github.com/ferd/recon.git", "75d70c7c08926d2f24f1ee6de14ee50fe8a52763", [tag: "2.4.0"]}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm"}, + "swarm": {:hex, :swarm, "3.4.0", "64f8b30055d74640d2186c66354b33b999438692a91be275bb89cdc7e401f448", [:mix], [{:gen_state_machine, "~> 2.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}, {:libring, "~> 1.0", [hex: :libring, repo: "hexpm", optional: false]}], "hexpm"}, "sweet_xml": {:hex, :sweet_xml, "0.6.6", "fc3e91ec5dd7c787b6195757fbcf0abc670cee1e4172687b45183032221b66b8", [:mix], [], "hexpm"}, "swoosh": {:hex, :swoosh, "0.23.2", "7dda95ff0bf54a2298328d6899c74dae1223777b43563ccebebb4b5d2b61df38", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm"}, "syslog": {:git, "https://github.com/Vagabond/erlang-syslog.git", "4a6c6f2c996483e86c1320e9553f91d337bcb6aa", [tag: "1.0.5"]}, diff --git a/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs b/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs new file mode 100644 index 000000000..2f201bd05 --- /dev/null +++ b/priv/repo/migrations/20190730055101_add_oban_jobs_table.exs @@ -0,0 +1,6 @@ +defmodule Pleroma.Repo.Migrations.AddObanJobsTable do + use Ecto.Migration + + defdelegate up, to: Oban.Migrations + defdelegate down, to: Oban.Migrations +end diff --git a/test/activity_test.exs b/test/activity_test.exs index b27f6fd36..658c47837 100644 --- a/test/activity_test.exs +++ b/test/activity_test.exs @@ -7,6 +7,7 @@ defmodule Pleroma.ActivityTest do alias Pleroma.Activity alias Pleroma.Bookmark alias Pleroma.Object + alias Pleroma.Tests.ObanHelpers alias Pleroma.ThreadMute import Pleroma.Factory @@ -125,7 +126,8 @@ defmodule Pleroma.ActivityTest do } {:ok, local_activity} = Pleroma.Web.CommonAPI.post(user, %{"status" => "find me!"}) - {:ok, remote_activity} = Pleroma.Web.Federator.incoming_ap_doc(params) + {:ok, job} = Pleroma.Web.Federator.incoming_ap_doc(params) + {:ok, remote_activity} = ObanHelpers.perform(job) %{local_activity: local_activity, remote_activity: remote_activity, user: user} end diff --git a/test/conversation_test.exs b/test/conversation_test.exs index 4e36494f8..693427d80 100644 --- a/test/conversation_test.exs +++ b/test/conversation_test.exs @@ -22,6 +22,8 @@ defmodule Pleroma.ConversationTest do {:ok, _activity} = CommonAPI.post(user, %{"visibility" => "direct", "status" => "hey @#{other_user.nickname}"}) + Pleroma.Tests.ObanHelpers.perform_all() + Repo.delete_all(Conversation) Repo.delete_all(Conversation.Participation) diff --git a/test/notification_test.exs b/test/notification_test.exs index 80ea2a085..e1c9f4f93 100644 --- a/test/notification_test.exs +++ b/test/notification_test.exs @@ -8,6 +8,7 @@ defmodule Pleroma.NotificationTest do import Pleroma.Factory alias Pleroma.Notification + alias Pleroma.Tests.ObanHelpers alias Pleroma.User alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.CommonAPI @@ -621,7 +622,8 @@ defmodule Pleroma.NotificationTest do refute Enum.empty?(Notification.for_user(other_user)) - User.delete(user) + {:ok, job} = User.delete(user) + ObanHelpers.perform(job) assert Enum.empty?(Notification.for_user(other_user)) end @@ -666,6 +668,7 @@ defmodule Pleroma.NotificationTest do } {:ok, _delete_activity} = Transmogrifier.handle_incoming(delete_user_message) + ObanHelpers.perform_all() assert Enum.empty?(Notification.for_user(local_user)) end diff --git a/test/support/oban_helpers.ex b/test/support/oban_helpers.ex new file mode 100644 index 000000000..989770926 --- /dev/null +++ b/test/support/oban_helpers.ex @@ -0,0 +1,42 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Tests.ObanHelpers do + @moduledoc """ + Oban test helpers. + """ + + alias Pleroma.Repo + + def perform_all do + Oban.Job + |> Repo.all() + |> perform() + end + + def perform(%Oban.Job{} = job) do + res = apply(String.to_existing_atom("Elixir." <> job.worker), :perform, [job.args, job]) + Repo.delete(job) + res + end + + def perform(jobs) when is_list(jobs) do + for job <- jobs, do: perform(job) + end + + def member?(%{} = job_args, jobs) when is_list(jobs) do + Enum.any?(jobs, fn job -> + member?(job_args, job.args) + end) + end + + def member?(%{} = test_attrs, %{} = attrs) do + Enum.all?( + test_attrs, + fn {k, _v} -> member?(test_attrs[k], attrs[k]) end + ) + end + + def member?(x, y), do: x == y +end diff --git a/test/tasks/digest_test.exs b/test/tasks/digest_test.exs index 4bfa1fb93..96d762685 100644 --- a/test/tasks/digest_test.exs +++ b/test/tasks/digest_test.exs @@ -4,6 +4,7 @@ defmodule Mix.Tasks.Pleroma.DigestTest do import Pleroma.Factory import Swoosh.TestAssertions + alias Pleroma.Tests.ObanHelpers alias Pleroma.Web.CommonAPI setup_all do @@ -39,6 +40,8 @@ defmodule Mix.Tasks.Pleroma.DigestTest do :ok = Mix.Tasks.Pleroma.Digest.run(["test", user2.nickname, yesterday_date]) + ObanHelpers.perform_all() + assert_receive {:mix_shell, :info, [message]} assert message =~ "Digest email have been sent" diff --git a/test/user_test.exs b/test/user_test.exs index 661ffc0b3..733467398 100644 --- a/test/user_test.exs +++ b/test/user_test.exs @@ -7,14 +7,16 @@ defmodule Pleroma.UserTest do alias Pleroma.Builders.UserBuilder alias Pleroma.Object alias Pleroma.Repo + alias Pleroma.Tests.ObanHelpers alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.CommonAPI use Pleroma.DataCase + use Oban.Testing, repo: Pleroma.Repo - import Pleroma.Factory import Mock + import Pleroma.Factory setup_all do Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) @@ -725,7 +727,9 @@ defmodule Pleroma.UserTest do user3.nickname ] - result = User.follow_import(user1, identifiers) + {:ok, job} = User.follow_import(user1, identifiers) + result = ObanHelpers.perform(job) + assert is_list(result) assert result == [user2, user3] end @@ -936,7 +940,9 @@ defmodule Pleroma.UserTest do user3.nickname ] - result = User.blocks_import(user1, identifiers) + {:ok, job} = User.blocks_import(user1, identifiers) + result = ObanHelpers.perform(job) + assert is_list(result) assert result == [user2, user3] end @@ -1053,7 +1059,9 @@ defmodule Pleroma.UserTest do test "it deletes deactivated user" do {:ok, user} = insert(:user, info: %{deactivated: true}) |> User.set_cache() - assert {:ok, _} = User.delete(user) + {:ok, job} = User.delete(user) + {:ok, _user} = ObanHelpers.perform(job) + refute User.get_by_id(user.id) end @@ -1071,7 +1079,8 @@ defmodule Pleroma.UserTest do {:ok, like_two, _} = CommonAPI.favorite(activity.id, follower) {:ok, repeat, _} = CommonAPI.repeat(activity_two.id, user) - {:ok, _} = User.delete(user) + {:ok, job} = User.delete(user) + {:ok, _user} = ObanHelpers.perform(job) follower = User.get_cached_by_id(follower.id) @@ -1103,12 +1112,18 @@ defmodule Pleroma.UserTest do {:ok, follower} = User.get_or_fetch_by_ap_id("http://mastodon.example.org/users/admin") {:ok, _} = User.follow(follower, user) - {:ok, _user} = User.delete(user) - - assert called( - Pleroma.Web.ActivityPub.Publisher.publish_one(%{ - inbox: "http://mastodon.example.org/inbox" - }) + {:ok, job} = User.delete(user) + {:ok, _user} = ObanHelpers.perform(job) + + assert ObanHelpers.member?( + %{ + "op" => "publish_one", + "params" => %{ + "inbox" => "http://mastodon.example.org/inbox", + "id" => "pleroma:fakeid" + } + }, + all_enqueued(worker: Pleroma.Workers.Publisher) ) end end @@ -1153,7 +1168,8 @@ defmodule Pleroma.UserTest do test "User.delete() plugs any possible zombie objects" do user = insert(:user) - {:ok, _} = User.delete(user) + {:ok, job} = User.delete(user) + {:ok, _} = ObanHelpers.perform(job) {:ok, cached_user} = Cachex.get(:user_cache, "ap_id:#{user.ap_id}") diff --git a/test/web/activity_pub/activity_pub_controller_test.exs b/test/web/activity_pub/activity_pub_controller_test.exs index 77f5e39fa..a214f57a6 100644 --- a/test/web/activity_pub/activity_pub_controller_test.exs +++ b/test/web/activity_pub/activity_pub_controller_test.exs @@ -4,15 +4,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do use Pleroma.Web.ConnCase + use Oban.Testing, repo: Pleroma.Repo + import Pleroma.Factory alias Pleroma.Activity alias Pleroma.Instances alias Pleroma.Object + alias Pleroma.Tests.ObanHelpers alias Pleroma.User alias Pleroma.Web.ActivityPub.ObjectView alias Pleroma.Web.ActivityPub.UserView alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.CommonAPI + alias Pleroma.Workers.Receiver, as: ReceiverWorker setup_all do Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) @@ -276,7 +280,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do |> post("/inbox", data) assert "ok" == json_response(conn, 200) - :timer.sleep(500) + + ObanHelpers.perform(all_enqueued(worker: ReceiverWorker)) assert Activity.get_by_ap_id(data["id"]) end @@ -318,7 +323,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do |> post("/users/#{user.nickname}/inbox", data) assert "ok" == json_response(conn, 200) - :timer.sleep(500) + ObanHelpers.perform(all_enqueued(worker: ReceiverWorker)) assert Activity.get_by_ap_id(data["id"]) end @@ -347,7 +352,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do |> post("/users/#{recipient.nickname}/inbox", data) assert "ok" == json_response(conn, 200) - :timer.sleep(500) + ObanHelpers.perform(all_enqueued(worker: ReceiverWorker)) assert Activity.get_by_ap_id(data["id"]) end @@ -426,6 +431,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do |> post("/users/#{recipient.nickname}/inbox", data) |> json_response(200) + ObanHelpers.perform(all_enqueued(worker: ReceiverWorker)) + activity = Activity.get_by_ap_id(data["id"]) assert activity.id @@ -501,6 +508,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do |> post("/users/#{user.nickname}/outbox", data) result = json_response(conn, 201) + assert Activity.get_by_ap_id(result["id"]) end diff --git a/test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs b/test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs index 372e789be..95a809d25 100644 --- a/test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs +++ b/test/web/activity_pub/mrf/mediaproxy_warming_policy_test.exs @@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do use Pleroma.DataCase alias Pleroma.HTTP + alias Pleroma.Tests.ObanHelpers alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy import Mock @@ -24,6 +25,11 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do test "it prefetches media proxy URIs" do with_mock HTTP, get: fn _, _, _ -> {:ok, []} end do MediaProxyWarmingPolicy.filter(@message) + + ObanHelpers.perform_all() + # Performing jobs which has been just enqueued + ObanHelpers.perform_all() + assert called(HTTP.get(:_, :_, :_)) end end diff --git a/test/web/activity_pub/publisher_test.exs b/test/web/activity_pub/publisher_test.exs index 36a39c84c..26d019878 100644 --- a/test/web/activity_pub/publisher_test.exs +++ b/test/web/activity_pub/publisher_test.exs @@ -257,7 +257,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do assert called( Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{ inbox: "https://domain.com/users/nick1/inbox", - actor: actor, + actor_id: actor.id, id: note_activity.data["id"] }) ) diff --git a/test/web/activity_pub/transmogrifier_test.exs b/test/web/activity_pub/transmogrifier_test.exs index 629c76c97..42f9672d0 100644 --- a/test/web/activity_pub/transmogrifier_test.exs +++ b/test/web/activity_pub/transmogrifier_test.exs @@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do alias Pleroma.Object alias Pleroma.Object.Fetcher alias Pleroma.Repo + alias Pleroma.Tests.ObanHelpers alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Transmogrifier @@ -640,6 +641,7 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do |> Poison.decode!() {:ok, _} = Transmogrifier.handle_incoming(data) + ObanHelpers.perform_all() refute User.get_cached_by_ap_id(ap_id) end @@ -1202,6 +1204,8 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do assert user.info.note_count == 1 {:ok, user} = Transmogrifier.upgrade_user_from_ap_id("https://niu.moe/users/rye") + ObanHelpers.perform_all() + assert user.info.ap_enabled assert user.info.note_count == 1 assert user.follower_address == "https://niu.moe/users/rye/followers" diff --git a/test/web/admin_api/admin_api_controller_test.exs b/test/web/admin_api/admin_api_controller_test.exs index 844cd0732..a867ac998 100644 --- a/test/web/admin_api/admin_api_controller_test.exs +++ b/test/web/admin_api/admin_api_controller_test.exs @@ -1861,7 +1861,7 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIControllerTest do post(conn, "/api/pleroma/admin/config", %{ configs: [ %{ - "group" => "pleroma_job_queue", + "group" => "oban", "key" => ":queues", "value" => [ %{"tuple" => [":federator_incoming", 50]}, @@ -1879,7 +1879,7 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIControllerTest do assert json_response(conn, 200) == %{ "configs" => [ %{ - "group" => "pleroma_job_queue", + "group" => "oban", "key" => ":queues", "value" => [ %{"tuple" => [":federator_incoming", 50]}, diff --git a/test/web/digest_email_worker_test.exs b/test/web/digest_email_worker_test.exs index 15002330f..5dfd920fa 100644 --- a/test/web/digest_email_worker_test.exs +++ b/test/web/digest_email_worker_test.exs @@ -7,6 +7,7 @@ defmodule Pleroma.DigestEmailWorkerTest do import Pleroma.Factory alias Pleroma.DigestEmailWorker + alias Pleroma.Tests.ObanHelpers alias Pleroma.User alias Pleroma.Web.CommonAPI @@ -23,6 +24,9 @@ defmodule Pleroma.DigestEmailWorkerTest do CommonAPI.post(user, %{"status" => "hey @#{user2.nickname}!"}) DigestEmailWorker.perform() + ObanHelpers.perform_all() + # Performing job(s) enqueued at previous step + ObanHelpers.perform_all() assert_received {:email, email} assert email.to == [{user2.name, user2.email}] diff --git a/test/web/federator_test.exs b/test/web/federator_test.exs index 09e54533f..5724672fd 100644 --- a/test/web/federator_test.exs +++ b/test/web/federator_test.exs @@ -4,9 +4,14 @@ defmodule Pleroma.Web.FederatorTest do alias Pleroma.Instances + alias Pleroma.Tests.ObanHelpers alias Pleroma.Web.CommonAPI alias Pleroma.Web.Federator + alias Pleroma.Workers.Publisher, as: PublisherWorker + use Pleroma.DataCase + use Oban.Testing, repo: Pleroma.Repo + import Pleroma.Factory import Mock @@ -24,15 +29,6 @@ defmodule Pleroma.Web.FederatorTest do clear_config([:instance, :rewrite_policy]) clear_config([:mrf_keyword]) - describe "Publisher.perform" do - test "call `perform` with unknown task" do - assert { - :error, - "Don't know what to do with this" - } = Pleroma.Web.Federator.Publisher.perform("test", :ok, :ok) - end - end - describe "Publish an activity" do setup do user = insert(:user) @@ -53,6 +49,7 @@ defmodule Pleroma.Web.FederatorTest do } do with_mocks([relay_mock]) do Federator.publish(activity) + ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) end assert_received :relay_publish @@ -66,6 +63,7 @@ defmodule Pleroma.Web.FederatorTest do with_mocks([relay_mock]) do Federator.publish(activity) + ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) end refute_received :relay_publish @@ -73,10 +71,7 @@ defmodule Pleroma.Web.FederatorTest do end describe "Targets reachability filtering in `publish`" do - test_with_mock "it federates only to reachable instances via AP", - Pleroma.Web.ActivityPub.Publisher, - [:passthrough], - [] do + test "it federates only to reachable instances via AP" do user = insert(:user) {inbox1, inbox2} = @@ -104,20 +99,20 @@ defmodule Pleroma.Web.FederatorTest do {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) - assert called( - Pleroma.Web.ActivityPub.Publisher.publish_one(%{ - inbox: inbox1, - unreachable_since: dt - }) - ) + expected_dt = NaiveDateTime.to_iso8601(dt) - refute called(Pleroma.Web.ActivityPub.Publisher.publish_one(%{inbox: inbox2})) + ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) + + assert ObanHelpers.member?( + %{ + "op" => "publish_one", + "params" => %{"inbox" => inbox1, "unreachable_since" => expected_dt} + }, + all_enqueued(worker: PublisherWorker) + ) end - test_with_mock "it federates only to reachable instances via Websub", - Pleroma.Web.Websub, - [:passthrough], - [] do + test "it federates only to reachable instances via Websub" do user = insert(:user) websub_topic = Pleroma.Web.OStatus.feed_path(user) @@ -142,23 +137,27 @@ defmodule Pleroma.Web.FederatorTest do {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"}) - assert called( - Pleroma.Web.Websub.publish_one(%{ - callback: sub2.callback, - unreachable_since: dt - }) - ) + expected_callback = sub2.callback + expected_dt = NaiveDateTime.to_iso8601(dt) + + ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) - refute called(Pleroma.Web.Websub.publish_one(%{callback: sub1.callback})) + assert ObanHelpers.member?( + %{ + "op" => "publish_one", + "params" => %{ + "callback" => expected_callback, + "unreachable_since" => expected_dt + } + }, + all_enqueued(worker: PublisherWorker) + ) end - test_with_mock "it federates only to reachable instances via Salmon", - Pleroma.Web.Salmon, - [:passthrough], - [] do + test "it federates only to reachable instances via Salmon" do user = insert(:user) - remote_user1 = + _remote_user1 = insert(:user, %{ local: false, nickname: "nick1@domain.com", @@ -174,6 +173,8 @@ defmodule Pleroma.Web.FederatorTest do info: %{salmon: "https://domain2.com/salmon"} }) + remote_user2_id = remote_user2.id + dt = NaiveDateTime.utc_now() Instances.set_unreachable(remote_user2.ap_id, dt) @@ -182,14 +183,20 @@ defmodule Pleroma.Web.FederatorTest do {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) - assert called( - Pleroma.Web.Salmon.publish_one(%{ - recipient: remote_user2, - unreachable_since: dt - }) - ) + expected_dt = NaiveDateTime.to_iso8601(dt) + + ObanHelpers.perform(all_enqueued(worker: PublisherWorker)) - refute called(Pleroma.Web.Salmon.publish_one(%{recipient: remote_user1})) + assert ObanHelpers.member?( + %{ + "op" => "publish_one", + "params" => %{ + "recipient_id" => remote_user2_id, + "unreachable_since" => expected_dt + } + }, + all_enqueued(worker: PublisherWorker) + ) end end @@ -209,7 +216,8 @@ defmodule Pleroma.Web.FederatorTest do "to" => ["https://www.w3.org/ns/activitystreams#Public"] } - {:ok, _activity} = Federator.incoming_ap_doc(params) + assert {:ok, job} = Federator.incoming_ap_doc(params) + assert {:ok, _activity} = ObanHelpers.perform(job) end test "rejects incoming AP docs with incorrect origin" do @@ -227,7 +235,8 @@ defmodule Pleroma.Web.FederatorTest do "to" => ["https://www.w3.org/ns/activitystreams#Public"] } - :error = Federator.incoming_ap_doc(params) + assert {:ok, job} = Federator.incoming_ap_doc(params) + assert :error = ObanHelpers.perform(job) end test "it does not crash if MRF rejects the post" do @@ -242,7 +251,8 @@ defmodule Pleroma.Web.FederatorTest do File.read!("test/fixtures/mastodon-post-activity.json") |> Poison.decode!() - assert Federator.incoming_ap_doc(params) == :error + assert {:ok, job} = Federator.incoming_ap_doc(params) + assert :error = ObanHelpers.perform(job) end end end diff --git a/test/web/instances/instance_test.exs b/test/web/instances/instance_test.exs index 3fd011fd3..0b53bc6cd 100644 --- a/test/web/instances/instance_test.exs +++ b/test/web/instances/instance_test.exs @@ -16,7 +16,8 @@ defmodule Pleroma.Instances.InstanceTest do describe "set_reachable/1" do test "clears `unreachable_since` of existing matching Instance record having non-nil `unreachable_since`" do - instance = insert(:instance, unreachable_since: NaiveDateTime.utc_now()) + unreachable_since = NaiveDateTime.to_iso8601(NaiveDateTime.utc_now()) + instance = insert(:instance, unreachable_since: unreachable_since) assert {:ok, instance} = Instance.set_reachable(instance.host) refute instance.unreachable_since diff --git a/test/web/mastodon_api/mastodon_api_controller_test.exs b/test/web/mastodon_api/mastodon_api_controller_test.exs index 77430e9c9..0f40146fb 100644 --- a/test/web/mastodon_api/mastodon_api_controller_test.exs +++ b/test/web/mastodon_api/mastodon_api_controller_test.exs @@ -12,6 +12,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIControllerTest do alias Pleroma.Object alias Pleroma.Repo alias Pleroma.ScheduledActivity + alias Pleroma.Tests.ObanHelpers alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.CommonAPI @@ -3861,6 +3862,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIControllerTest do end test "it sends an email to user", %{user: user} do + ObanHelpers.perform_all() token_record = Repo.get_by(Pleroma.PasswordResetToken, user_id: user.id) email = Pleroma.Emails.UserEmail.password_reset_email(user, token_record.token) @@ -3921,6 +3923,8 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIControllerTest do |> post("/api/v1/pleroma/accounts/confirmation_resend?email=#{user.email}") |> json_response(:no_content) + ObanHelpers.perform_all() + email = Pleroma.Emails.UserEmail.account_confirmation_email(user) notify_email = Config.get([:instance, :notify_email]) instance_name = Config.get([:instance, :name]) diff --git a/test/web/retry_queue_test.exs b/test/web/retry_queue_test.exs deleted file mode 100644 index ecb3ce5d0..000000000 --- a/test/web/retry_queue_test.exs +++ /dev/null @@ -1,48 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule MockActivityPub do - def publish_one({ret, waiter}) do - send(waiter, :complete) - {ret, "success"} - end -end - -defmodule Pleroma.Web.Federator.RetryQueueTest do - use Pleroma.DataCase - alias Pleroma.Web.Federator.RetryQueue - - @small_retry_count 0 - @hopeless_retry_count 10 - - setup do - RetryQueue.reset_stats() - end - - test "RetryQueue responds to stats request" do - assert %{delivered: 0, dropped: 0} == RetryQueue.get_stats() - end - - test "failed posts are retried" do - {:retry, _timeout} = RetryQueue.get_retry_params(@small_retry_count) - - wait_task = - Task.async(fn -> - receive do - :complete -> :ok - end - end) - - RetryQueue.enqueue({:ok, wait_task.pid}, MockActivityPub, @small_retry_count) - Task.await(wait_task) - assert %{delivered: 1, dropped: 0} == RetryQueue.get_stats() - end - - test "posts that have been tried too many times are dropped" do - {:drop, _timeout} = RetryQueue.get_retry_params(@hopeless_retry_count) - - RetryQueue.enqueue({:ok, nil}, MockActivityPub, @hopeless_retry_count) - assert %{delivered: 0, dropped: 1} == RetryQueue.get_stats() - end -end diff --git a/test/web/salmon/salmon_test.exs b/test/web/salmon/salmon_test.exs index e86e76fe9..0186f3fef 100644 --- a/test/web/salmon/salmon_test.exs +++ b/test/web/salmon/salmon_test.exs @@ -96,6 +96,6 @@ defmodule Pleroma.Web.Salmon.SalmonTest do Salmon.publish(user, activity) - assert called(Publisher.enqueue_one(Salmon, %{recipient: mentioned_user})) + assert called(Publisher.enqueue_one(Salmon, %{recipient_id: mentioned_user.id})) end end diff --git a/test/web/twitter_api/twitter_api_controller_test.exs b/test/web/twitter_api/twitter_api_controller_test.exs index 8ef14b4c5..598833893 100644 --- a/test/web/twitter_api/twitter_api_controller_test.exs +++ b/test/web/twitter_api/twitter_api_controller_test.exs @@ -12,6 +12,7 @@ defmodule Pleroma.Web.TwitterAPI.ControllerTest do alias Pleroma.Notification alias Pleroma.Object alias Pleroma.Repo + alias Pleroma.Tests.ObanHelpers alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.CommonAPI @@ -1093,6 +1094,7 @@ defmodule Pleroma.Web.TwitterAPI.ControllerTest do end test "it sends an email to user", %{user: user} do + ObanHelpers.perform_all() token_record = Repo.get_by(Pleroma.PasswordResetToken, user_id: user.id) email = Pleroma.Emails.UserEmail.password_reset_email(user, token_record.token) @@ -1200,6 +1202,8 @@ defmodule Pleroma.Web.TwitterAPI.ControllerTest do |> assign(:user, user) |> post("/api/account/resend_confirmation_email?email=#{user.email}") + ObanHelpers.perform_all() + email = Pleroma.Emails.UserEmail.account_confirmation_email(user) notify_email = Pleroma.Config.get([:instance, :notify_email]) instance_name = Pleroma.Config.get([:instance, :name]) diff --git a/test/web/twitter_api/twitter_api_test.exs b/test/web/twitter_api/twitter_api_test.exs index cbe83852e..bf063a0de 100644 --- a/test/web/twitter_api/twitter_api_test.exs +++ b/test/web/twitter_api/twitter_api_test.exs @@ -7,6 +7,7 @@ defmodule Pleroma.Web.TwitterAPI.TwitterAPITest do alias Pleroma.Activity alias Pleroma.Object alias Pleroma.Repo + alias Pleroma.Tests.ObanHelpers alias Pleroma.User alias Pleroma.UserInviteToken alias Pleroma.Web.ActivityPub.ActivityPub @@ -321,6 +322,7 @@ defmodule Pleroma.Web.TwitterAPI.TwitterAPITest do } {:ok, user} = TwitterAPI.register_user(data) + ObanHelpers.perform_all() assert user.info.confirmation_pending diff --git a/test/web/twitter_api/util_controller_test.exs b/test/web/twitter_api/util_controller_test.exs index fe4ffdb59..349122c05 100644 --- a/test/web/twitter_api/util_controller_test.exs +++ b/test/web/twitter_api/util_controller_test.exs @@ -4,9 +4,11 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do use Pleroma.Web.ConnCase + use Oban.Testing, repo: Pleroma.Repo alias Pleroma.Notification alias Pleroma.Repo + alias Pleroma.Tests.ObanHelpers alias Pleroma.User alias Pleroma.Web.CommonAPI import Pleroma.Factory @@ -43,8 +45,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do {File, [], read!: fn "follow_list.txt" -> "Account address,Show boosts\n#{user2.ap_id},true" - end}, - {PleromaJobQueue, [:passthrough], []} + end} ]) do response = conn @@ -52,15 +53,16 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do |> post("/api/pleroma/follow_import", %{"list" => %Plug.Upload{path: "follow_list.txt"}}) |> json_response(:ok) - assert called( - PleromaJobQueue.enqueue( - :background, - User, - [:follow_import, user1, [user2.ap_id]] - ) - ) - assert response == "job started" + + assert ObanHelpers.member?( + %{ + "op" => "follow_import", + "follower_id" => user1.id, + "followed_identifiers" => [user2.ap_id] + }, + all_enqueued(worker: Pleroma.Workers.BackgroundWorker) + ) end end @@ -119,8 +121,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do user3 = insert(:user) with_mocks([ - {File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end}, - {PleromaJobQueue, [:passthrough], []} + {File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end} ]) do response = conn @@ -128,15 +129,16 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do |> post("/api/pleroma/blocks_import", %{"list" => %Plug.Upload{path: "blocks_list.txt"}}) |> json_response(:ok) - assert called( - PleromaJobQueue.enqueue( - :background, - User, - [:blocks_import, user1, [user2.ap_id, user3.ap_id]] - ) - ) - assert response == "job started" + + assert ObanHelpers.member?( + %{ + "op" => "blocks_import", + "blocker_id" => user1.id, + "blocked_identifiers" => [user2.ap_id, user3.ap_id] + }, + all_enqueued(worker: Pleroma.Workers.BackgroundWorker) + ) end end end @@ -589,6 +591,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do |> json_response(:ok) assert response == %{"status" => "success"} + ObanHelpers.perform_all() user = User.get_cached_by_id(user.id) diff --git a/test/web/websub/websub_test.exs b/test/web/websub/websub_test.exs index 74386d7db..414610879 100644 --- a/test/web/websub/websub_test.exs +++ b/test/web/websub/websub_test.exs @@ -4,11 +4,14 @@ defmodule Pleroma.Web.WebsubTest do use Pleroma.DataCase + use Oban.Testing, repo: Pleroma.Repo + alias Pleroma.Tests.ObanHelpers alias Pleroma.Web.Router.Helpers alias Pleroma.Web.Websub alias Pleroma.Web.Websub.WebsubClientSubscription alias Pleroma.Web.Websub.WebsubServerSubscription + alias Pleroma.Workers.Subscriber, as: SubscriberWorker import Pleroma.Factory import Tesla.Mock @@ -224,6 +227,7 @@ defmodule Pleroma.Web.WebsubTest do }) _refresh = Websub.refresh_subscriptions() + ObanHelpers.perform(all_enqueued(worker: SubscriberWorker)) assert still_good == Repo.get(WebsubClientSubscription, still_good.id) refute needs_refresh == Repo.get(WebsubClientSubscription, needs_refresh.id) |