diff options
Diffstat (limited to 'lib/pleroma/web/mastodon_api/websocket_handler.ex')
-rw-r--r-- | lib/pleroma/web/mastodon_api/websocket_handler.ex | 47 |
1 files changed, 28 insertions, 19 deletions
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index 5652a37c1..e2ffd02d0 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -12,6 +12,11 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do @behaviour :cowboy_websocket + # Cowboy timeout period. + @timeout :timer.seconds(30) + # Hibernate every X messages + @hibernate_every 100 + @streams [ "public", "public:local", @@ -25,9 +30,6 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do ] @anonymous_streams ["public", "public:local", "hashtag"] - # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping. - @timeout :infinity - def init(%{qs: qs} = req, state) do with params <- :cow_qs.parse_qs(qs), sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil), @@ -42,7 +44,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do req end - {:cowboy_websocket, req, %{user: user, topic: topic}, %{idle_timeout: @timeout}} + {:cowboy_websocket, req, %{user: user, topic: topic, count: 0}, %{idle_timeout: @timeout}} else {:error, code} -> Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}") @@ -57,7 +59,13 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do end def websocket_init(state) do - send(self(), :subscribe) + Logger.debug( + "#{__MODULE__} accepted websocket connection for user #{ + (state.user || %{id: "anonymous"}).id + }, topic #{state.topic}" + ) + + Streamer.add_socket(state.topic, state.user) {:ok, state} end @@ -66,19 +74,24 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do {:ok, state} end - def websocket_info(:subscribe, state) do - Logger.debug( - "#{__MODULE__} accepted websocket connection for user #{ - (state.user || %{id: "anonymous"}).id - }, topic #{state.topic}" - ) + def websocket_info({:render_with_user, view, template, item}, state) do + user = %User{} = User.get_cached_by_ap_id(state.user.ap_id) - Streamer.add_socket(state.topic, streamer_socket(state)) - {:ok, state} + unless Streamer.filtered_by_user?(user, item) do + websocket_info({:text, view.render(template, item, user)}, %{state | user: user}) + else + {:ok, state} + end end def websocket_info({:text, message}, state) do - {:reply, {:text, message}, state} + # If the websocket processed X messages, force an hibernate/GC. + # We don't hibernate at every message to balance CPU usage/latency with RAM usage. + if state.count > @hibernate_every do + {:reply, {:text, message}, %{state | count: 0}, :hibernate} + else + {:reply, {:text, message}, %{state | count: state.count + 1}} + end end def terminate(reason, _req, state) do @@ -88,7 +101,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do }, topic #{state.topic || "?"}: #{inspect(reason)}" ) - Streamer.remove_socket(state.topic, streamer_socket(state)) + Streamer.remove_socket(state.topic) :ok end @@ -136,8 +149,4 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do end defp expand_topic(topic, _), do: topic - - defp streamer_socket(state) do - %{transport_pid: self(), assigns: state} - end end |