diff options
Diffstat (limited to 'lib/pleroma/workers')
-rw-r--r-- | lib/pleroma/workers/attachments_cleanup_worker.ex | 48 |
1 files changed, 23 insertions, 25 deletions
diff --git a/lib/pleroma/workers/attachments_cleanup_worker.ex b/lib/pleroma/workers/attachments_cleanup_worker.ex index 58226b395..aa8ee2605 100644 --- a/lib/pleroma/workers/attachments_cleanup_worker.ex +++ b/lib/pleroma/workers/attachments_cleanup_worker.ex @@ -10,6 +10,8 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup" + @batch_size 500 + @impl Oban.Worker def perform(%Job{ args: %{ @@ -19,8 +21,7 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do }) do attachments |> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end) - |> fetch_objects - |> prepare_objects(actor, Enum.map(attachments, & &1["name"])) + |> fetch_objects(actor, Enum.map(attachments, & &1["name"])) |> filter_objects |> do_clean @@ -71,17 +72,16 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do end) end - defp prepare_objects(objects, actor, names) do - objects - |> Enum.reduce(%{}, fn %{ - id: id, - data: %{ - "url" => [%{"href" => href}], - "actor" => obj_actor, - "name" => name - } - }, - acc -> + defp prepare_objects(init, objects, actor, names) do + Enum.reduce(objects, init, fn %{ + id: id, + data: %{ + "url" => [%{"href" => href}], + "actor" => obj_actor, + "name" => name + } + }, + acc -> Map.update(acc, href, %{id: id, count: 1}, fn val -> case obj_actor == actor and name in names do true -> @@ -96,18 +96,16 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do end) end - defp fetch_objects(hrefs) do - from(o in Object, - where: - fragment( - "to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)", - o.data, - o.data, - ^hrefs - ) + defp fetch_objects(hrefs, actor, names) do + from( + o in Object, + where: fragment("object_attachment_urls(?) && (?)", o.data, ^hrefs) ) - # The query above can be time consumptive on large instances until we - # refactor how uploads are stored - |> Repo.all(timeout: :infinity) + |> Pleroma.RepoStreamer.chunk_stream(@batch_size, timeout: :infinity) + |> Stream.transform(%{}, fn objs, acc -> + res = prepare_objects(acc, objs, actor, names) + {res, res} + end) + |> Enum.to_list() end end |