aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrinpatch <rinpatch@sdf.org>2020-06-16 23:45:59 +0300
committerrinpatch <rinpatch@sdf.org>2020-06-16 23:53:13 +0300
commit5c0e1039ce41a2717598992a590658d4d079451c (patch)
tree9ed54a7beb5b7b07af05135732e80fba3b91952d
parentb536e57124fda9adeb4c78739d0eb2be80d47f6e (diff)
downloadpleroma-5c0e1039ce41a2717598992a590658d4d079451c.tar.gz
Chunk the notification type backfill migration
Long-term we want that migration to be done entirely in SQL, but for now this is a hotfix to avoid causing OOMs on large databases. It uses a homegrown version of `Repo.stream`, which performs worse than the upstream one because it doesn't reuse the same prepared query for the chunk queries, but unlike the upstream it supports preloads.
-rw-r--r--lib/pleroma/migration_helper/notification_backfill.ex2
-rw-r--r--lib/pleroma/repo.ex28
2 files changed, 29 insertions, 1 deletions
diff --git a/lib/pleroma/migration_helper/notification_backfill.ex b/lib/pleroma/migration_helper/notification_backfill.ex
index 09647d12a..b3770307a 100644
--- a/lib/pleroma/migration_helper/notification_backfill.ex
+++ b/lib/pleroma/migration_helper/notification_backfill.ex
@@ -18,7 +18,7 @@ defmodule Pleroma.MigrationHelper.NotificationBackfill do
)
query
- |> Repo.all()
+ |> Repo.chunk_stream(100)
|> Enum.each(fn notification ->
type =
notification.activity
diff --git a/lib/pleroma/repo.ex b/lib/pleroma/repo.ex
index f62138466..6d85d70bc 100644
--- a/lib/pleroma/repo.ex
+++ b/lib/pleroma/repo.ex
@@ -8,6 +8,7 @@ defmodule Pleroma.Repo do
adapter: Ecto.Adapters.Postgres,
migration_timestamps: [type: :naive_datetime_usec]
+ import Ecto.Query
require Logger
defmodule Instrumenter do
@@ -78,6 +79,33 @@ defmodule Pleroma.Repo do
:ok
end
end
+
+ def chunk_stream(query, chunk_size) do
+ # We don't actually need the start and end functions of resource streaming (start only seeds the id cursor with 0, end is a no-op),
+ # but it seems to be the only way to not fetch records one-by-one and have individual records, instead of lists of records,
+ # be the elements of the stream. Each step runs `query` for up to `chunk_size` rows with id > cursor in ascending id order,
+ # emits them, moves the cursor to the last row's id, and halts on the first empty result. NOTE(review): assumes ids compare greater than 0 - confirm.
+ Stream.resource(
+ fn -> 0 end,
+ fn
+ last_id ->
+ query
+ |> order_by(asc: :id)
+ |> where([r], r.id > ^last_id)
+ |> limit(^chunk_size)
+ |> all()
+ |> case do
+ [] ->
+ {:halt, last_id}
+
+ records ->
+ last_id = List.last(records).id
+ {records, last_id}
+ end
+ end,
+ fn _ -> :ok end
+ )
+ end
end
defmodule Pleroma.Repo.UnappliedMigrationsError do