diff options
author | rinpatch <rinpatch@sdf.org> | 2020-06-16 23:45:59 +0300 |
---|---|---|
committer | rinpatch <rinpatch@sdf.org> | 2020-06-16 23:53:13 +0300 |
commit | 5c0e1039ce41a2717598992a590658d4d079451c (patch) | |
tree | 9ed54a7beb5b7b07af05135732e80fba3b91952d | |
parent | b536e57124fda9adeb4c78739d0eb2be80d47f6e (diff) | |
download | pleroma-5c0e1039ce41a2717598992a590658d4d079451c.tar.gz |
Chunk the notification type backfill migration
Long-term we want that migration to be done entirely in SQL,
but for now this is a hotfix to not cause OOMs on large databases.
This is using a homegrown version of `Repo.stream`, it's worse in
terms of performance than the upstream since it doesn't use the same
prepared query for chunk queries, but unlike the upstream it supports
preloads.
-rw-r--r-- | lib/pleroma/migration_helper/notification_backfill.ex | 2 | ||||
-rw-r--r-- | lib/pleroma/repo.ex | 28 |
2 files changed, 29 insertions, 1 deletions
diff --git a/lib/pleroma/migration_helper/notification_backfill.ex b/lib/pleroma/migration_helper/notification_backfill.ex index 09647d12a..b3770307a 100644 --- a/lib/pleroma/migration_helper/notification_backfill.ex +++ b/lib/pleroma/migration_helper/notification_backfill.ex @@ -18,7 +18,7 @@ defmodule Pleroma.MigrationHelper.NotificationBackfill do ) query - |> Repo.all() + |> Repo.chunk_stream(100) |> Enum.each(fn notification -> type = notification.activity diff --git a/lib/pleroma/repo.ex b/lib/pleroma/repo.ex index f62138466..6d85d70bc 100644 --- a/lib/pleroma/repo.ex +++ b/lib/pleroma/repo.ex @@ -8,6 +8,7 @@ defmodule Pleroma.Repo do adapter: Ecto.Adapters.Postgres, migration_timestamps: [type: :naive_datetime_usec] + import Ecto.Query require Logger defmodule Instrumenter do @@ -78,6 +79,33 @@ defmodule Pleroma.Repo do :ok end end + + def chunk_stream(query, chunk_size) do + # We don't actually need start and end funcitons of resource streaming, + # but it seems to be the only way to not fetch records one-by-one and + # have individual records be the elements of the stream, instead of + # lists of records + Stream.resource( + fn -> 0 end, + fn + last_id -> + query + |> order_by(asc: :id) + |> where([r], r.id > ^last_id) + |> limit(^chunk_size) + |> all() + |> case do + [] -> + {:halt, last_id} + + records -> + last_id = List.last(records).id + {records, last_id} + end + end, + fn _ -> :ok end + ) + end end defmodule Pleroma.Repo.UnappliedMigrationsError do |