diff options
author | Alexander Strizhakov <alex.strizhakov@gmail.com> | 2020-09-10 10:54:57 +0300 |
---|---|---|
committer | Alexander Strizhakov <alex.strizhakov@gmail.com> | 2020-11-11 13:39:49 +0300 |
commit | 8d218ebaf5ab0b72e419068340c40a5ef9744924 (patch) | |
tree | 7d3187a7082da34e1dde2ae858312c078e0e5c9c /lib | |
parent | 88f6b61a5e31856eb4cee204adf3fc9a6ab78d76 (diff) | |
download | pleroma-8d218ebaf5ab0b72e419068340c40a5ef9744924.tar.gz |
Moving some background jobs into simple tasks
- fetching activity data
- attachment prefetching
- using limiter to prevent overload
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pleroma/application.ex | 6 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/activity_pub.ex | 4 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex | 19 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/side_effects.ex | 5 | ||||
-rw-r--r-- | lib/pleroma/web/rich_media/helpers.ex | 5 | ||||
-rw-r--r-- | lib/pleroma/workers/background_worker.ex | 15 |
6 files changed, 25 insertions, 29 deletions
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 7c4cd9626..769af1806 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -57,6 +57,7 @@ defmodule Pleroma.Application do setup_instrumenters() load_custom_modules() Pleroma.Docs.JSON.compile() + limiters_setup() adapter = Application.get_env(:tesla, :adapter) @@ -273,4 +274,9 @@ defmodule Pleroma.Application do end defp http_children(_, _), do: [] + + def limiters_setup do + [Pleroma.Web.RichMedia.Helpers, Pleroma.Web.MediaProxy] + |> Enum.each(&ConcurrentLimiter.new(&1, 1, 0)) + end end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index d8f685d38..6008f2f4a 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -123,7 +123,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do # Splice in the child object if we have one. activity = Maps.put_if_present(activity, :object, object) - BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) + ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn -> + Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end) + end) {:ok, activity} else diff --git a/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex index 0fb05d3c4..816cc89bf 100644 --- a/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex @@ -8,7 +8,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do alias Pleroma.HTTP alias Pleroma.Web.MediaProxy - alias Pleroma.Workers.BackgroundWorker require Logger @@ -17,7 +16,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do recv_timeout: 10_000 ] - def perform(:prefetch, url) do + defp prefetch(url) do # Fetching only proxiable resources if MediaProxy.enabled?() and MediaProxy.url_proxiable?(url) do # If preview proxy is enabled, it'll also hit media proxy (so we're caching both requests) @@ -25,17 +24,25 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do Logger.debug("Prefetching #{inspect(url)} as #{inspect(prefetch_url)}") - HTTP.get(prefetch_url, [], @adapter_options) + if Pleroma.Config.get(:env) == :test do + fetch(prefetch_url) + else + ConcurrentLimiter.limit(MediaProxy, fn -> + Task.start(fn -> fetch(prefetch_url) end) + end) + end end end - def perform(:preload, %{"object" => %{"attachment" => attachments}} = _message) do + defp fetch(url), do: HTTP.get(url, [], @adapter_options) + + defp preload(%{"object" => %{"attachment" => attachments}} = _message) do Enum.each(attachments, fn %{"url" => url} when is_list(url) -> url |> Enum.each(fn %{"href" => href} -> - BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href}) + prefetch(href) x -> Logger.debug("Unhandled attachment URL object #{inspect(x)}") @@ -51,7 +58,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message ) when is_list(attachments) and length(attachments) > 0 do - BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message}) + preload(message) {:ok, message} end diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex index bbff35c36..4d8fb721e 100644 --- a/lib/pleroma/web/activity_pub/side_effects.ex +++ b/lib/pleroma/web/activity_pub/side_effects.ex @@ -24,7 +24,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Push alias Pleroma.Web.Streamer - alias Pleroma.Workers.BackgroundWorker require Logger @@ -191,7 +190,9 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do Object.increase_replies_count(in_reply_to) end - BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) + ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn -> + Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end) + end) meta = meta diff --git a/lib/pleroma/web/rich_media/helpers.ex b/lib/pleroma/web/rich_media/helpers.ex index d67b594b5..442bf9995 100644 --- a/lib/pleroma/web/rich_media/helpers.ex +++ b/lib/pleroma/web/rich_media/helpers.ex @@ -78,11 +78,6 @@ defmodule Pleroma.Web.RichMedia.Helpers do def fetch_data_for_activity(_), do: %{} - def perform(:fetch, %Activity{} = activity) do - fetch_data_for_activity(activity) - :ok - end - def rich_media_get(url) do headers = [{"user-agent", Pleroma.Application.user_agent() <> "; Bot"}] diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex index 55b5a13d9..0647c65ae 100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@ -3,9 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Workers.BackgroundWorker do - alias Pleroma.Activity alias Pleroma.User - alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy use Pleroma.Workers.WorkerHelper, queue: "background" @@ -32,19 +30,6 @@ defmodule Pleroma.Workers.BackgroundWorker do {:ok, User.Import.perform(String.to_atom(op), user, identifiers)} end - def perform(%Job{args: %{"op" => "media_proxy_preload", "message" => message}}) do - MediaProxyWarmingPolicy.perform(:preload, message) - end - - def perform(%Job{args: %{"op" => "media_proxy_prefetch", "url" => url}}) do - MediaProxyWarmingPolicy.perform(:prefetch, url) - end - - def perform(%Job{args: %{"op" => "fetch_data_for_activity", "activity_id" => activity_id}}) do - activity = Activity.get_by_id(activity_id) - Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity) - end - def perform(%Job{ args: %{"op" => "move_following", "origin_id" => origin_id, "target_id" => target_id} }) do |