aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorhref <href+git-pleroma@random.sh>2020-05-07 09:13:32 +0000
committerlain <lain@soykaf.club>2020-05-07 09:13:32 +0000
commit9491ba3e49450e80cd1c21358c01e4e06e3d881d (patch)
treea5af903d41983aca0f4522be35b907854a581757 /lib
parent68a126317d7cdd670c8e244319da08ff85639d33 (diff)
downloadpleroma-9491ba3e49450e80cd1c21358c01e4e06e3d881d.tar.gz
Streamer rework
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/application.ex9
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub.ex32
-rw-r--r--lib/pleroma/web/mastodon_api/websocket_handler.ex47
-rw-r--r--lib/pleroma/web/streamer/ping.ex37
-rw-r--r--lib/pleroma/web/streamer/state.ex82
-rw-r--r--lib/pleroma/web/streamer/streamer.ex244
-rw-r--r--lib/pleroma/web/streamer/streamer_socket.ex35
-rw-r--r--lib/pleroma/web/streamer/supervisor.ex37
-rw-r--r--lib/pleroma/web/streamer/worker.ex208
9 files changed, 277 insertions, 454 deletions
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 308d8cffa..a00bc0624 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -173,7 +173,14 @@ defmodule Pleroma.Application do
defp streamer_child(env) when env in [:test, :benchmark], do: []
defp streamer_child(_) do
- [Pleroma.Web.Streamer.supervisor()]
+ [
+ {Registry,
+ [
+ name: Pleroma.Web.Streamer.registry(),
+ keys: :duplicate,
+ partitions: System.schedulers_online()
+ ]}
+ ]
end
defp chat_child(_env, true) do
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 099df5879..8baaf97ac 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -170,12 +170,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
- Notification.create_notifications(activity)
-
- conversation = create_or_bump_conversation(activity, map["actor"])
- participations = get_participations(conversation)
- stream_out(activity)
- stream_out_participations(participations)
{:ok, activity}
else
%Activity{} = activity ->
@@ -198,6 +192,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end
end
+ def notify_and_stream(activity) do
+ Notification.create_notifications(activity)
+
+ conversation = create_or_bump_conversation(activity, activity.actor)
+ participations = get_participations(conversation)
+ stream_out(activity)
+ stream_out_participations(participations)
+ end
+
defp create_or_bump_conversation(activity, actor) do
with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
%User{} = user <- User.get_cached_by_ap_id(actor),
@@ -274,6 +277,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
_ <- increase_poll_votes_if_vote(create_data),
{:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
{:ok, _actor} <- increase_note_count_if_public(actor, activity),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
else
@@ -301,6 +305,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
additional
),
{:ok, activity} <- insert(listen_data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
@@ -325,6 +330,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
%{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
|> Utils.maybe_put("id", activity_id),
{:ok, activity} <- insert(data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
@@ -344,6 +350,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
},
data <- Utils.maybe_put(data, "id", activity_id),
{:ok, activity} <- insert(data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
@@ -365,6 +372,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id),
{:ok, activity} <- insert(reaction_data, local),
{:ok, object} <- add_emoji_reaction_to_object(activity, object),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity, object}
else
@@ -391,6 +399,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
unreact_data <- make_undo_data(user, reaction_activity, activity_id),
{:ok, activity} <- insert(unreact_data, local),
{:ok, object} <- remove_emoji_reaction_from_object(reaction_activity, object),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity, object}
else
@@ -413,6 +422,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
{:ok, unlike_activity} <- insert(unlike_data, local),
{:ok, _activity} <- Repo.delete(like_activity),
{:ok, object} <- remove_like_from_object(like_activity, object),
+ _ <- notify_and_stream(unlike_activity),
:ok <- maybe_federate(unlike_activity) do
{:ok, unlike_activity, like_activity, object}
else
@@ -442,6 +452,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
announce_data <- make_announce_data(user, object, activity_id, public),
{:ok, activity} <- insert(announce_data, local),
{:ok, object} <- add_announce_to_object(activity, object),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity, object}
else
@@ -468,6 +479,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
{:ok, unannounce_activity} <- insert(unannounce_data, local),
+ _ <- notify_and_stream(unannounce_activity),
:ok <- maybe_federate(unannounce_activity),
{:ok, _activity} <- Repo.delete(announce_activity),
{:ok, object} <- remove_announce_from_object(announce_activity, object) do
@@ -490,6 +502,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
defp do_follow(follower, followed, activity_id, local) do
with data <- make_follow_data(follower, followed, activity_id),
{:ok, activity} <- insert(data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
else
@@ -511,6 +524,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
{:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
{:ok, activity} <- insert(unfollow_data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
else
@@ -540,6 +554,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
with true <- outgoing_blocks,
block_data <- make_block_data(blocker, blocked, activity_id),
{:ok, activity} <- insert(block_data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
else
@@ -560,6 +575,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
{:ok, activity} <- insert(unblock_data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
else
@@ -594,6 +610,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
with flag_data <- make_flag_data(params, additional),
{:ok, activity} <- insert(flag_data, local),
{:ok, stripped_activity} <- strip_report_status_data(activity),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(stripped_activity) do
User.all_superusers()
|> Enum.filter(fn user -> not is_nil(user.email) end)
@@ -617,7 +634,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
}
with true <- origin.ap_id in target.also_known_as,
- {:ok, activity} <- insert(params, local) do
+ {:ok, activity} <- insert(params, local),
+ _ <- notify_and_stream(activity) do
maybe_federate(activity)
BackgroundWorker.enqueue("move_following", %{
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index 5652a37c1..6ef3fe2dd 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -12,6 +12,11 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
@behaviour :cowboy_websocket
+ # Cowboy timeout period.
+ @timeout :timer.seconds(30)
+ # Hibernate every X messages
+ @hibernate_every 100
+
@streams [
"public",
"public:local",
@@ -25,9 +30,6 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
]
@anonymous_streams ["public", "public:local", "hashtag"]
- # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
- @timeout :infinity
-
def init(%{qs: qs} = req, state) do
with params <- :cow_qs.parse_qs(qs),
sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil),
@@ -42,7 +44,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
req
end
- {:cowboy_websocket, req, %{user: user, topic: topic}, %{idle_timeout: @timeout}}
+ {:cowboy_websocket, req, %{user: user, topic: topic, count: 0}, %{idle_timeout: @timeout}}
else
{:error, code} ->
Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}")
@@ -57,7 +59,13 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
end
def websocket_init(state) do
- send(self(), :subscribe)
+ Logger.debug(
+ "#{__MODULE__} accepted websocket connection for user #{
+ (state.user || %{id: "anonymous"}).id
+ }, topic #{state.topic}"
+ )
+
+ Streamer.add_socket(state.topic, state.user)
{:ok, state}
end
@@ -66,19 +74,24 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
{:ok, state}
end
- def websocket_info(:subscribe, state) do
- Logger.debug(
- "#{__MODULE__} accepted websocket connection for user #{
- (state.user || %{id: "anonymous"}).id
- }, topic #{state.topic}"
- )
+ def websocket_info({:render_with_user, view, template, item}, state) do
+ user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
- Streamer.add_socket(state.topic, streamer_socket(state))
- {:ok, state}
+ unless Streamer.filtered_by_user?(user, item) do
+ websocket_info({:text, view.render(template, user, item)}, %{state | user: user})
+ else
+ {:ok, state}
+ end
end
def websocket_info({:text, message}, state) do
- {:reply, {:text, message}, state}
+ # If the websocket processed X messages, force an hibernate/GC.
+ # We don't hibernate at every message to balance CPU usage/latency with RAM usage.
+ if state.count > @hibernate_every do
+ {:reply, {:text, message}, %{state | count: 0}, :hibernate}
+ else
+ {:reply, {:text, message}, %{state | count: state.count + 1}}
+ end
end
def terminate(reason, _req, state) do
@@ -88,7 +101,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
}, topic #{state.topic || "?"}: #{inspect(reason)}"
)
- Streamer.remove_socket(state.topic, streamer_socket(state))
+ Streamer.remove_socket(state.topic)
:ok
end
@@ -136,8 +149,4 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
end
defp expand_topic(topic, _), do: topic
-
- defp streamer_socket(state) do
- %{transport_pid: self(), assigns: state}
- end
end
diff --git a/lib/pleroma/web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex
deleted file mode 100644
index 7a08202a9..000000000
--- a/lib/pleroma/web/streamer/ping.ex
+++ /dev/null
@@ -1,37 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.Ping do
- use GenServer
- require Logger
-
- alias Pleroma.Web.Streamer.State
- alias Pleroma.Web.Streamer.StreamerSocket
-
- @keepalive_interval :timer.seconds(30)
-
- def start_link(opts) do
- ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval)
- GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__)
- end
-
- def init(%{ping_interval: ping_interval} = args) do
- Process.send_after(self(), :ping, ping_interval)
- {:ok, args}
- end
-
- def handle_info(:ping, %{ping_interval: ping_interval} = state) do
- State.get_sockets()
- |> Map.values()
- |> List.flatten()
- |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} ->
- Logger.debug("Sending keepalive ping")
- send(transport_pid, {:text, ""})
- end)
-
- Process.send_after(self(), :ping, ping_interval)
-
- {:noreply, state}
- end
-end
diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex
deleted file mode 100644
index 999550b88..000000000
--- a/lib/pleroma/web/streamer/state.ex
+++ /dev/null
@@ -1,82 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.State do
- use GenServer
- require Logger
-
- alias Pleroma.Web.Streamer.StreamerSocket
-
- @env Mix.env()
-
- def start_link(_) do
- GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
- end
-
- def add_socket(topic, socket) do
- GenServer.call(__MODULE__, {:add, topic, socket})
- end
-
- def remove_socket(topic, socket) do
- do_remove_socket(@env, topic, socket)
- end
-
- def get_sockets do
- %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
- stream_sockets
- end
-
- def init(init_arg) do
- {:ok, init_arg}
- end
-
- def handle_call(:get_state, _from, state) do
- {:reply, state, state}
- end
-
- def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do
- internal_topic = internal_topic(topic, socket)
- stream_socket = StreamerSocket.from_socket(socket)
-
- sockets_for_topic =
- sockets
- |> Map.get(internal_topic, [])
- |> List.insert_at(0, stream_socket)
- |> Enum.uniq()
-
- state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
- Logger.debug("Got new conn for #{topic}")
- {:reply, state, state}
- end
-
- def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do
- internal_topic = internal_topic(topic, socket)
- stream_socket = StreamerSocket.from_socket(socket)
-
- sockets_for_topic =
- sockets
- |> Map.get(internal_topic, [])
- |> List.delete(stream_socket)
-
- state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic)
- {:reply, state, state}
- end
-
- defp do_remove_socket(:test, _, _) do
- :ok
- end
-
- defp do_remove_socket(_env, topic, socket) do
- GenServer.call(__MODULE__, {:remove, topic, socket})
- end
-
- defp internal_topic(topic, socket)
- when topic in ~w[user user:notification direct] do
- "#{topic}:#{socket.assigns[:user].id}"
- end
-
- defp internal_topic(topic, _) do
- topic
- end
-end
diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex
index 814d5a729..5ad4aa936 100644
--- a/lib/pleroma/web/streamer/streamer.ex
+++ b/lib/pleroma/web/streamer/streamer.ex
@@ -3,53 +3,241 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer do
- alias Pleroma.Web.Streamer.State
- alias Pleroma.Web.Streamer.Worker
+ require Logger
+
+ alias Pleroma.Activity
+ alias Pleroma.Config
+ alias Pleroma.Conversation.Participation
+ alias Pleroma.Notification
+ alias Pleroma.Object
+ alias Pleroma.User
+ alias Pleroma.Web.ActivityPub.ActivityPub
+ alias Pleroma.Web.ActivityPub.Visibility
+ alias Pleroma.Web.CommonAPI
+ alias Pleroma.Web.StreamerView
- @timeout 60_000
@mix_env Mix.env()
+ @registry Pleroma.Web.StreamerRegistry
+
+ def registry, do: @registry
- def add_socket(topic, socket) do
- State.add_socket(topic, socket)
+ def add_socket(topic, %User{} = user) do
+ if should_env_send?(), do: Registry.register(@registry, user_topic(topic, user), true)
end
- def remove_socket(topic, socket) do
- State.remove_socket(topic, socket)
+ def add_socket(topic, _) do
+ if should_env_send?(), do: Registry.register(@registry, topic, false)
end
- def get_sockets do
- State.get_sockets()
+ def remove_socket(topic) do
+ if should_env_send?(), do: Registry.unregister(@registry, topic)
end
- def stream(topics, items) do
- if should_send?() do
- Task.async(fn ->
- :poolboy.transaction(
- :streamer_worker,
- &Worker.stream(&1, topics, items),
- @timeout
- )
+ def stream(topics, item) when is_list(topics) do
+ if should_env_send?() do
+ Enum.each(topics, fn t ->
+ spawn(fn -> do_stream(t, item) end)
end)
end
+
+ :ok
end
- def supervisor, do: Pleroma.Web.Streamer.Supervisor
+ def stream(topic, items) when is_list(items) do
+ if should_env_send?() do
+ Enum.each(items, fn i ->
+ spawn(fn -> do_stream(topic, i) end)
+ end)
- defp should_send? do
- handle_should_send(@mix_env)
+ :ok
+ end
end
- defp handle_should_send(:test) do
- case Process.whereis(:streamer_worker) do
- nil ->
- false
+ def stream(topic, item) do
+ if should_env_send?() do
+ spawn(fn -> do_stream(topic, item) end)
+ end
+
+ :ok
+ end
- pid ->
- Process.alive?(pid)
+ def filtered_by_user?(%User{} = user, %Activity{} = item) do
+ %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
+ User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
+
+ recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
+ recipients = MapSet.new(item.recipients)
+ domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
+
+ with parent <- Object.normalize(item) || item,
+ true <-
+ Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
+ true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
+ true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
+ true <- MapSet.disjoint?(recipients, recipient_blocks),
+ %{host: item_host} <- URI.parse(item.actor),
+ %{host: parent_host} <- URI.parse(parent.data["actor"]),
+ false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
+ false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
+ true <- thread_containment(item, user),
+ false <- CommonAPI.thread_muted?(user, item) do
+ false
+ else
+ _ -> true
end
end
- defp handle_should_send(:benchmark), do: false
+ def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do
+ filtered_by_user?(user, activity)
+ end
+
+ defp do_stream("direct", item) do
+ recipient_topics =
+ User.get_recipients_from_activity(item)
+ |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
+
+ Enum.each(recipient_topics, fn user_topic ->
+ Logger.debug("Trying to push direct message to #{user_topic}\n\n")
+ push_to_socket(user_topic, item)
+ end)
+ end
+
+ defp do_stream("participation", participation) do
+ user_topic = "direct:#{participation.user_id}"
+ Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
- defp handle_should_send(_), do: true
+ push_to_socket(user_topic, participation)
+ end
+
+ defp do_stream("list", item) do
+ # filter the recipient list if the activity is not public, see #270.
+ recipient_lists =
+ case Visibility.is_public?(item) do
+ true ->
+ Pleroma.List.get_lists_from_activity(item)
+
+ _ ->
+ Pleroma.List.get_lists_from_activity(item)
+ |> Enum.filter(fn list ->
+ owner = User.get_cached_by_id(list.user_id)
+
+ Visibility.visible_for_user?(item, owner)
+ end)
+ end
+
+ recipient_topics =
+ recipient_lists
+ |> Enum.map(fn %{id: id} -> "list:#{id}" end)
+
+ Enum.each(recipient_topics, fn list_topic ->
+ Logger.debug("Trying to push message to #{list_topic}\n\n")
+ push_to_socket(list_topic, item)
+ end)
+ end
+
+ defp do_stream(topic, %Notification{} = item)
+ when topic in ["user", "user:notification"] do
+ Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ send(pid, {:render_with_user, StreamerView, "notification.json", item})
+ end)
+ end)
+ end
+
+ defp do_stream("user", item) do
+ Logger.debug("Trying to push to users")
+
+ recipient_topics =
+ User.get_recipients_from_activity(item)
+ |> Enum.map(fn %{id: id} -> "user:#{id}" end)
+
+ Enum.each(recipient_topics, fn topic ->
+ push_to_socket(topic, item)
+ end)
+ end
+
+ defp do_stream(topic, item) do
+ Logger.debug("Trying to push to #{topic}")
+ Logger.debug("Pushing item to #{topic}")
+ push_to_socket(topic, item)
+ end
+
+ defp push_to_socket(topic, %Participation{} = participation) do
+ rendered = StreamerView.render("conversation.json", participation)
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, _} ->
+ send(pid, {:text, rendered})
+ end)
+ end)
+ end
+
+ defp push_to_socket(topic, %Activity{
+ data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
+ }) do
+ rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, _} ->
+ send(pid, {:text, rendered})
+ end)
+ end)
+ end
+
+ defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
+
+ defp push_to_socket(topic, item) do
+ anon_render = StreamerView.render("update.json", item)
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, auth?} ->
+ if auth? do
+ send(pid, {:render_with_user, StreamerView, "update.json", item})
+ else
+ send(pid, {:text, anon_render})
+ end
+ end)
+ end)
+ end
+
+ defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
+
+ defp thread_containment(activity, user) do
+ if Config.get([:instance, :skip_thread_containment]) do
+ true
+ else
+ ActivityPub.contain_activity(activity, user)
+ end
+ end
+
+ # In test environement, only return true if the registry is started.
+ # In benchmark environment, returns false.
+ # In any other environment, always returns true.
+ cond do
+ @mix_env == :test ->
+ def should_env_send? do
+ case Process.whereis(@registry) do
+ nil ->
+ false
+
+ pid ->
+ Process.alive?(pid)
+ end
+ end
+
+ @mix_env == :benchmark ->
+ def should_env_send?, do: false
+
+ true ->
+ def should_env_send?, do: true
+ end
+
+ defp user_topic(topic, user)
+ when topic in ~w[user user:notification direct] do
+ "#{topic}:#{user.id}"
+ end
+
+ defp user_topic(topic, _) do
+ topic
+ end
end
diff --git a/lib/pleroma/web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex
deleted file mode 100644
index 7d5dcd34e..000000000
--- a/lib/pleroma/web/streamer/streamer_socket.ex
+++ /dev/null
@@ -1,35 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.StreamerSocket do
- defstruct transport_pid: nil, user: nil
-
- alias Pleroma.User
- alias Pleroma.Web.Streamer.StreamerSocket
-
- def from_socket(%{
- transport_pid: transport_pid,
- assigns: %{user: nil}
- }) do
- %StreamerSocket{
- transport_pid: transport_pid
- }
- end
-
- def from_socket(%{
- transport_pid: transport_pid,
- assigns: %{user: %User{} = user}
- }) do
- %StreamerSocket{
- transport_pid: transport_pid,
- user: user
- }
- end
-
- def from_socket(%{transport_pid: transport_pid}) do
- %StreamerSocket{
- transport_pid: transport_pid
- }
- end
-end
diff --git a/lib/pleroma/web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex
deleted file mode 100644
index bd9029bc0..000000000
--- a/lib/pleroma/web/streamer/supervisor.ex
+++ /dev/null
@@ -1,37 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.Supervisor do
- use Supervisor
-
- def start_link(opts) do
- Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
- end
-
- def init(args) do
- children = [
- {Pleroma.Web.Streamer.State, args},
- {Pleroma.Web.Streamer.Ping, args},
- :poolboy.child_spec(:streamer_worker, poolboy_config())
- ]
-
- opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor]
- Supervisor.init(children, opts)
- end
-
- defp poolboy_config do
- opts =
- Pleroma.Config.get(:streamer,
- workers: 3,
- overflow_workers: 2
- )
-
- [
- {:name, {:local, :streamer_worker}},
- {:worker_module, Pleroma.Web.Streamer.Worker},
- {:size, opts[:workers]},
- {:max_overflow, opts[:overflow_workers]}
- ]
- end
-end
diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex
deleted file mode 100644
index f6160fa4d..000000000
--- a/lib/pleroma/web/streamer/worker.ex
+++ /dev/null
@@ -1,208 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.Worker do
- use GenServer
-
- require Logger
-
- alias Pleroma.Activity
- alias Pleroma.Config
- alias Pleroma.Conversation.Participation
- alias Pleroma.Notification
- alias Pleroma.Object
- alias Pleroma.User
- alias Pleroma.Web.ActivityPub.ActivityPub
- alias Pleroma.Web.ActivityPub.Visibility
- alias Pleroma.Web.CommonAPI
- alias Pleroma.Web.Streamer.State
- alias Pleroma.Web.Streamer.StreamerSocket
- alias Pleroma.Web.StreamerView
-
- def start_link(_) do
- GenServer.start_link(__MODULE__, %{}, [])
- end
-
- def init(init_arg) do
- {:ok, init_arg}
- end
-
- def stream(pid, topics, items) do
- GenServer.call(pid, {:stream, topics, items})
- end
-
- def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
- Enum.each(topics, fn t ->
- do_stream(%{topic: t, item: item})
- end)
-
- {:reply, state, state}
- end
-
- def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
- Enum.each(items, fn i ->
- do_stream(%{topic: topic, item: i})
- end)
-
- {:reply, state, state}
- end
-
- def handle_call({:stream, topic, item}, _from, state) do
- do_stream(%{topic: topic, item: item})
-
- {:reply, state, state}
- end
-
- defp do_stream(%{topic: "direct", item: item}) do
- recipient_topics =
- User.get_recipients_from_activity(item)
- |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
-
- Enum.each(recipient_topics, fn user_topic ->
- Logger.debug("Trying to push direct message to #{user_topic}\n\n")
- push_to_socket(State.get_sockets(), user_topic, item)
- end)
- end
-
- defp do_stream(%{topic: "participation", item: participation}) do
- user_topic = "direct:#{participation.user_id}"
- Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
-
- push_to_socket(State.get_sockets(), user_topic, participation)
- end
-
- defp do_stream(%{topic: "list", item: item}) do
- # filter the recipient list if the activity is not public, see #270.
- recipient_lists =
- case Visibility.is_public?(item) do
- true ->
- Pleroma.List.get_lists_from_activity(item)
-
- _ ->
- Pleroma.List.get_lists_from_activity(item)
- |> Enum.filter(fn list ->
- owner = User.get_cached_by_id(list.user_id)
-
- Visibility.visible_for_user?(item, owner)
- end)
- end
-
- recipient_topics =
- recipient_lists
- |> Enum.map(fn %{id: id} -> "list:#{id}" end)
-
- Enum.each(recipient_topics, fn list_topic ->
- Logger.debug("Trying to push message to #{list_topic}\n\n")
- push_to_socket(State.get_sockets(), list_topic, item)
- end)
- end
-
- defp do_stream(%{topic: topic, item: %Notification{} = item})
- when topic in ["user", "user:notification"] do
- State.get_sockets()
- |> Map.get("#{topic}:#{item.user_id}", [])
- |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
- with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
- true <- should_send?(user, item) do
- send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
- end
- end)
- end
-
- defp do_stream(%{topic: "user", item: item}) do
- Logger.debug("Trying to push to users")
-
- recipient_topics =
- User.get_recipients_from_activity(item)
- |> Enum.map(fn %{id: id} -> "user:#{id}" end)
-
- Enum.each(recipient_topics, fn topic ->
- push_to_socket(State.get_sockets(), topic, item)
- end)
- end
-
- defp do_stream(%{topic: topic, item: item}) do
- Logger.debug("Trying to push to #{topic}")
- Logger.debug("Pushing item to #{topic}")
- push_to_socket(State.get_sockets(), topic, item)
- end
-
- defp should_send?(%User{} = user, %Activity{} = item) do
- %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
- User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
-
- recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
- recipients = MapSet.new(item.recipients)
- domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
-
- with parent <- Object.normalize(item) || item,
- true <-
- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
- true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
- true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
- true <- MapSet.disjoint?(recipients, recipient_blocks),
- %{host: item_host} <- URI.parse(item.actor),
- %{host: parent_host} <- URI.parse(parent.data["actor"]),
- false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
- false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
- true <- thread_containment(item, user),
- false <- CommonAPI.thread_muted?(user, item) do
- true
- else
- _ -> false
- end
- end
-
- defp should_send?(%User{} = user, %Notification{activity: activity}) do
- should_send?(user, activity)
- end
-
- def push_to_socket(topics, topic, %Participation{} = participation) do
- Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
- send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
- end)
- end
-
- def push_to_socket(topics, topic, %Activity{
- data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
- }) do
- Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
- send(
- transport_pid,
- {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
- )
- end)
- end
-
- def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
-
- def push_to_socket(topics, topic, item) do
- Enum.each(topics[topic] || [], fn %StreamerSocket{
- transport_pid: transport_pid,
- user: socket_user
- } ->
- # Get the current user so we have up-to-date blocks etc.
- if socket_user do
- user = User.get_cached_by_ap_id(socket_user.ap_id)
-
- if should_send?(user, item) do
- send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
- end
- else
- send(transport_pid, {:text, StreamerView.render("update.json", item)})
- end
- end)
- end
-
- @spec thread_containment(Activity.t(), User.t()) :: boolean()
- defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
-
- defp thread_containment(activity, user) do
- if Config.get([:instance, :skip_thread_containment]) do
- true
- else
- ActivityPub.contain_activity(activity, user)
- end
- end
-end