aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlain <lain@soykaf.club>2020-05-29 15:24:41 +0200
committerlain <lain@soykaf.club>2020-05-29 15:24:41 +0200
commitc86a88edec75223f650faa2bb442c09aa95ad694 (patch)
tree23105518be633fc4c1e2d1ea3071da1f532e3111
parentaf6d01ec93a07cd896bc4f0a2c2cf437c6fd51fc (diff)
downloadpleroma-c86a88edec75223f650faa2bb442c09aa95ad694.tar.gz
Streamer: Add a chat message stream.
-rw-r--r--lib/pleroma/web/streamer/streamer.ex23
-rw-r--r--lib/pleroma/web/views/streamer_view.ex19
-rw-r--r--test/web/streamer/streamer_test.exs24
3 files changed, 65 insertions, 1 deletions
diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex
index 49a400df7..331490a78 100644
--- a/lib/pleroma/web/streamer/streamer.ex
+++ b/lib/pleroma/web/streamer/streamer.ex
@@ -10,6 +10,7 @@ defmodule Pleroma.Web.Streamer do
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.Object
+ alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility
@@ -22,7 +23,7 @@ defmodule Pleroma.Web.Streamer do
def registry, do: @registry
@public_streams ["public", "public:local", "public:media", "public:local:media"]
- @user_streams ["user", "user:notification", "direct"]
+ @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
@doc "Expands and authorizes a stream, and registers the process for streaming."
@spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) ::
@@ -200,6 +201,26 @@ defmodule Pleroma.Web.Streamer do
end)
end
+ defp do_stream(topic, %{data: %{"type" => "ChatMessage"}} = object)
+ when topic in ["user", "user:pleroma_chat"] do
+ recipients = [object.data["actor"] | object.data["to"]]
+
+ topics =
+ %{ap_id: recipients, local: true}
+ |> Pleroma.User.Query.build()
+ |> Repo.all()
+ |> Enum.map(fn %{id: id} = user -> {user, "#{topic}:#{id}"} end)
+
+ Enum.each(topics, fn {user, topic} ->
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ text = StreamerView.render("chat_update.json", object, user, recipients)
+ send(pid, {:text, text})
+ end)
+ end)
+ end)
+ end
+
defp do_stream("user", item) do
Logger.debug("Trying to push to users")
diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex
index 237b29ded..949e2ed37 100644
--- a/lib/pleroma/web/views/streamer_view.ex
+++ b/lib/pleroma/web/views/streamer_view.ex
@@ -6,11 +6,30 @@ defmodule Pleroma.Web.StreamerView do
use Pleroma.Web, :view
alias Pleroma.Activity
+ alias Pleroma.Chat
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.User
alias Pleroma.Web.MastodonAPI.NotificationView
+ def render("chat_update.json", object, user, recipients) do
+ chat = Chat.get(user.id, hd(recipients -- [user.ap_id]))
+
+ representation =
+ Pleroma.Web.PleromaAPI.ChatMessageView.render(
+ "show.json",
+ %{object: object, chat: chat}
+ )
+
+ %{
+ event: "pleroma:chat_update",
+ payload:
+ representation
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
def render("update.json", %Activity{} = activity, %User{} = user) do
%{
event: "update",
diff --git a/test/web/streamer/streamer_test.exs b/test/web/streamer/streamer_test.exs
index 115ba4703..ffbff35ca 100644
--- a/test/web/streamer/streamer_test.exs
+++ b/test/web/streamer/streamer_test.exs
@@ -9,9 +9,11 @@ defmodule Pleroma.Web.StreamerTest do
alias Pleroma.Conversation.Participation
alias Pleroma.List
+ alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Streamer
+ alias Pleroma.Web.StreamerView
@moduletag needs_streamer: true, capture_log: true
@@ -126,6 +128,28 @@ defmodule Pleroma.Web.StreamerTest do
refute Streamer.filtered_by_user?(user, notify)
end
+ test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do
+ other_user = insert(:user)
+
+ {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
+ object = Object.normalize(create_activity, false)
+ Streamer.get_topic_and_add_socket("user:pleroma_chat", user)
+ Streamer.stream("user:pleroma_chat", object)
+ text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id])
+ assert_receive {:text, ^text}
+ end
+
+ test "it sends chat messages to the 'user' stream", %{user: user} do
+ other_user = insert(:user)
+
+ {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
+ object = Object.normalize(create_activity, false)
+ Streamer.get_topic_and_add_socket("user", user)
+ Streamer.stream("user", object)
+ text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id])
+ assert_receive {:text, ^text}
+ end
+
test "it sends chat message notifications to the 'user:notification' stream", %{user: user} do
other_user = insert(:user)