aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/pleroma/conversation.ex11
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub.ex22
-rw-r--r--lib/pleroma/web/streamer.ex29
-rw-r--r--mix.exs2
-rw-r--r--mix.lock2
-rw-r--r--test/conversation_test.exs17
-rw-r--r--test/web/activity_pub/activity_pub_test.exs22
7 files changed, 98 insertions, 7 deletions
diff --git a/lib/pleroma/conversation.ex b/lib/pleroma/conversation.ex
index e6a4ccc85..6e26c5fd4 100644
--- a/lib/pleroma/conversation.ex
+++ b/lib/pleroma/conversation.ex
@@ -63,10 +63,13 @@ defmodule Pleroma.Conversation do
participation
end)
- %{
- conversation
- | participations: participations
- }
+ {:ok,
+ %{
+ conversation
+ | participations: participations
+ }}
+ else
+ e -> {:error, e}
end
end
end
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 28754e864..6c737d0a4 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -142,8 +142,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end)
Notification.create_notifications(activity)
- Conversation.create_or_bump_for(activity)
+
+ participations =
+ activity
+ |> Conversation.create_or_bump_for()
+ |> get_participations()
+
stream_out(activity)
+ stream_out_participations(participations)
{:ok, activity}
else
%Activity{} = activity ->
@@ -166,6 +172,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end
end
+ defp get_participations({:ok, %{participations: participations}}), do: participations
+ defp get_participations(_), do: []
+
+ def stream_out_participations(participations) do
+ participations =
+ participations
+ |> Repo.preload(:user)
+
+ Enum.each(participations, fn participation ->
+ Pleroma.Web.Streamer.stream("participation", participation)
+ end)
+ end
+
def stream_out(activity) do
public = "https://www.w3.org/ns/activitystreams#Public"
@@ -197,6 +216,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end
end
else
+ # TODO: Write test, replace with visibility test
if !Enum.member?(activity.data["cc"] || [], public) &&
!Enum.member?(
activity.data["to"],
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index 72eaf2084..b8f6663a1 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -5,6 +5,7 @@
defmodule Pleroma.Web.Streamer do
use GenServer
require Logger
+ alias Pleroma.Conversation.Participation
alias Pleroma.Activity
alias Pleroma.Notification
alias Pleroma.Object
@@ -71,6 +72,15 @@ defmodule Pleroma.Web.Streamer do
{:noreply, topics}
end
+ def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
+ user_topic = "direct:#{participation.user_id}"
+ Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
+
+ push_to_socket(topics, user_topic, participation)
+
+ {:noreply, topics}
+ end
+
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
# filter the recipient list if the activity is not public, see #270.
recipient_lists =
@@ -192,6 +202,19 @@ defmodule Pleroma.Web.Streamer do
|> Jason.encode!()
end
+ def represent_conversation(%Participation{} = participation) do
+ %{
+ event: "conversation",
+ payload:
+ Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
+ participation: participation,
+ user: participation.user
+ })
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
@@ -214,6 +237,12 @@ defmodule Pleroma.Web.Streamer do
end)
end
+ def push_to_socket(topics, topic, %Participation{} = participation) do
+ Enum.each(topics[topic] || [], fn socket ->
+ send(socket.transport_pid, {:text, represent_conversation(participation)})
+ end)
+ end
+
def push_to_socket(topics, topic, %Activity{
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
}) do
diff --git a/mix.exs b/mix.exs
index 9ded9931c..ad8f1123a 100644
--- a/mix.exs
+++ b/mix.exs
@@ -87,7 +87,7 @@ defmodule Pleroma.Mixfile do
{:bbcode, "~> 0.1"},
{:ex_machina, "~> 2.3", only: :test},
{:credo, "~> 0.9.3", only: [:dev, :test]},
- {:mock, "~> 0.3.1", only: :test},
+ {:mock, "~> 0.3.3", only: :test},
{:crypt,
git: "https://github.com/msantos/crypt", ref: "1f2b58927ab57e72910191a7ebaeff984382a1d3"},
{:cors_plug, "~> 1.5"},
diff --git a/mix.lock b/mix.lock
index 08221eadc..bb298a68b 100644
--- a/mix.lock
+++ b/mix.lock
@@ -43,7 +43,7 @@
"mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm"},
"mochiweb": {:hex, :mochiweb, "2.15.0", "e1daac474df07651e5d17cc1e642c4069c7850dc4508d3db7263a0651330aacc", [:rebar3], [], "hexpm"},
- "mock": {:hex, :mock, "0.3.1", "994f00150f79a0ea50dc9d86134cd9ebd0d177ad60bd04d1e46336cdfdb98ff9", [:mix], [{:meck, "~> 0.8.8", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"},
+ "mock": {:hex, :mock, "0.3.3", "42a433794b1291a9cf1525c6d26b38e039e0d3a360732b5e467bfc77ef26c914", [:mix], [{:meck, "~> 0.8.13", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"},
"mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
diff --git a/test/conversation_test.exs b/test/conversation_test.exs
index 763183d6b..f3300e7d1 100644
--- a/test/conversation_test.exs
+++ b/test/conversation_test.exs
@@ -117,4 +117,21 @@ defmodule Pleroma.ConversationTest do
tridi.id == user_id
end)
end
+
+ test "create_or_bump_for returns the conversation with participations" do
+ har = insert(:user)
+ jafnhar = insert(:user, local: false)
+
+ {:ok, activity} =
+ CommonAPI.post(har, %{"status" => "Hey @#{jafnhar.nickname}", "visibility" => "direct"})
+
+ {:ok, conversation} = Conversation.create_or_bump_for(activity)
+
+ assert length(conversation.participations) == 2
+
+ {:ok, activity} =
+ CommonAPI.post(har, %{"status" => "Hey @#{jafnhar.nickname}", "visibility" => "public"})
+
+ assert {:error, _} = Conversation.create_or_bump_for(activity)
+ end
end
diff --git a/test/web/activity_pub/activity_pub_test.exs b/test/web/activity_pub/activity_pub_test.exs
index 047270a2a..1e056b7ee 100644
--- a/test/web/activity_pub/activity_pub_test.exs
+++ b/test/web/activity_pub/activity_pub_test.exs
@@ -22,6 +22,28 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do
:ok
end
+ describe "streaming out participations" do
+ test "it streams them out" do
+ user = insert(:user)
+ {:ok, activity} = CommonAPI.post(user, %{"status" => ".", "visibility" => "direct"})
+
+ {:ok, conversation} = Pleroma.Conversation.create_or_bump_for(activity)
+
+ participations =
+ conversation.participations
+ |> Repo.preload(:user)
+
+ with_mock Pleroma.Web.Streamer,
+ stream: fn _, _ -> nil end do
+ ActivityPub.stream_out_participations(conversation.participations)
+
+ Enum.each(participations, fn participation ->
+ assert called(Pleroma.Web.Streamer.stream("participation", participation))
+ end)
+ end
+ end
+ end
+
describe "fetching restricted by visibility" do
test "it restricts by the appropriate visibility" do
user = insert(:user)