diff options
-rw-r--r-- | config/config.exs | 4 | ||||
-rw-r--r-- | config/description.exs | 14 | ||||
-rw-r--r-- | config/test.exs | 4 | ||||
-rw-r--r-- | lib/pleroma/user.ex | 148 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/pipeline.ex | 6 | ||||
-rw-r--r-- | lib/pleroma/web/streamer.ex | 9 | ||||
-rw-r--r-- | lib/pleroma/workers/background_worker.ex | 41 |
7 files changed, 190 insertions, 36 deletions
diff --git a/config/config.exs b/config/config.exs index 66aee3264..c62f5ca1f 100644 --- a/config/config.exs +++ b/config/config.exs @@ -652,7 +652,9 @@ config :pleroma, :oauth2, issue_new_refresh_token: true, clean_expired_tokens: false -config :pleroma, :database, rum_enabled: false +config :pleroma, :database, + rum_enabled: false, + rollback_on_activity_deletion_errors: true config :pleroma, :env, Mix.env() diff --git a/config/description.exs b/config/description.exs index d9b15e684..5e99e3846 100644 --- a/config/description.exs +++ b/config/description.exs @@ -72,6 +72,20 @@ frontend_options = [ config :pleroma, :config_description, [ %{ group: :pleroma, + key: :database, + type: :group, + description: "Database settings", + children: [ + %{ + key: :rollback_on_activity_deletion_errors, + type: :boolean, + description: + "Rollback the transaction if Pleroma fails to delete an activity during user deletion. If you need to disable this, please report the issue you were having on the bugtracker." + } + ] + }, + %{ + group: :pleroma, key: Pleroma.Upload, type: :group, description: "Upload general settings", diff --git a/config/test.exs b/config/test.exs index 87396a88d..d87047496 100644 --- a/config/test.exs +++ b/config/test.exs @@ -133,6 +133,10 @@ config :pleroma, :side_effects, ap_streamer: Pleroma.Web.ActivityPub.ActivityPubMock, logger: Pleroma.LoggerMock +# Disable transaction check by default unless the test wants otherwise +# because all tests run in a transaction. +config :pleroma, Pleroma.Workers.BackgroundWorker, ignore_transaction_check: true + if File.exists?("./config/test.secret.exs") do import_config "test.secret.exs" else diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 51f5bc8ea..9837166ea 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -1072,7 +1072,19 @@ defmodule Pleroma.User do def update_and_set_cache(changeset) do with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do - set_cache(user) + BackgroundWorker.execute_or_enqueue_if_in_transaction(fn + false -> + set_cache(user) + + # If the function has been enqueued, there is a chance something changed + # before the worker got to executing it, so refetch the user from the database + true -> + user.id + |> get_by_id() + |> set_cache() + end) + + {:ok, user} end end @@ -1339,7 +1351,7 @@ defmodule Pleroma.User do user |> follow_information_changeset(%{follower_count: follower_count}) - |> update_and_set_cache + |> update_and_set_cache() else {:ok, maybe_fetch_follow_information(user)} end @@ -1747,27 +1759,67 @@ defmodule Pleroma.User do @spec perform(atom(), User.t()) :: {:ok, User.t()} def perform(:delete, %User{} = user) do - # Remove all relationships - user - |> get_followers() - |> Enum.each(fn follower -> - ActivityPub.unfollow(follower, user) - unfollow(follower, user) - end) + # Deactivate the user before starting the deletion + # to make sure they are not able to make new posts/follows during it + {:ok, user} = set_activation_status(user, false) + + Repo.transaction( + fn -> + # Remove all relationships + # No need to handle errors from ActivityPub.unfollow because + # they will automatically rollback the transaction. + user + |> get_followers() + |> Enum.each(fn follower -> + ActivityPub.unfollow(follower, user) + unfollow(follower, user) + end) - user - |> get_friends() - |> Enum.each(fn followed -> - ActivityPub.unfollow(user, followed) - unfollow(user, followed) - end) + user + |> get_friends() + |> Enum.each(fn followed -> + ActivityPub.unfollow(user, followed) + unfollow(user, followed) + end) + + rollback_on_activity_deletion_errors = + Config.get([:database, :rollback_on_activity_deletion_errors], true) - delete_user_activities(user) - delete_notifications_from_user_activities(user) + case {delete_user_activities(user), rollback_on_activity_deletion_errors} do + {res, rollback} when res == :ok or rollback == false -> + case res do + {:error, _} -> + Logger.warn(fn -> + "Deleting #{user.ap_id}: Failed deleting some of the activities, proceeding anyway." + end) - delete_outgoing_pending_follow_requests(user) + _ -> + :noop + end - delete_or_deactivate(user) + delete_notifications_from_user_activities(user) + + delete_outgoing_pending_follow_requests(user) + + case delete_or_deactivate(user) do + {:ok, user} -> user + {:error, e} -> Repo.rollback(e) + end + + {{:error, e}, true} -> + Logger.error(fn -> + """ + Deleting #{user.ap_id}: Failed deleting some of the activities, rolling back. + Set `config :pleroma, :database, rollback_on_activity_deletion_errors: true` + and restart the deletion if you want to continue anyway. Please report this on Pleroma bugtracker. + """ + end) + + Repo.rollback({:deleting_activities, e}) + end + end, + timeout: :infinity + ) end def perform(:set_activation_async, user, status), do: set_activation(user, status) @@ -1807,16 +1859,48 @@ defmodule Pleroma.User do |> Repo.delete_all() end + @type activity_id :: String.t() + @spec delete_user_activities(User.t()) :: + :ok | {:error, [{:error, activity_id(), any()}]} def delete_user_activities(%User{ap_id: ap_id} = user) do - ap_id - |> Activity.Queries.by_actor() - |> Repo.chunk_stream(50, :batches) - |> Stream.each(fn activities -> - Enum.each(activities, fn activity -> delete_activity(activity, user) end) - end) - |> Stream.run() + errors = + ap_id + |> Activity.Queries.by_actor() + |> Repo.chunk_stream(50) + |> Stream.flat_map(fn activity -> + case delete_activity(activity, user) do + {:ok, _activity, _meta} -> + [] + + {:error, error} -> + Logger.error(fn -> + "Deleting #{ap_id}: could not delete or undo #{activity.data["id"]}.\n Reason: #{ + inspect(error) + }" + end) + + [{:error, activity.id, error}] + + :noop -> + Logger.debug(fn -> + "Deleting #{ap_id}: nothing to do for #{activity.data["id"]} of type #{ + activity.data["type"] + }" + end) + + [] + end + end) + |> Enum.to_list() + + case errors do + [] -> :ok + errors -> {:error, errors} + end end + @spec delete_activity(Pleroma.Activity.t(), User.t()) :: + {:ok, Activity.t(), keyword()} | {:error, any()} | :noop defp delete_activity(%{data: %{"type" => "Create", "object" => object}} = activity, user) do with {_, %Object{}} <- {:find_object, Object.get_by_ap_id(object)}, {:ok, delete_data, _} <- Builder.delete(user, object) do @@ -1831,18 +1915,20 @@ defmodule Pleroma.User do end e -> - Logger.error("Could not delete #{object} created by #{activity.data["ap_id"]}") - Logger.error("Error: #{inspect(e)}") + e end end defp delete_activity(%{data: %{"type" => type}} = activity, user) when type in ["Like", "Announce"] do - {:ok, undo, _} = Builder.undo(user, activity) - Pipeline.common_pipeline(undo, local: user.local) + with {:ok, undo, _} <- Builder.undo(user, activity) do + Pipeline.common_pipeline(undo, local: user.local) + else + e -> e + end end - defp delete_activity(_activity, _user), do: "Doing nothing" + defp delete_activity(_activity, _user), do: :noop defp delete_outgoing_pending_follow_requests(user) do user diff --git a/lib/pleroma/web/activity_pub/pipeline.ex b/lib/pleroma/web/activity_pub/pipeline.ex index 195596f94..405649fb1 100644 --- a/lib/pleroma/web/activity_pub/pipeline.ex +++ b/lib/pleroma/web/activity_pub/pipeline.ex @@ -13,6 +13,7 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do alias Pleroma.Web.ActivityPub.SideEffects alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.Federator + alias Pleroma.Workers.BackgroundWorker @side_effects Config.get([:pipeline, :side_effects], SideEffects) @federator Config.get([:pipeline, :federator], Federator) @@ -26,7 +27,10 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do def common_pipeline(object, meta) do case Repo.transaction(fn -> do_common_pipeline(object, meta) end) do {:ok, {:ok, activity, meta}} -> - @side_effects.handle_after_transaction(meta) + BackgroundWorker.execute_or_enqueue_if_in_transaction(fn -> + @side_effects.handle_after_transaction(meta) + end) + {:ok, activity, meta} {:ok, value} -> diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index fc3bbb130..f63f16a79 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -18,6 +18,7 @@ defmodule Pleroma.Web.Streamer do alias Pleroma.Web.OAuth.Token alias Pleroma.Web.Plugs.OAuthScopesPlug alias Pleroma.Web.StreamerView + alias Pleroma.Workers.BackgroundWorker @mix_env Mix.env() @registry Pleroma.Web.StreamerRegistry @@ -135,9 +136,11 @@ defmodule Pleroma.Web.Streamer do def stream(topics, items) do if should_env_send?() do - for topic <- List.wrap(topics), item <- List.wrap(items) do - spawn(fn -> do_stream(topic, item) end) - end + BackgroundWorker.execute_or_enqueue_if_in_transaction(fn -> + for topic <- List.wrap(topics), item <- List.wrap(items) do + spawn(fn -> do_stream(topic, item) end) + end + end) end end diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex index 1e28384cb..0465fd06f 100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@ -38,4 +38,45 @@ defmodule Pleroma.Workers.BackgroundWorker do Pleroma.FollowingRelationship.move_following(origin, target) end + + def perform(%Job{args: %{"op" => "transaction_side_effects", "function" => encoded_function}}) do + function = + encoded_function + |> Base.decode64!() + |> :erlang.binary_to_term() + + maybe_execute_function_with_worker_info(function, true) + :ok + end + + @doc "Executes a function right away if not running in transaction. Otherwise enqueues it to be executed by BackgroundWorker after transaction commit. Intended for side effects that can not be rolled back. If the function has an arity of 1, the first argument will be a boolean indicating whether it is run by BackgroundWorker or not." + @spec execute_or_enqueue_if_in_transaction((() -> any()) | (boolean() -> any())) :: + {:ok, {:enqueued, Oban.Job.t()}} + | {:error, {:enqueue, Oban.job_changeset()}} + | {:error, {:enqueue, term()}} + | {:ok, {:executed, term()}} + def execute_or_enqueue_if_in_transaction(function) do + if Pleroma.Repo.in_transaction?() and + !Pleroma.Config.get([__MODULE__, :ignore_transaction_check], false) do + encoded_function = + function + |> :erlang.term_to_binary() + |> Base.encode64() + + case enqueue("transaction_side_effects", %{"function" => encoded_function}) do + {:ok, job} -> {:ok, {:enqueued, job}} + {:error, e} -> {:error, {:enqueue, e}} + end + else + {:ok, {:executed, maybe_execute_function_with_worker_info(function, false)}} + end + end + + defp maybe_execute_function_with_worker_info(function, executed_by_worker) do + if :erlang.fun_info(function)[:arity] == 1 do + function.(executed_by_worker) + else + function.() + end + end end |