aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/pleroma/web/activity_pub/side_effects.ex8
-rw-r--r--lib/pleroma/web/streamer/streamer.ex25
-rw-r--r--lib/pleroma/web/views/streamer_view.ex46
-rw-r--r--test/web/activity_pub/side_effects_test.exs5
-rw-r--r--test/web/streamer/streamer_test.exs28
5 files changed, 60 insertions, 52 deletions
diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex
index 884d399d0..0c5709356 100644
--- a/lib/pleroma/web/activity_pub/side_effects.ex
+++ b/lib/pleroma/web/activity_pub/side_effects.ex
@@ -140,11 +140,15 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
|> Enum.each(fn [user, other_user] ->
if user.local do
{:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
- ChatMessageReference.create(chat, object, user.ap_id == actor.ap_id)
+ {:ok, cm_ref} = ChatMessageReference.create(chat, object, user.ap_id == actor.ap_id)
+
+ Streamer.stream(
+ ["user", "user:pleroma_chat"],
+ {user, %{cm_ref | chat: chat, object: object}}
+ )
end
end)
- Streamer.stream(["user", "user:pleroma_chat"], object)
{:ok, object, meta}
end
end
diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex
index 2201cbfef..5e37e2cf2 100644
--- a/lib/pleroma/web/streamer/streamer.ex
+++ b/lib/pleroma/web/streamer/streamer.ex
@@ -6,11 +6,11 @@ defmodule Pleroma.Web.Streamer do
require Logger
alias Pleroma.Activity
+ alias Pleroma.ChatMessageReference
alias Pleroma.Config
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.Object
- alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility
@@ -201,22 +201,15 @@ defmodule Pleroma.Web.Streamer do
end)
end
- defp do_stream(topic, %{data: %{"type" => "ChatMessage"}} = object)
+ defp do_stream(topic, {user, %ChatMessageReference{} = cm_ref})
when topic in ["user", "user:pleroma_chat"] do
- recipients = [object.data["actor"] | object.data["to"]]
-
- topics =
- %{ap_id: recipients, local: true}
- |> Pleroma.User.Query.build()
- |> Repo.all()
- |> Enum.map(fn %{id: id} = user -> {user, "#{topic}:#{id}"} end)
-
- Enum.each(topics, fn {user, topic} ->
- Registry.dispatch(@registry, topic, fn list ->
- Enum.each(list, fn {pid, _auth} ->
- text = StreamerView.render("chat_update.json", object, user, recipients)
- send(pid, {:text, text})
- end)
+ topic = "#{topic}:#{user.id}"
+
+ text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ send(pid, {:text, text})
end)
end)
end
diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex
index 616e0c4f2..a6efd0109 100644
--- a/lib/pleroma/web/views/streamer_view.ex
+++ b/lib/pleroma/web/views/streamer_view.ex
@@ -6,36 +6,11 @@ defmodule Pleroma.Web.StreamerView do
use Pleroma.Web, :view
alias Pleroma.Activity
- alias Pleroma.Chat
- alias Pleroma.ChatMessageReference
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.User
alias Pleroma.Web.MastodonAPI.NotificationView
- def render("chat_update.json", object, user, recipients) do
- chat = Chat.get(user.id, hd(recipients -- [user.ap_id]))
-
- # Explicitly giving the cmr for the object here, so we don't accidentally
- # send a later 'last_message' that was inserted between inserting this and
- # streaming it out
- cm_ref = ChatMessageReference.for_chat_and_object(chat, object)
-
- representation =
- Pleroma.Web.PleromaAPI.ChatView.render(
- "show.json",
- %{last_message: cm_ref, chat: chat}
- )
-
- %{
- event: "pleroma:chat_update",
- payload:
- representation
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
def render("update.json", %Activity{} = activity, %User{} = user) do
%{
event: "update",
@@ -76,6 +51,27 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
+ def render("chat_update.json", %{chat_message_reference: cm_ref}) do
+ # Explicitly giving the cmr for the object here, so we don't accidentally
+ # send a later 'last_message' that was inserted between inserting this and
+ # streaming it out
+ Logger.debug("Trying to stream out #{inspect(cm_ref)}")
+
+ representation =
+ Pleroma.Web.PleromaAPI.ChatView.render(
+ "show.json",
+ %{last_message: cm_ref, chat: cm_ref.chat}
+ )
+
+ %{
+ event: "pleroma:chat_update",
+ payload:
+ representation
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
def render("conversation.json", %Participation{} = participation) do
%{
event: "conversation",
diff --git a/test/web/activity_pub/side_effects_test.exs b/test/web/activity_pub/side_effects_test.exs
index f2fa062b4..92c266d84 100644
--- a/test/web/activity_pub/side_effects_test.exs
+++ b/test/web/activity_pub/side_effects_test.exs
@@ -325,9 +325,8 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
{:ok, _create_activity, _meta} =
SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
- object = Object.normalize(create_activity, false)
-
- assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], object))
+ assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {author, :_}))
+ assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {recipient, :_}))
end
end
diff --git a/test/web/streamer/streamer_test.exs b/test/web/streamer/streamer_test.exs
index bcb05a02d..893ae5449 100644
--- a/test/web/streamer/streamer_test.exs
+++ b/test/web/streamer/streamer_test.exs
@@ -7,6 +7,8 @@ defmodule Pleroma.Web.StreamerTest do
import Pleroma.Factory
+ alias Pleroma.Chat
+ alias Pleroma.ChatMessageReference
alias Pleroma.Conversation.Participation
alias Pleroma.List
alias Pleroma.Object
@@ -150,22 +152,36 @@ defmodule Pleroma.Web.StreamerTest do
test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do
other_user = insert(:user)
- {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
+ {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
object = Object.normalize(create_activity, false)
+ chat = Chat.get(user.id, other_user.ap_id)
+ cm_ref = ChatMessageReference.for_chat_and_object(chat, object)
+ cm_ref = %{cm_ref | chat: chat, object: object}
+
Streamer.get_topic_and_add_socket("user:pleroma_chat", user)
- Streamer.stream("user:pleroma_chat", object)
- text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id])
+ Streamer.stream("user:pleroma_chat", {user, cm_ref})
+
+ text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+
+ assert text =~ "hey cirno"
assert_receive {:text, ^text}
end
test "it sends chat messages to the 'user' stream", %{user: user} do
other_user = insert(:user)
- {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
+ {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
object = Object.normalize(create_activity, false)
+ chat = Chat.get(user.id, other_user.ap_id)
+ cm_ref = ChatMessageReference.for_chat_and_object(chat, object)
+ cm_ref = %{cm_ref | chat: chat, object: object}
+
Streamer.get_topic_and_add_socket("user", user)
- Streamer.stream("user", object)
- text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id])
+ Streamer.stream("user", {user, cm_ref})
+
+ text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+
+ assert text =~ "hey cirno"
assert_receive {:text, ^text}
end