aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlain <lain@soykaf.club>2020-09-16 12:22:48 +0000
committerlain <lain@soykaf.club>2020-09-16 12:22:48 +0000
commit6b088ed76a7c4b895c4bff4113e291bd53209273 (patch)
tree9d7f7737f6cff97984ee0cc83930843bac02d655
parentcfad4f46b2d53f67722f7b8046b673d60a9cefcd (diff)
parent599f8bb152ca0669d17baa5f313f00f0791209b6 (diff)
downloadpleroma-6b088ed76a7c4b895c4bff4113e291bd53209273.tar.gz
Merge branch 'issue/2089' into 'develop'
[#2089] fix notifications See merge request pleroma/pleroma!3000
-rw-r--r--lib/mix/tasks/pleroma/database.ex4
-rw-r--r--lib/mix/tasks/pleroma/user.ex4
-rw-r--r--lib/pleroma/migration_helper/notification_backfill.ex15
-rw-r--r--lib/pleroma/repo.ex23
-rw-r--r--lib/pleroma/repo_streamer.ex34
-rw-r--r--lib/pleroma/user.ex3
-rw-r--r--priv/repo/migrations/20200914105638_delete_notification_without_activity.exs30
-rw-r--r--priv/repo/migrations/20200914105800_add_notification_constraints.exs23
-rw-r--r--test/marker_test.exs4
-rw-r--r--test/repo_test.exs32
-rw-r--r--test/web/mastodon_api/controllers/account_controller_test.exs5
-rw-r--r--test/web/mastodon_api/controllers/marker_controller_test.exs2
-rw-r--r--test/web/mastodon_api/views/account_view_test.exs2
13 files changed, 125 insertions, 56 deletions
diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex
index 7f1108dcf..a01c36ece 100644
--- a/lib/mix/tasks/pleroma/database.ex
+++ b/lib/mix/tasks/pleroma/database.ex
@@ -99,7 +99,7 @@ defmodule Mix.Tasks.Pleroma.Database do
where: fragment("(?)->>'likes' is not null", object.data),
select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)}
)
- |> Pleroma.RepoStreamer.chunk_stream(100)
+ |> Pleroma.Repo.chunk_stream(100, :batches)
|> Stream.each(fn objects ->
ids =
objects
@@ -145,7 +145,7 @@ defmodule Mix.Tasks.Pleroma.Database do
|> where(local: true)
|> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
|> where([_a, o], fragment("?->>'type' = 'Note'", o.data))
- |> Pleroma.RepoStreamer.chunk_stream(100)
+ |> Pleroma.Repo.chunk_stream(100, :batches)
|> Stream.each(fn activities ->
Enum.each(activities, fn activity ->
expires_at =
diff --git a/lib/mix/tasks/pleroma/user.ex b/lib/mix/tasks/pleroma/user.ex
index 01824aa18..b20c49d89 100644
--- a/lib/mix/tasks/pleroma/user.ex
+++ b/lib/mix/tasks/pleroma/user.ex
@@ -179,7 +179,7 @@ defmodule Mix.Tasks.Pleroma.User do
start_pleroma()
Pleroma.User.Query.build(%{nickname: "@#{instance}"})
- |> Pleroma.RepoStreamer.chunk_stream(500)
+ |> Pleroma.Repo.chunk_stream(500, :batches)
|> Stream.each(fn users ->
users
|> Enum.each(fn user ->
@@ -370,7 +370,7 @@ defmodule Mix.Tasks.Pleroma.User do
start_pleroma()
Pleroma.User.Query.build(%{local: true})
- |> Pleroma.RepoStreamer.chunk_stream(500)
+ |> Pleroma.Repo.chunk_stream(500, :batches)
|> Stream.each(fn users ->
users
|> Enum.each(fn user ->
diff --git a/lib/pleroma/migration_helper/notification_backfill.ex b/lib/pleroma/migration_helper/notification_backfill.ex
index d260e62ca..24f4733fe 100644
--- a/lib/pleroma/migration_helper/notification_backfill.ex
+++ b/lib/pleroma/migration_helper/notification_backfill.ex
@@ -19,13 +19,13 @@ defmodule Pleroma.MigrationHelper.NotificationBackfill do
query
|> Repo.chunk_stream(100)
|> Enum.each(fn notification ->
- type =
- notification.activity
- |> type_from_activity()
+ if notification.activity do
+ type = type_from_activity(notification.activity)
- notification
- |> Ecto.Changeset.change(%{type: type})
- |> Repo.update()
+ notification
+ |> Ecto.Changeset.change(%{type: type})
+ |> Repo.update()
+ end
end)
end
@@ -72,8 +72,7 @@ defmodule Pleroma.MigrationHelper.NotificationBackfill do
"pleroma:emoji_reaction"
"Create" ->
- activity
- |> type_from_activity_object()
+ type_from_activity_object(activity)
t ->
raise "No notification type for activity type #{t}"
diff --git a/lib/pleroma/repo.ex b/lib/pleroma/repo.ex
index f317e4d58..4524bd5e2 100644
--- a/lib/pleroma/repo.ex
+++ b/lib/pleroma/repo.ex
@@ -49,7 +49,21 @@ defmodule Pleroma.Repo do
end
end
- def chunk_stream(query, chunk_size) do
+ @doc """
+ Returns a lazy enumerable that emits all entries from the data store matching the given query.
+
+ `returns_as` use to group records. use the `batches` option to fetch records in bulk.
+
+ ## Examples
+
+ # fetch records one-by-one
+ iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500)
+
+ # fetch records in bulk
+ iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches)
+ """
+ @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t()
+ def chunk_stream(query, chunk_size, returns_as \\ :one) do
# We don't actually need start and end funcitons of resource streaming,
# but it seems to be the only way to not fetch records one-by-one and
# have individual records be the elements of the stream, instead of
@@ -69,7 +83,12 @@ defmodule Pleroma.Repo do
records ->
last_id = List.last(records).id
- {records, last_id}
+
+ if returns_as == :one do
+ {records, last_id}
+ else
+ {[records], last_id}
+ end
end
end,
fn _ -> :ok end
diff --git a/lib/pleroma/repo_streamer.ex b/lib/pleroma/repo_streamer.ex
deleted file mode 100644
index cb4d7bb7a..000000000
--- a/lib/pleroma/repo_streamer.ex
+++ /dev/null
@@ -1,34 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.RepoStreamer do
- alias Pleroma.Repo
- import Ecto.Query
-
- def chunk_stream(query, chunk_size) do
- Stream.unfold(0, fn
- :halt ->
- {[], :halt}
-
- last_id ->
- query
- |> order_by(asc: :id)
- |> where([r], r.id > ^last_id)
- |> limit(^chunk_size)
- |> Repo.all()
- |> case do
- [] ->
- {[], :halt}
-
- records ->
- last_id = List.last(records).id
- {records, last_id}
- end
- end)
- |> Stream.take_while(fn
- [] -> false
- _ -> true
- end)
- end
-end
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index e73d19964..57497eb83 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -25,7 +25,6 @@ defmodule Pleroma.User do
alias Pleroma.Object
alias Pleroma.Registration
alias Pleroma.Repo
- alias Pleroma.RepoStreamer
alias Pleroma.User
alias Pleroma.UserRelationship
alias Pleroma.Web
@@ -1775,7 +1774,7 @@ defmodule Pleroma.User do
def delete_user_activities(%User{ap_id: ap_id} = user) do
ap_id
|> Activity.Queries.by_actor()
- |> RepoStreamer.chunk_stream(50)
+ |> Repo.chunk_stream(50, :batches)
|> Stream.each(fn activities ->
Enum.each(activities, fn activity -> delete_activity(activity, user) end)
end)
diff --git a/priv/repo/migrations/20200914105638_delete_notification_without_activity.exs b/priv/repo/migrations/20200914105638_delete_notification_without_activity.exs
new file mode 100644
index 000000000..9333fc5a1
--- /dev/null
+++ b/priv/repo/migrations/20200914105638_delete_notification_without_activity.exs
@@ -0,0 +1,30 @@
+defmodule Pleroma.Repo.Migrations.DeleteNotificationWithoutActivity do
+ use Ecto.Migration
+
+ import Ecto.Query
+ alias Pleroma.Repo
+
+ def up do
+ from(
+ q in Pleroma.Notification,
+ left_join: c in assoc(q, :activity),
+ select: %{id: type(q.id, :integer)},
+ where: is_nil(c.id)
+ )
+ |> Repo.chunk_stream(1_000, :batches)
+ |> Stream.each(fn records ->
+ notification_ids = Enum.map(records, fn %{id: id} -> id end)
+
+ Repo.delete_all(
+ from(n in "notifications",
+ where: n.id in ^notification_ids
+ )
+ )
+ end)
+ |> Stream.run()
+ end
+
+ def down do
+ :ok
+ end
+end
diff --git a/priv/repo/migrations/20200914105800_add_notification_constraints.exs b/priv/repo/migrations/20200914105800_add_notification_constraints.exs
new file mode 100644
index 000000000..a65c35fd0
--- /dev/null
+++ b/priv/repo/migrations/20200914105800_add_notification_constraints.exs
@@ -0,0 +1,23 @@
+defmodule Pleroma.Repo.Migrations.AddNotificationConstraints do
+ use Ecto.Migration
+
+ def up do
+ drop(constraint(:notifications, "notifications_activity_id_fkey"))
+
+ alter table(:notifications) do
+ modify(:activity_id, references(:activities, type: :uuid, on_delete: :delete_all),
+ null: false
+ )
+ end
+ end
+
+ def down do
+ drop(constraint(:notifications, "notifications_activity_id_fkey"))
+
+ alter table(:notifications) do
+ modify(:activity_id, references(:activities, type: :uuid, on_delete: :delete_all),
+ null: true
+ )
+ end
+ end
+end
diff --git a/test/marker_test.exs b/test/marker_test.exs
index 5b6d0b4a4..7b3943c7b 100644
--- a/test/marker_test.exs
+++ b/test/marker_test.exs
@@ -33,8 +33,8 @@ defmodule Pleroma.MarkerTest do
test "returns user markers" do
user = insert(:user)
marker = insert(:marker, user: user)
- insert(:notification, user: user)
- insert(:notification, user: user)
+ insert(:notification, user: user, activity: insert(:note_activity))
+ insert(:notification, user: user, activity: insert(:note_activity))
insert(:marker, timeline: "home", user: user)
assert Marker.get_markers(
diff --git a/test/repo_test.exs b/test/repo_test.exs
index 92e827c95..155791be2 100644
--- a/test/repo_test.exs
+++ b/test/repo_test.exs
@@ -37,7 +37,9 @@ defmodule Pleroma.RepoTest do
test "get one-to-many assoc from repo" do
user = insert(:user)
- notification = refresh_record(insert(:notification, user: user))
+
+ notification =
+ refresh_record(insert(:notification, user: user, activity: insert(:note_activity)))
assert Repo.get_assoc(user, :notifications) == {:ok, [notification]}
end
@@ -47,4 +49,32 @@ defmodule Pleroma.RepoTest do
assert Repo.get_assoc(token, :user) == {:error, :not_found}
end
end
+
+ describe "chunk_stream/3" do
+ test "fetch records one-by-one" do
+ users = insert_list(50, :user)
+
+ {fetch_users, 50} =
+ from(t in User)
+ |> Repo.chunk_stream(5)
+ |> Enum.reduce({[], 0}, fn %User{} = user, {acc, count} ->
+ {acc ++ [user], count + 1}
+ end)
+
+ assert users == fetch_users
+ end
+
+ test "fetch records in bulk" do
+ users = insert_list(50, :user)
+
+ {fetch_users, 10} =
+ from(t in User)
+ |> Repo.chunk_stream(5, :batches)
+ |> Enum.reduce({[], 0}, fn users, {acc, count} ->
+ {acc ++ users, count + 1}
+ end)
+
+ assert users == fetch_users
+ end
+ end
end
diff --git a/test/web/mastodon_api/controllers/account_controller_test.exs b/test/web/mastodon_api/controllers/account_controller_test.exs
index 17a1e7d66..f7f1369e4 100644
--- a/test/web/mastodon_api/controllers/account_controller_test.exs
+++ b/test/web/mastodon_api/controllers/account_controller_test.exs
@@ -1442,7 +1442,10 @@ defmodule Pleroma.Web.MastodonAPI.AccountControllerTest do
describe "verify_credentials" do
test "verify_credentials" do
%{user: user, conn: conn} = oauth_access(["read:accounts"])
- [notification | _] = insert_list(7, :notification, user: user)
+
+ [notification | _] =
+ insert_list(7, :notification, user: user, activity: insert(:note_activity))
+
Pleroma.Notification.set_read_up_to(user, notification.id)
conn = get(conn, "/api/v1/accounts/verify_credentials")
diff --git a/test/web/mastodon_api/controllers/marker_controller_test.exs b/test/web/mastodon_api/controllers/marker_controller_test.exs
index 6dd40fb4a..9f0481120 100644
--- a/test/web/mastodon_api/controllers/marker_controller_test.exs
+++ b/test/web/mastodon_api/controllers/marker_controller_test.exs
@@ -11,7 +11,7 @@ defmodule Pleroma.Web.MastodonAPI.MarkerControllerTest do
test "gets markers with correct scopes", %{conn: conn} do
user = insert(:user)
token = insert(:oauth_token, user: user, scopes: ["read:statuses"])
- insert_list(7, :notification, user: user)
+ insert_list(7, :notification, user: user, activity: insert(:note_activity))
{:ok, %{"notifications" => marker}} =
Pleroma.Marker.upsert(
diff --git a/test/web/mastodon_api/views/account_view_test.exs b/test/web/mastodon_api/views/account_view_test.exs
index 9f22f9dcf..c5f491d6b 100644
--- a/test/web/mastodon_api/views/account_view_test.exs
+++ b/test/web/mastodon_api/views/account_view_test.exs
@@ -448,7 +448,7 @@ defmodule Pleroma.Web.MastodonAPI.AccountViewTest do
test "shows unread_count only to the account owner" do
user = insert(:user)
- insert_list(7, :notification, user: user)
+ insert_list(7, :notification, user: user, activity: insert(:note_activity))
other_user = insert(:user)
user = User.get_cached_by_ap_id(user.ap_id)