diff options
author | Hélène <pleroma-dev@helene.moe> | 2022-08-15 01:15:23 +0200 |
---|---|---|
committer | Hélène <pleroma-dev@helene.moe> | 2022-08-15 01:47:09 +0200 |
commit | 88c1c76d3eca3412d1e02008f1b8d96fe8fe0b96 (patch) | |
tree | da0e4065e3ed511f3c7ba4b0d40307b9da9f748c /lib/pleroma | |
parent | bb02ee99f58e378e33162211f41fe5979d5da8ae (diff) | |
download | pleroma-88c1c76d3eca3412d1e02008f1b8d96fe8fe0b96.tar.gz |
Migrations: delete contexts with BaseMigrator
Due to the lengthiness of this task, the migration has been adapted into
a BaseMigrator migration, running in the background instead.
Diffstat (limited to 'lib/pleroma')
-rw-r--r-- | lib/pleroma/application.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/data_migration.ex | 1 | ||||
-rw-r--r-- | lib/pleroma/migrators/context_objects_deletion_migrator.ex | 139 |
3 files changed, 142 insertions, 1 deletions
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index d808bc732..c546713ca 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -238,7 +238,8 @@ defmodule Pleroma.Application do defp background_migrators do [ - Pleroma.Migrators.HashtagsTableMigrator + Pleroma.Migrators.HashtagsTableMigrator, + Pleroma.Migrators.ContextObjectsDeletionMigrator ] end diff --git a/lib/pleroma/data_migration.ex b/lib/pleroma/data_migration.ex index 59d891d8d..8451678fc 100644 --- a/lib/pleroma/data_migration.ex +++ b/lib/pleroma/data_migration.ex @@ -42,4 +42,5 @@ defmodule Pleroma.DataMigration do end def populate_hashtags_table, do: get_by_name("populate_hashtags_table") + def delete_context_objects, do: get_by_name("delete_context_objects") end diff --git a/lib/pleroma/migrators/context_objects_deletion_migrator.ex b/lib/pleroma/migrators/context_objects_deletion_migrator.ex new file mode 100644 index 000000000..fb224795a --- /dev/null +++ b/lib/pleroma/migrators/context_objects_deletion_migrator.ex @@ -0,0 +1,139 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Migrators.ContextObjectsDeletionMigrator do + defmodule State do + use Pleroma.Migrators.Support.BaseMigratorState + + @impl Pleroma.Migrators.Support.BaseMigratorState + defdelegate data_migration(), to: Pleroma.DataMigration, as: :delete_context_objects + end + + use Pleroma.Migrators.Support.BaseMigrator + + alias Pleroma.Migrators.Support.BaseMigrator + alias Pleroma.Object + + @doc "This migration removes objects created exclusively for contexts, containing only an `id` field." + + @impl BaseMigrator + def feature_config_path, do: [:features, :delete_context_objects] + + @impl BaseMigrator + def fault_rate_allowance, do: Config.get([:delete_context_objects, :fault_rate_allowance], 0) + + @impl BaseMigrator + def perform do + data_migration_id = data_migration_id() + max_processed_id = get_stat(:max_processed_id, 0) + + Logger.info("Deleting context objects from `objects` (from oid: #{max_processed_id})...") + + query() + |> where([object], object.id > ^max_processed_id) + |> Repo.chunk_stream(100, :batches, timeout: :infinity) + |> Stream.each(fn objects -> + object_ids = Enum.map(objects, & &1.id) + + results = Enum.map(object_ids, &delete_context_object(&1)) + + failed_ids = + results + |> Enum.filter(&(elem(&1, 0) == :error)) + |> Enum.map(&elem(&1, 1)) + + chunk_affected_count = + results + |> Enum.filter(&(elem(&1, 0) == :ok)) + |> length() + + for failed_id <- failed_ids do + _ = + Repo.query( + "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <> + "VALUES ($1, $2) ON CONFLICT DO NOTHING;", + [data_migration_id, failed_id] + ) + end + + _ = + Repo.query( + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = ANY($2)", + [data_migration_id, object_ids -- failed_ids] + ) + + max_object_id = Enum.at(object_ids, -1) + + put_stat(:max_processed_id, max_object_id) + increment_stat(:iteration_processed_count, length(object_ids)) + increment_stat(:processed_count, length(object_ids)) + increment_stat(:failed_count, length(failed_ids)) + increment_stat(:affected_count, chunk_affected_count) + put_stat(:records_per_second, records_per_second()) + persist_state() + + # A quick and dirty approach to controlling the load this background migration imposes + sleep_interval = Config.get([:delete_context_objects, :sleep_interval_ms], 0) + Process.sleep(sleep_interval) + end) + |> Stream.run() + end + + @impl BaseMigrator + def query do + # Context objects have no activity type, and only one field, `id`. + # Only those context objects are without types. + from( + object in Object, + where: fragment("(?)->'type' IS NULL", object.data), + select: %{ + id: object.id + } + ) + end + + @spec delete_context_object(integer()) :: {:ok | :error, integer()} + defp delete_context_object(id) do + result = + %Object{id: id} + |> Repo.delete() + |> elem(0) + + {result, id} + end + + @impl BaseMigrator + def retry_failed do + data_migration_id = data_migration_id() + + failed_objects_query() + |> Repo.chunk_stream(100, :one) + |> Stream.each(fn object -> + with {res, _} when res != :error <- delete_context_object(object.id) do + _ = + Repo.query( + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = $2", + [data_migration_id, object.id] + ) + end + end) + |> Stream.run() + + put_stat(:failed_count, failures_count()) + persist_state() + + force_continue() + end + + defp failed_objects_query do + from(o in Object) + |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), + on: dmf.record_id == o.id + ) + |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id()) + |> order_by([o], asc: o.id) + end +end |