diff options
author | Egor Kislitsyn <egor@kislitsyn.com> | 2020-12-02 00:17:52 +0400 |
---|---|---|
committer | Egor Kislitsyn <egor@kislitsyn.com> | 2020-12-02 00:18:58 +0400 |
commit | 35ba48494f5129d3a0010b045ff36d98e7e7984f (patch) | |
tree | 6e03b23681837c364b94e2f8c09b2f31065819b6 /lib | |
parent | 57d0379b89bd323dacf4959b03c55519de173ec0 (diff) | |
download | pleroma-35ba48494f5129d3a0010b045ff36d98e7e7984f.tar.gz |
Stream follow updates
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pleroma/following_relationship.ex | 42 | ||||
-rw-r--r-- | lib/pleroma/user.ex | 12 | ||||
-rw-r--r-- | lib/pleroma/user/import.ex | 2 | ||||
-rw-r--r-- | lib/pleroma/web/activity_pub/side_effects.ex | 7 | ||||
-rw-r--r-- | lib/pleroma/web/streamer.ex | 45 | ||||
-rw-r--r-- | lib/pleroma/web/views/streamer_view.ex | 22 |
6 files changed, 86 insertions, 44 deletions
diff --git a/lib/pleroma/following_relationship.ex b/lib/pleroma/following_relationship.ex index 2039a259d..bc6a7eaf9 100644 --- a/lib/pleroma/following_relationship.ex +++ b/lib/pleroma/following_relationship.ex @@ -62,23 +62,47 @@ defmodule Pleroma.FollowingRelationship do follow(follower, following, state) following_relationship -> - following_relationship - |> cast(%{state: state}, [:state]) - |> validate_required([:state]) - |> Repo.update() + with {:ok, _following_relationship} <- + following_relationship + |> cast(%{state: state}, [:state]) + |> validate_required([:state]) + |> Repo.update() do + after_update(state, follower, following) + end end end def follow(%User{} = follower, %User{} = following, state \\ :follow_accept) do - %__MODULE__{} - |> changeset(%{follower: follower, following: following, state: state}) - |> Repo.insert(on_conflict: :nothing) + with {:ok, _following_relationship} <- + %__MODULE__{} + |> changeset(%{follower: follower, following: following, state: state}) + |> Repo.insert(on_conflict: :nothing) do + after_update(state, follower, following) + end end def unfollow(%User{} = follower, %User{} = following) do case get(follower, following) do - %__MODULE__{} = following_relationship -> Repo.delete(following_relationship) - _ -> {:ok, nil} + %__MODULE__{} = following_relationship -> + with {:ok, _following_relationship} <- Repo.delete(following_relationship) do + after_update(:unfollow, follower, following) + end + + _ -> + {:ok, nil} + end + end + + defp after_update(state, %User{} = follower, %User{} = following) do + with {:ok, following} <- User.update_follower_count(following), + {:ok, follower} <- User.update_following_count(follower) do + Pleroma.Web.Streamer.stream("relationships:update", %{ + state: state, + following: following, + follower: follower + }) + + {:ok, follower, following} end end diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index bcd5256c8..676483540 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -882,7 +882,7 @@ defmodule Pleroma.User do if not ap_enabled?(followed) do follow(follower, followed) else - {:ok, follower} + {:ok, follower, followed} end end @@ -908,11 +908,6 @@ defmodule Pleroma.User do true -> FollowingRelationship.follow(follower, followed, state) - - {:ok, _} = update_follower_count(followed) - - follower - |> update_following_count() end end @@ -936,11 +931,6 @@ defmodule Pleroma.User do case get_follow_state(follower, followed) do state when state in [:follow_pending, :follow_accept] -> FollowingRelationship.unfollow(follower, followed) - {:ok, followed} = update_follower_count(followed) - - {:ok, follower} = update_following_count(follower) - - {:ok, follower, followed} nil -> {:error, "Not subscribed!"} diff --git a/lib/pleroma/user/import.ex b/lib/pleroma/user/import.ex index e458021c8..86b49d8ae 100644 --- a/lib/pleroma/user/import.ex +++ b/lib/pleroma/user/import.ex @@ -45,7 +45,7 @@ defmodule Pleroma.User.Import do identifiers, fn identifier -> with {:ok, %User{} = followed} <- User.get_or_fetch(identifier), - {:ok, follower} <- User.maybe_direct_follow(follower, followed), + {:ok, follower, followed} <- User.maybe_direct_follow(follower, followed), {:ok, _, _, _} <- CommonAPI.follow(follower, followed) do followed else diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex index 4d8fb721e..8556fca1d 100644 --- a/lib/pleroma/web/activity_pub/side_effects.ex +++ b/lib/pleroma/web/activity_pub/side_effects.ex @@ -47,10 +47,9 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do %User{} = followed <- User.get_cached_by_ap_id(actor), %User{} = follower <- User.get_cached_by_ap_id(follower_id), {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"), - {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do + {:ok, _follower, followed} <- + FollowingRelationship.update(follower, followed, :follow_accept) do Notification.update_notification_type(followed, follow_activity) - User.update_follower_count(followed) - User.update_following_count(follower) end {:ok, object, meta} @@ -99,7 +98,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do ) do with %User{} = follower <- User.get_cached_by_ap_id(following_user), %User{} = followed <- User.get_cached_by_ap_id(followed_user), - {_, {:ok, _}, _, _} <- + {_, {:ok, _, _}, _, _} <- {:following, User.follow(follower, followed, :follow_pending), follower, followed} do if followed.local && !followed.is_locked do {:ok, accept_data, _} = Builder.accept(followed, object) diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index 71fe27c89..0b6cc89e9 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -36,9 +36,8 @@ defmodule Pleroma.Web.Streamer do ) :: {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do - case get_topic(stream, user, oauth_token, params) do - {:ok, topic} -> add_socket(topic, user) - error -> error + with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do + add_socket(topic, user) end end @@ -70,10 +69,10 @@ defmodule Pleroma.Web.Streamer do def get_topic( stream, %User{id: user_id} = user, - %Token{user_id: token_user_id} = oauth_token, + %Token{user_id: user_id} = oauth_token, _params ) - when stream in @user_streams and user_id == token_user_id do + when stream in @user_streams do # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope) required_scopes = if stream == "user:notification" do @@ -97,10 +96,9 @@ defmodule Pleroma.Web.Streamer do def get_topic( "list", %User{id: user_id} = user, - %Token{user_id: token_user_id} = oauth_token, + %Token{user_id: user_id} = oauth_token, %{"list" => id} - ) - when user_id == token_user_id do + ) do cond do OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] -> {:error, :unauthorized} @@ -137,16 +135,10 @@ defmodule Pleroma.Web.Streamer do def stream(topics, items) do if should_env_send?() do - List.wrap(topics) - |> Enum.each(fn topic -> - List.wrap(items) - |> Enum.each(fn item -> - spawn(fn -> do_stream(topic, item) end) - end) - end) + for topic <- List.wrap(topics), item <- List.wrap(items) do + spawn(fn -> do_stream(topic, item) end) + end end - - :ok end def filtered_by_user?(user, item, streamed_type \\ :activity) @@ -160,8 +152,7 @@ defmodule Pleroma.Web.Streamer do domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks) with parent <- Object.normalize(item) || item, - true <- - Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)), + true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)), true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids, true <- !(streamed_type == :activity && item.data["type"] == "Announce" && @@ -195,6 +186,22 @@ defmodule Pleroma.Web.Streamer do end) end + defp do_stream("relationships:update", item) do + text = StreamerView.render("relationships_update.json", item) + + [item.follower, item.following] + |> Enum.map(fn %{id: id} -> "user:#{id}" end) + |> Enum.each(fn user_topic -> + Logger.debug("Trying to push relationships:update to #{user_topic}\n\n") + + Registry.dispatch(@registry, user_topic, fn list -> + Enum.each(list, fn {pid, _auth} -> + send(pid, {:text, text}) + end) + end) + end) + end + defp do_stream("participation", participation) do user_topic = "direct:#{participation.user_id}" Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex index 476a33245..92239a411 100644 --- a/lib/pleroma/web/views/streamer_view.ex +++ b/lib/pleroma/web/views/streamer_view.ex @@ -74,6 +74,28 @@ defmodule Pleroma.Web.StreamerView do |> Jason.encode!() end + def render("relationships_update.json", item) do + %{ + event: "pleroma:relationships_update", + payload: + %{ + state: item.state, + follower: %{ + id: item.follower.id, + follower_count: item.follower.follower_count, + following_count: item.follower.following_count + }, + following: %{ + id: item.following.id, + follower_count: item.following.follower_count, + following_count: item.following.following_count + } + } + |> Jason.encode!() + } + |> Jason.encode!() + end + def render("conversation.json", %Participation{} = participation) do %{ event: "conversation", |