aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorEgor Kislitsyn <egor@kislitsyn.com>2020-12-02 00:17:52 +0400
committerEgor Kislitsyn <egor@kislitsyn.com>2020-12-02 00:18:58 +0400
commit35ba48494f5129d3a0010b045ff36d98e7e7984f (patch)
tree6e03b23681837c364b94e2f8c09b2f31065819b6 /lib
parent57d0379b89bd323dacf4959b03c55519de173ec0 (diff)
downloadpleroma-35ba48494f5129d3a0010b045ff36d98e7e7984f.tar.gz
Stream follow updates
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/following_relationship.ex42
-rw-r--r--lib/pleroma/user.ex12
-rw-r--r--lib/pleroma/user/import.ex2
-rw-r--r--lib/pleroma/web/activity_pub/side_effects.ex7
-rw-r--r--lib/pleroma/web/streamer.ex45
-rw-r--r--lib/pleroma/web/views/streamer_view.ex22
6 files changed, 86 insertions, 44 deletions
diff --git a/lib/pleroma/following_relationship.ex b/lib/pleroma/following_relationship.ex
index 2039a259d..bc6a7eaf9 100644
--- a/lib/pleroma/following_relationship.ex
+++ b/lib/pleroma/following_relationship.ex
@@ -62,23 +62,47 @@ defmodule Pleroma.FollowingRelationship do
follow(follower, following, state)
following_relationship ->
- following_relationship
- |> cast(%{state: state}, [:state])
- |> validate_required([:state])
- |> Repo.update()
+ with {:ok, _following_relationship} <-
+ following_relationship
+ |> cast(%{state: state}, [:state])
+ |> validate_required([:state])
+ |> Repo.update() do
+ after_update(state, follower, following)
+ end
end
end
def follow(%User{} = follower, %User{} = following, state \\ :follow_accept) do
- %__MODULE__{}
- |> changeset(%{follower: follower, following: following, state: state})
- |> Repo.insert(on_conflict: :nothing)
+ with {:ok, _following_relationship} <-
+ %__MODULE__{}
+ |> changeset(%{follower: follower, following: following, state: state})
+ |> Repo.insert(on_conflict: :nothing) do
+ after_update(state, follower, following)
+ end
end
def unfollow(%User{} = follower, %User{} = following) do
case get(follower, following) do
- %__MODULE__{} = following_relationship -> Repo.delete(following_relationship)
- _ -> {:ok, nil}
+ %__MODULE__{} = following_relationship ->
+ with {:ok, _following_relationship} <- Repo.delete(following_relationship) do
+ after_update(:unfollow, follower, following)
+ end
+
+ _ ->
+ {:ok, nil}
+ end
+ end
+
+ defp after_update(state, %User{} = follower, %User{} = following) do
+ with {:ok, following} <- User.update_follower_count(following),
+ {:ok, follower} <- User.update_following_count(follower) do
+ Pleroma.Web.Streamer.stream("relationships:update", %{
+ state: state,
+ following: following,
+ follower: follower
+ })
+
+ {:ok, follower, following}
end
end
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index bcd5256c8..676483540 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -882,7 +882,7 @@ defmodule Pleroma.User do
if not ap_enabled?(followed) do
follow(follower, followed)
else
- {:ok, follower}
+ {:ok, follower, followed}
end
end
@@ -908,11 +908,6 @@ defmodule Pleroma.User do
true ->
FollowingRelationship.follow(follower, followed, state)
-
- {:ok, _} = update_follower_count(followed)
-
- follower
- |> update_following_count()
end
end
@@ -936,11 +931,6 @@ defmodule Pleroma.User do
case get_follow_state(follower, followed) do
state when state in [:follow_pending, :follow_accept] ->
FollowingRelationship.unfollow(follower, followed)
- {:ok, followed} = update_follower_count(followed)
-
- {:ok, follower} = update_following_count(follower)
-
- {:ok, follower, followed}
nil ->
{:error, "Not subscribed!"}
diff --git a/lib/pleroma/user/import.ex b/lib/pleroma/user/import.ex
index e458021c8..86b49d8ae 100644
--- a/lib/pleroma/user/import.ex
+++ b/lib/pleroma/user/import.ex
@@ -45,7 +45,7 @@ defmodule Pleroma.User.Import do
identifiers,
fn identifier ->
with {:ok, %User{} = followed} <- User.get_or_fetch(identifier),
- {:ok, follower} <- User.maybe_direct_follow(follower, followed),
+ {:ok, follower, followed} <- User.maybe_direct_follow(follower, followed),
{:ok, _, _, _} <- CommonAPI.follow(follower, followed) do
followed
else
diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex
index 4d8fb721e..8556fca1d 100644
--- a/lib/pleroma/web/activity_pub/side_effects.ex
+++ b/lib/pleroma/web/activity_pub/side_effects.ex
@@ -47,10 +47,9 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
%User{} = followed <- User.get_cached_by_ap_id(actor),
%User{} = follower <- User.get_cached_by_ap_id(follower_id),
{:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
- {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
+ {:ok, _follower, followed} <-
+ FollowingRelationship.update(follower, followed, :follow_accept) do
Notification.update_notification_type(followed, follow_activity)
- User.update_follower_count(followed)
- User.update_following_count(follower)
end
{:ok, object, meta}
@@ -99,7 +98,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
) do
with %User{} = follower <- User.get_cached_by_ap_id(following_user),
%User{} = followed <- User.get_cached_by_ap_id(followed_user),
- {_, {:ok, _}, _, _} <-
+ {_, {:ok, _, _}, _, _} <-
{:following, User.follow(follower, followed, :follow_pending), follower, followed} do
if followed.local && !followed.is_locked do
{:ok, accept_data, _} = Builder.accept(followed, object)
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index 71fe27c89..0b6cc89e9 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -36,9 +36,8 @@ defmodule Pleroma.Web.Streamer do
) ::
{:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
- case get_topic(stream, user, oauth_token, params) do
- {:ok, topic} -> add_socket(topic, user)
- error -> error
+ with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
+ add_socket(topic, user)
end
end
@@ -70,10 +69,10 @@ defmodule Pleroma.Web.Streamer do
def get_topic(
stream,
%User{id: user_id} = user,
- %Token{user_id: token_user_id} = oauth_token,
+ %Token{user_id: user_id} = oauth_token,
_params
)
- when stream in @user_streams and user_id == token_user_id do
+ when stream in @user_streams do
# Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
required_scopes =
if stream == "user:notification" do
@@ -97,10 +96,9 @@ defmodule Pleroma.Web.Streamer do
def get_topic(
"list",
%User{id: user_id} = user,
- %Token{user_id: token_user_id} = oauth_token,
+ %Token{user_id: user_id} = oauth_token,
%{"list" => id}
- )
- when user_id == token_user_id do
+ ) do
cond do
OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
{:error, :unauthorized}
@@ -137,16 +135,10 @@ defmodule Pleroma.Web.Streamer do
def stream(topics, items) do
if should_env_send?() do
- List.wrap(topics)
- |> Enum.each(fn topic ->
- List.wrap(items)
- |> Enum.each(fn item ->
- spawn(fn -> do_stream(topic, item) end)
- end)
- end)
+ for topic <- List.wrap(topics), item <- List.wrap(items) do
+ spawn(fn -> do_stream(topic, item) end)
+ end
end
-
- :ok
end
def filtered_by_user?(user, item, streamed_type \\ :activity)
@@ -160,8 +152,7 @@ defmodule Pleroma.Web.Streamer do
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
with parent <- Object.normalize(item) || item,
- true <-
- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
+ true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
true <-
!(streamed_type == :activity && item.data["type"] == "Announce" &&
@@ -195,6 +186,22 @@ defmodule Pleroma.Web.Streamer do
end)
end
+ defp do_stream("relationships:update", item) do
+ text = StreamerView.render("relationships_update.json", item)
+
+ [item.follower, item.following]
+ |> Enum.map(fn %{id: id} -> "user:#{id}" end)
+ |> Enum.each(fn user_topic ->
+ Logger.debug("Trying to push relationships:update to #{user_topic}\n\n")
+
+ Registry.dispatch(@registry, user_topic, fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ send(pid, {:text, text})
+ end)
+ end)
+ end)
+ end
+
defp do_stream("participation", participation) do
user_topic = "direct:#{participation.user_id}"
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex
index 476a33245..92239a411 100644
--- a/lib/pleroma/web/views/streamer_view.ex
+++ b/lib/pleroma/web/views/streamer_view.ex
@@ -74,6 +74,28 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
+ def render("relationships_update.json", item) do
+ %{
+ event: "pleroma:relationships_update",
+ payload:
+ %{
+ state: item.state,
+ follower: %{
+ id: item.follower.id,
+ follower_count: item.follower.follower_count,
+ following_count: item.follower.following_count
+ },
+ following: %{
+ id: item.following.id,
+ follower_count: item.following.follower_count,
+ following_count: item.following.following_count
+ }
+ }
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
def render("conversation.json", %Participation{} = participation) do
%{
event: "conversation",