diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/mix/tasks/pleroma/database.ex | 4 | ||||
-rw-r--r-- | lib/mix/tasks/pleroma/user.ex | 4 | ||||
-rw-r--r-- | lib/pleroma/repo.ex | 14 | ||||
-rw-r--r-- | lib/pleroma/repo_streamer.ex | 34 | ||||
-rw-r--r-- | lib/pleroma/user.ex | 3 |
5 files changed, 19 insertions, 40 deletions
diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index 7f1108dcf..a01c36ece 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -99,7 +99,7 @@ defmodule Mix.Tasks.Pleroma.Database do where: fragment("(?)->>'likes' is not null", object.data), select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)} ) - |> Pleroma.RepoStreamer.chunk_stream(100) + |> Pleroma.Repo.chunk_stream(100, :batches) |> Stream.each(fn objects -> ids = objects @@ -145,7 +145,7 @@ defmodule Mix.Tasks.Pleroma.Database do |> where(local: true) |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data)) |> where([_a, o], fragment("?->>'type' = 'Note'", o.data)) - |> Pleroma.RepoStreamer.chunk_stream(100) + |> Pleroma.Repo.chunk_stream(100, :batches) |> Stream.each(fn activities -> Enum.each(activities, fn activity -> expires_at = diff --git a/lib/mix/tasks/pleroma/user.ex b/lib/mix/tasks/pleroma/user.ex index 01824aa18..b20c49d89 100644 --- a/lib/mix/tasks/pleroma/user.ex +++ b/lib/mix/tasks/pleroma/user.ex @@ -179,7 +179,7 @@ defmodule Mix.Tasks.Pleroma.User do start_pleroma() Pleroma.User.Query.build(%{nickname: "@#{instance}"}) - |> Pleroma.RepoStreamer.chunk_stream(500) + |> Pleroma.Repo.chunk_stream(500, :batches) |> Stream.each(fn users -> users |> Enum.each(fn user -> @@ -370,7 +370,7 @@ defmodule Mix.Tasks.Pleroma.User do start_pleroma() Pleroma.User.Query.build(%{local: true}) - |> Pleroma.RepoStreamer.chunk_stream(500) + |> Pleroma.Repo.chunk_stream(500, :batches) |> Stream.each(fn users -> users |> Enum.each(fn user -> diff --git a/lib/pleroma/repo.ex b/lib/pleroma/repo.ex index a75610879..4524bd5e2 100644 --- a/lib/pleroma/repo.ex +++ b/lib/pleroma/repo.ex @@ -49,6 +49,20 @@ defmodule Pleroma.Repo do end end + @doc """ + Returns a lazy enumerable that emits all entries from the data store matching the given query. + + `returns_as` use to group records. use the `batches` option to fetch records in bulk. + + ## Examples + + # fetch records one-by-one + iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500) + + # fetch records in bulk + iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches) + """ + @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t() def chunk_stream(query, chunk_size, returns_as \\ :one) do # We don't actually need start and end funcitons of resource streaming, # but it seems to be the only way to not fetch records one-by-one and diff --git a/lib/pleroma/repo_streamer.ex b/lib/pleroma/repo_streamer.ex deleted file mode 100644 index cb4d7bb7a..000000000 --- a/lib/pleroma/repo_streamer.ex +++ /dev/null @@ -1,34 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.RepoStreamer do - alias Pleroma.Repo - import Ecto.Query - - def chunk_stream(query, chunk_size) do - Stream.unfold(0, fn - :halt -> - {[], :halt} - - last_id -> - query - |> order_by(asc: :id) - |> where([r], r.id > ^last_id) - |> limit(^chunk_size) - |> Repo.all() - |> case do - [] -> - {[], :halt} - - records -> - last_id = List.last(records).id - {records, last_id} - end - end) - |> Stream.take_while(fn - [] -> false - _ -> true - end) - end -end diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index e73d19964..57497eb83 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -25,7 +25,6 @@ defmodule Pleroma.User do alias Pleroma.Object alias Pleroma.Registration alias Pleroma.Repo - alias Pleroma.RepoStreamer alias Pleroma.User alias Pleroma.UserRelationship alias Pleroma.Web @@ -1775,7 +1774,7 @@ defmodule Pleroma.User do def delete_user_activities(%User{ap_id: ap_id} = user) do ap_id |> Activity.Queries.by_actor() - |> RepoStreamer.chunk_stream(50) + |> Repo.chunk_stream(50, :batches) |> Stream.each(fn activities -> Enum.each(activities, fn activity -> delete_activity(activity, user) end) end) |