aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/repo_streamer.ex34
-rw-r--r--lib/pleroma/user.ex46
-rw-r--r--lib/pleroma/web/activity_pub/utils.ex18
3 files changed, 79 insertions, 19 deletions
diff --git a/lib/pleroma/repo_streamer.ex b/lib/pleroma/repo_streamer.ex
new file mode 100644
index 000000000..a4b71a1bb
--- /dev/null
+++ b/lib/pleroma/repo_streamer.ex
@@ -0,0 +1,34 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.RepoStreamer do
+ alias Pleroma.Repo
+ import Ecto.Query
+
+ def chunk_stream(query, chunk_size) do
+ Stream.unfold(0, fn
+ :halt ->
+ {[], :halt}
+
+ last_id ->
+ query
+ |> order_by(asc: :id)
+ |> where([r], r.id > ^last_id)
+ |> limit(^chunk_size)
+ |> Repo.all()
+ |> case do
+ [] ->
+ {[], :halt}
+
+ records ->
+ last_id = List.last(records).id
+ {records, last_id}
+ end
+ end)
+ |> Stream.take_while(fn
+ [] -> false
+ _ -> true
+ end)
+ end
+end
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index 3a9ae8d73..1e59a4121 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -15,6 +15,7 @@ defmodule Pleroma.User do
alias Pleroma.Object
alias Pleroma.Registration
alias Pleroma.Repo
+ alias Pleroma.RepoStreamer
alias Pleroma.User
alias Pleroma.Web
alias Pleroma.Web.ActivityPub.ActivityPub
@@ -932,18 +933,24 @@ defmodule Pleroma.User do
@spec perform(atom(), User.t()) :: {:ok, User.t()}
def perform(:delete, %User{} = user) do
- {:ok, user} = User.deactivate(user)
-
# Remove all relationships
{:ok, followers} = User.get_followers(user)
- Enum.each(followers, fn follower -> User.unfollow(follower, user) end)
+ Enum.each(followers, fn follower ->
+ ActivityPub.unfollow(follower, user)
+ User.unfollow(follower, user)
+ end)
{:ok, friends} = User.get_friends(user)
- Enum.each(friends, fn followed -> User.unfollow(user, followed) end)
+ Enum.each(friends, fn followed ->
+ ActivityPub.unfollow(user, followed)
+ User.unfollow(user, followed)
+ end)
delete_user_activities(user)
+
+ {:ok, _user} = Repo.delete(user)
end
@spec perform(atom(), User.t()) :: {:ok, User.t()}
@@ -1016,18 +1023,35 @@ defmodule Pleroma.User do
])
def delete_user_activities(%User{ap_id: ap_id} = user) do
- stream =
- ap_id
- |> Activity.query_by_actor()
- |> Repo.stream()
-
- Repo.transaction(fn -> Enum.each(stream, &delete_activity(&1)) end, timeout: :infinity)
+ ap_id
+ |> Activity.query_by_actor()
+ |> RepoStreamer.chunk_stream(50)
+ |> Stream.each(fn activities ->
+ Enum.each(activities, &delete_activity(&1))
+ end)
+ |> Stream.run()
{:ok, user}
end
defp delete_activity(%{data: %{"type" => "Create"}} = activity) do
- Object.normalize(activity) |> ActivityPub.delete()
+ activity
+ |> Object.normalize()
+ |> ActivityPub.delete()
+ end
+
+ defp delete_activity(%{data: %{"type" => "Like"}} = activity) do
+ user = get_cached_by_ap_id(activity.actor)
+ object = Object.normalize(activity)
+
+ ActivityPub.unlike(user, object)
+ end
+
+ defp delete_activity(%{data: %{"type" => "Announce"}} = activity) do
+ user = get_cached_by_ap_id(activity.actor)
+ object = Object.normalize(activity)
+
+ ActivityPub.unannounce(user, object)
end
defp delete_activity(_activity), do: "Doing nothing"
diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex
index 10ff572a2..514266cee 100644
--- a/lib/pleroma/web/activity_pub/utils.ex
+++ b/lib/pleroma/web/activity_pub/utils.ex
@@ -151,16 +151,18 @@ defmodule Pleroma.Web.ActivityPub.Utils do
def create_context(context) do
context = context || generate_id("contexts")
- changeset = Object.context_mapping(context)
- case Repo.insert(changeset) do
- {:ok, object} ->
- object
+ # Ecto has problems accessing the constraint inside the jsonb,
+ # so we explicitly check for the existed object before insert
+ object = Object.get_cached_by_ap_id(context)
- # This should be solved by an upsert, but it seems ecto
- # has problems accessing the constraint inside the jsonb.
- {:error, _} ->
- Object.get_cached_by_ap_id(context)
+ with true <- is_nil(object),
+ changeset <- Object.context_mapping(context),
+ {:ok, inserted_object} <- Repo.insert(changeset) do
+ inserted_object
+ else
+ _ ->
+ object
end
end