diff options
author | href <href+git-pleroma@random.sh> | 2020-05-07 09:13:32 +0000 |
---|---|---|
committer | lain <lain@soykaf.club> | 2020-05-07 09:13:32 +0000 |
commit | 9491ba3e49450e80cd1c21358c01e4e06e3d881d (patch) | |
tree | a5af903d41983aca0f4522be35b907854a581757 /lib/pleroma/web/streamer | |
parent | 68a126317d7cdd670c8e244319da08ff85639d33 (diff) | |
download | pleroma-9491ba3e49450e80cd1c21358c01e4e06e3d881d.tar.gz |
Streamer rework
Diffstat (limited to 'lib/pleroma/web/streamer')
-rw-r--r-- | lib/pleroma/web/streamer/ping.ex | 37 | ||||
-rw-r--r-- | lib/pleroma/web/streamer/state.ex | 82 | ||||
-rw-r--r-- | lib/pleroma/web/streamer/streamer.ex | 244 | ||||
-rw-r--r-- | lib/pleroma/web/streamer/streamer_socket.ex | 35 | ||||
-rw-r--r-- | lib/pleroma/web/streamer/supervisor.ex | 37 | ||||
-rw-r--r-- | lib/pleroma/web/streamer/worker.ex | 208 |
6 files changed, 216 insertions, 427 deletions
diff --git a/lib/pleroma/web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex deleted file mode 100644 index 7a08202a9..000000000 --- a/lib/pleroma/web/streamer/ping.ex +++ /dev/null @@ -1,37 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer.Ping do - use GenServer - require Logger - - alias Pleroma.Web.Streamer.State - alias Pleroma.Web.Streamer.StreamerSocket - - @keepalive_interval :timer.seconds(30) - - def start_link(opts) do - ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval) - GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__) - end - - def init(%{ping_interval: ping_interval} = args) do - Process.send_after(self(), :ping, ping_interval) - {:ok, args} - end - - def handle_info(:ping, %{ping_interval: ping_interval} = state) do - State.get_sockets() - |> Map.values() - |> List.flatten() - |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} -> - Logger.debug("Sending keepalive ping") - send(transport_pid, {:text, ""}) - end) - - Process.send_after(self(), :ping, ping_interval) - - {:noreply, state} - end -end diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex deleted file mode 100644 index 999550b88..000000000 --- a/lib/pleroma/web/streamer/state.ex +++ /dev/null @@ -1,82 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer.State do - use GenServer - require Logger - - alias Pleroma.Web.Streamer.StreamerSocket - - @env Mix.env() - - def start_link(_) do - GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__) - end - - def add_socket(topic, socket) do - GenServer.call(__MODULE__, {:add, topic, socket}) - end - - def remove_socket(topic, socket) do - do_remove_socket(@env, topic, socket) - end - - def get_sockets do - %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state) - stream_sockets - end - - def init(init_arg) do - {:ok, init_arg} - end - - def handle_call(:get_state, _from, state) do - {:reply, state, state} - end - - def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do - internal_topic = internal_topic(topic, socket) - stream_socket = StreamerSocket.from_socket(socket) - - sockets_for_topic = - sockets - |> Map.get(internal_topic, []) - |> List.insert_at(0, stream_socket) - |> Enum.uniq() - - state = put_in(state, [:sockets, internal_topic], sockets_for_topic) - Logger.debug("Got new conn for #{topic}") - {:reply, state, state} - end - - def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do - internal_topic = internal_topic(topic, socket) - stream_socket = StreamerSocket.from_socket(socket) - - sockets_for_topic = - sockets - |> Map.get(internal_topic, []) - |> List.delete(stream_socket) - - state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic) - {:reply, state, state} - end - - defp do_remove_socket(:test, _, _) do - :ok - end - - defp do_remove_socket(_env, topic, socket) do - GenServer.call(__MODULE__, {:remove, topic, socket}) - end - - defp internal_topic(topic, socket) - when topic in ~w[user user:notification direct] do - "#{topic}:#{socket.assigns[:user].id}" - end - - defp internal_topic(topic, _) do - topic - end -end diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex index 814d5a729..5ad4aa936 100644 --- a/lib/pleroma/web/streamer/streamer.ex +++ b/lib/pleroma/web/streamer/streamer.ex @@ -3,53 +3,241 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Streamer do - alias Pleroma.Web.Streamer.State - alias Pleroma.Web.Streamer.Worker + require Logger + + alias Pleroma.Activity + alias Pleroma.Config + alias Pleroma.Conversation.Participation + alias Pleroma.Notification + alias Pleroma.Object + alias Pleroma.User + alias Pleroma.Web.ActivityPub.ActivityPub + alias Pleroma.Web.ActivityPub.Visibility + alias Pleroma.Web.CommonAPI + alias Pleroma.Web.StreamerView - @timeout 60_000 @mix_env Mix.env() + @registry Pleroma.Web.StreamerRegistry + + def registry, do: @registry - def add_socket(topic, socket) do - State.add_socket(topic, socket) + def add_socket(topic, %User{} = user) do + if should_env_send?(), do: Registry.register(@registry, user_topic(topic, user), true) end - def remove_socket(topic, socket) do - State.remove_socket(topic, socket) + def add_socket(topic, _) do + if should_env_send?(), do: Registry.register(@registry, topic, false) end - def get_sockets do - State.get_sockets() + def remove_socket(topic) do + if should_env_send?(), do: Registry.unregister(@registry, topic) end - def stream(topics, items) do - if should_send?() do - Task.async(fn -> - :poolboy.transaction( - :streamer_worker, - &Worker.stream(&1, topics, items), - @timeout - ) + def stream(topics, item) when is_list(topics) do + if should_env_send?() do + Enum.each(topics, fn t -> + spawn(fn -> do_stream(t, item) end) end) end + + :ok end - def supervisor, do: Pleroma.Web.Streamer.Supervisor + def stream(topic, items) when is_list(items) do + if should_env_send?() do + Enum.each(items, fn i -> + spawn(fn -> do_stream(topic, i) end) + end) - defp should_send? do - handle_should_send(@mix_env) + :ok + end end - defp handle_should_send(:test) do - case Process.whereis(:streamer_worker) do - nil -> - false + def stream(topic, item) do + if should_env_send?() do + spawn(fn -> do_stream(topic, item) end) + end + + :ok + end - pid -> - Process.alive?(pid) + def filtered_by_user?(%User{} = user, %Activity{} = item) do + %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} = + User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute]) + + recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids) + recipients = MapSet.new(item.recipients) + domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks) + + with parent <- Object.normalize(item) || item, + true <- + Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)), + true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids, + true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)), + true <- MapSet.disjoint?(recipients, recipient_blocks), + %{host: item_host} <- URI.parse(item.actor), + %{host: parent_host} <- URI.parse(parent.data["actor"]), + false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), + false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), + true <- thread_containment(item, user), + false <- CommonAPI.thread_muted?(user, item) do + false + else + _ -> true end end - defp handle_should_send(:benchmark), do: false + def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do + filtered_by_user?(user, activity) + end + + defp do_stream("direct", item) do + recipient_topics = + User.get_recipients_from_activity(item) + |> Enum.map(fn %{id: id} -> "direct:#{id}" end) + + Enum.each(recipient_topics, fn user_topic -> + Logger.debug("Trying to push direct message to #{user_topic}\n\n") + push_to_socket(user_topic, item) + end) + end + + defp do_stream("participation", participation) do + user_topic = "direct:#{participation.user_id}" + Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") - defp handle_should_send(_), do: true + push_to_socket(user_topic, participation) + end + + defp do_stream("list", item) do + # filter the recipient list if the activity is not public, see #270. + recipient_lists = + case Visibility.is_public?(item) do + true -> + Pleroma.List.get_lists_from_activity(item) + + _ -> + Pleroma.List.get_lists_from_activity(item) + |> Enum.filter(fn list -> + owner = User.get_cached_by_id(list.user_id) + + Visibility.visible_for_user?(item, owner) + end) + end + + recipient_topics = + recipient_lists + |> Enum.map(fn %{id: id} -> "list:#{id}" end) + + Enum.each(recipient_topics, fn list_topic -> + Logger.debug("Trying to push message to #{list_topic}\n\n") + push_to_socket(list_topic, item) + end) + end + + defp do_stream(topic, %Notification{} = item) + when topic in ["user", "user:notification"] do + Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list -> + Enum.each(list, fn {pid, _auth} -> + send(pid, {:render_with_user, StreamerView, "notification.json", item}) + end) + end) + end + + defp do_stream("user", item) do + Logger.debug("Trying to push to users") + + recipient_topics = + User.get_recipients_from_activity(item) + |> Enum.map(fn %{id: id} -> "user:#{id}" end) + + Enum.each(recipient_topics, fn topic -> + push_to_socket(topic, item) + end) + end + + defp do_stream(topic, item) do + Logger.debug("Trying to push to #{topic}") + Logger.debug("Pushing item to #{topic}") + push_to_socket(topic, item) + end + + defp push_to_socket(topic, %Participation{} = participation) do + rendered = StreamerView.render("conversation.json", participation) + + Registry.dispatch(@registry, topic, fn list -> + Enum.each(list, fn {pid, _} -> + send(pid, {:text, rendered}) + end) + end) + end + + defp push_to_socket(topic, %Activity{ + data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} + }) do + rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)}) + + Registry.dispatch(@registry, topic, fn list -> + Enum.each(list, fn {pid, _} -> + send(pid, {:text, rendered}) + end) + end) + end + + defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop + + defp push_to_socket(topic, item) do + anon_render = StreamerView.render("update.json", item) + + Registry.dispatch(@registry, topic, fn list -> + Enum.each(list, fn {pid, auth?} -> + if auth? do + send(pid, {:render_with_user, StreamerView, "update.json", item}) + else + send(pid, {:text, anon_render}) + end + end) + end) + end + + defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true + + defp thread_containment(activity, user) do + if Config.get([:instance, :skip_thread_containment]) do + true + else + ActivityPub.contain_activity(activity, user) + end + end + + # In test environement, only return true if the registry is started. + # In benchmark environment, returns false. + # In any other environment, always returns true. + cond do + @mix_env == :test -> + def should_env_send? do + case Process.whereis(@registry) do + nil -> + false + + pid -> + Process.alive?(pid) + end + end + + @mix_env == :benchmark -> + def should_env_send?, do: false + + true -> + def should_env_send?, do: true + end + + defp user_topic(topic, user) + when topic in ~w[user user:notification direct] do + "#{topic}:#{user.id}" + end + + defp user_topic(topic, _) do + topic + end end diff --git a/lib/pleroma/web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex deleted file mode 100644 index 7d5dcd34e..000000000 --- a/lib/pleroma/web/streamer/streamer_socket.ex +++ /dev/null @@ -1,35 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer.StreamerSocket do - defstruct transport_pid: nil, user: nil - - alias Pleroma.User - alias Pleroma.Web.Streamer.StreamerSocket - - def from_socket(%{ - transport_pid: transport_pid, - assigns: %{user: nil} - }) do - %StreamerSocket{ - transport_pid: transport_pid - } - end - - def from_socket(%{ - transport_pid: transport_pid, - assigns: %{user: %User{} = user} - }) do - %StreamerSocket{ - transport_pid: transport_pid, - user: user - } - end - - def from_socket(%{transport_pid: transport_pid}) do - %StreamerSocket{ - transport_pid: transport_pid - } - end -end diff --git a/lib/pleroma/web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex deleted file mode 100644 index bd9029bc0..000000000 --- a/lib/pleroma/web/streamer/supervisor.ex +++ /dev/null @@ -1,37 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer.Supervisor do - use Supervisor - - def start_link(opts) do - Supervisor.start_link(__MODULE__, opts, name: __MODULE__) - end - - def init(args) do - children = [ - {Pleroma.Web.Streamer.State, args}, - {Pleroma.Web.Streamer.Ping, args}, - :poolboy.child_spec(:streamer_worker, poolboy_config()) - ] - - opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor] - Supervisor.init(children, opts) - end - - defp poolboy_config do - opts = - Pleroma.Config.get(:streamer, - workers: 3, - overflow_workers: 2 - ) - - [ - {:name, {:local, :streamer_worker}}, - {:worker_module, Pleroma.Web.Streamer.Worker}, - {:size, opts[:workers]}, - {:max_overflow, opts[:overflow_workers]} - ] - end -end diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex deleted file mode 100644 index f6160fa4d..000000000 --- a/lib/pleroma/web/streamer/worker.ex +++ /dev/null @@ -1,208 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer.Worker do - use GenServer - - require Logger - - alias Pleroma.Activity - alias Pleroma.Config - alias Pleroma.Conversation.Participation - alias Pleroma.Notification - alias Pleroma.Object - alias Pleroma.User - alias Pleroma.Web.ActivityPub.ActivityPub - alias Pleroma.Web.ActivityPub.Visibility - alias Pleroma.Web.CommonAPI - alias Pleroma.Web.Streamer.State - alias Pleroma.Web.Streamer.StreamerSocket - alias Pleroma.Web.StreamerView - - def start_link(_) do - GenServer.start_link(__MODULE__, %{}, []) - end - - def init(init_arg) do - {:ok, init_arg} - end - - def stream(pid, topics, items) do - GenServer.call(pid, {:stream, topics, items}) - end - - def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do - Enum.each(topics, fn t -> - do_stream(%{topic: t, item: item}) - end) - - {:reply, state, state} - end - - def handle_call({:stream, topic, items}, _from, state) when is_list(items) do - Enum.each(items, fn i -> - do_stream(%{topic: topic, item: i}) - end) - - {:reply, state, state} - end - - def handle_call({:stream, topic, item}, _from, state) do - do_stream(%{topic: topic, item: item}) - - {:reply, state, state} - end - - defp do_stream(%{topic: "direct", item: item}) do - recipient_topics = - User.get_recipients_from_activity(item) - |> Enum.map(fn %{id: id} -> "direct:#{id}" end) - - Enum.each(recipient_topics, fn user_topic -> - Logger.debug("Trying to push direct message to #{user_topic}\n\n") - push_to_socket(State.get_sockets(), user_topic, item) - end) - end - - defp do_stream(%{topic: "participation", item: participation}) do - user_topic = "direct:#{participation.user_id}" - Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") - - push_to_socket(State.get_sockets(), user_topic, participation) - end - - defp do_stream(%{topic: "list", item: item}) do - # filter the recipient list if the activity is not public, see #270. - recipient_lists = - case Visibility.is_public?(item) do - true -> - Pleroma.List.get_lists_from_activity(item) - - _ -> - Pleroma.List.get_lists_from_activity(item) - |> Enum.filter(fn list -> - owner = User.get_cached_by_id(list.user_id) - - Visibility.visible_for_user?(item, owner) - end) - end - - recipient_topics = - recipient_lists - |> Enum.map(fn %{id: id} -> "list:#{id}" end) - - Enum.each(recipient_topics, fn list_topic -> - Logger.debug("Trying to push message to #{list_topic}\n\n") - push_to_socket(State.get_sockets(), list_topic, item) - end) - end - - defp do_stream(%{topic: topic, item: %Notification{} = item}) - when topic in ["user", "user:notification"] do - State.get_sockets() - |> Map.get("#{topic}:#{item.user_id}", []) - |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> - with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id), - true <- should_send?(user, item) do - send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)}) - end - end) - end - - defp do_stream(%{topic: "user", item: item}) do - Logger.debug("Trying to push to users") - - recipient_topics = - User.get_recipients_from_activity(item) - |> Enum.map(fn %{id: id} -> "user:#{id}" end) - - Enum.each(recipient_topics, fn topic -> - push_to_socket(State.get_sockets(), topic, item) - end) - end - - defp do_stream(%{topic: topic, item: item}) do - Logger.debug("Trying to push to #{topic}") - Logger.debug("Pushing item to #{topic}") - push_to_socket(State.get_sockets(), topic, item) - end - - defp should_send?(%User{} = user, %Activity{} = item) do - %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} = - User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute]) - - recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids) - recipients = MapSet.new(item.recipients) - domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks) - - with parent <- Object.normalize(item) || item, - true <- - Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)), - true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids, - true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)), - true <- MapSet.disjoint?(recipients, recipient_blocks), - %{host: item_host} <- URI.parse(item.actor), - %{host: parent_host} <- URI.parse(parent.data["actor"]), - false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), - false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), - true <- thread_containment(item, user), - false <- CommonAPI.thread_muted?(user, item) do - true - else - _ -> false - end - end - - defp should_send?(%User{} = user, %Notification{activity: activity}) do - should_send?(user, activity) - end - - def push_to_socket(topics, topic, %Participation{} = participation) do - Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> - send(transport_pid, {:text, StreamerView.render("conversation.json", participation)}) - end) - end - - def push_to_socket(topics, topic, %Activity{ - data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} - }) do - Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> - send( - transport_pid, - {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} - ) - end) - end - - def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop - - def push_to_socket(topics, topic, item) do - Enum.each(topics[topic] || [], fn %StreamerSocket{ - transport_pid: transport_pid, - user: socket_user - } -> - # Get the current user so we have up-to-date blocks etc. - if socket_user do - user = User.get_cached_by_ap_id(socket_user.ap_id) - - if should_send?(user, item) do - send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) - end - else - send(transport_pid, {:text, StreamerView.render("update.json", item)}) - end - end) - end - - @spec thread_containment(Activity.t(), User.t()) :: boolean() - defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true - - defp thread_containment(activity, user) do - if Config.get([:instance, :skip_thread_containment]) do - true - else - ActivityPub.contain_activity(activity, user) - end - end -end |