aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorlain <lain@soykaf.club>2019-05-03 13:39:14 +0200
committerlain <lain@soykaf.club>2019-05-03 13:39:14 +0200
commit81d1aa424d65b364ec8f2aee45247e7d95e3f255 (patch)
treef1851149c8eecf5871f4b72fb56ae89ae3b83243 /lib
parent8af55728e47f9f62d237704cd5a33fba5f946fa2 (diff)
downloadpleroma-81d1aa424d65b364ec8f2aee45247e7d95e3f255.tar.gz
Streamer: Stream out Conversations/Participations.
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/conversation.ex11
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub.ex22
-rw-r--r--lib/pleroma/web/streamer.ex29
3 files changed, 57 insertions, 5 deletions
diff --git a/lib/pleroma/conversation.ex b/lib/pleroma/conversation.ex
index e6a4ccc85..6e26c5fd4 100644
--- a/lib/pleroma/conversation.ex
+++ b/lib/pleroma/conversation.ex
@@ -63,10 +63,13 @@ defmodule Pleroma.Conversation do
participation
end)
- %{
- conversation
- | participations: participations
- }
+ {:ok,
+ %{
+ conversation
+ | participations: participations
+ }}
+ else
+ e -> {:error, e}
end
end
end
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 28754e864..6c737d0a4 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -142,8 +142,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end)
Notification.create_notifications(activity)
- Conversation.create_or_bump_for(activity)
+
+ participations =
+ activity
+ |> Conversation.create_or_bump_for()
+ |> get_participations()
+
stream_out(activity)
+ stream_out_participations(participations)
{:ok, activity}
else
%Activity{} = activity ->
@@ -166,6 +172,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end
end
+ defp get_participations({:ok, %{participations: participations}}), do: participations
+ defp get_participations(_), do: []
+
+ def stream_out_participations(participations) do
+ participations =
+ participations
+ |> Repo.preload(:user)
+
+ Enum.each(participations, fn participation ->
+ Pleroma.Web.Streamer.stream("participation", participation)
+ end)
+ end
+
def stream_out(activity) do
public = "https://www.w3.org/ns/activitystreams#Public"
@@ -197,6 +216,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end
end
else
+ # TODO: Write test, replace with visibility test
if !Enum.member?(activity.data["cc"] || [], public) &&
!Enum.member?(
activity.data["to"],
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index 72eaf2084..b8f6663a1 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -5,6 +5,7 @@
defmodule Pleroma.Web.Streamer do
use GenServer
require Logger
+ alias Pleroma.Conversation.Participation
alias Pleroma.Activity
alias Pleroma.Notification
alias Pleroma.Object
@@ -71,6 +72,15 @@ defmodule Pleroma.Web.Streamer do
{:noreply, topics}
end
+ def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
+ user_topic = "direct:#{participation.user_id}"
+ Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
+
+ push_to_socket(topics, user_topic, participation)
+
+ {:noreply, topics}
+ end
+
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
# filter the recipient list if the activity is not public, see #270.
recipient_lists =
@@ -192,6 +202,19 @@ defmodule Pleroma.Web.Streamer do
|> Jason.encode!()
end
+ def represent_conversation(%Participation{} = participation) do
+ %{
+ event: "conversation",
+ payload:
+ Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
+ participation: participation,
+ user: participation.user
+ })
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
@@ -214,6 +237,12 @@ defmodule Pleroma.Web.Streamer do
end)
end
+ def push_to_socket(topics, topic, %Participation{} = participation) do
+ Enum.each(topics[topic] || [], fn socket ->
+ send(socket.transport_pid, {:text, represent_conversation(participation)})
+ end)
+ end
+
def push_to_socket(topics, topic, %Activity{
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
}) do