aboutsummaryrefslogtreecommitdiff
path: root/lib/pleroma/web/mastodon_api/websocket_handler.ex
diff options
context:
space:
mode:
authorEgor Kislitsyn <egor@kislitsyn.com>2020-06-01 15:48:51 +0400
committerEgor Kislitsyn <egor@kislitsyn.com>2020-06-01 15:48:51 +0400
commita7627bdc7ae67a5c103f968eea02d6b1cf1ef8da (patch)
treee12af401307cfc3120d50c01580cd959f3b2503a /lib/pleroma/web/mastodon_api/websocket_handler.ex
parentdecaa64f75f8bd69622fa5fba757f99719f09808 (diff)
parente96765df6b04fe5e9766271a9c62e559392758b2 (diff)
downloadpleroma-a7627bdc7ae67a5c103f968eea02d6b1cf1ef8da.tar.gz
Merge remote-tracking branch 'origin/develop' into global-status-expiration
Diffstat (limited to 'lib/pleroma/web/mastodon_api/websocket_handler.ex')
-rw-r--r--lib/pleroma/web/mastodon_api/websocket_handler.ex136
1 files changed, 63 insertions, 73 deletions
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index 5652a37c1..94e4595d8 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -12,29 +12,19 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
@behaviour :cowboy_websocket
- @streams [
- "public",
- "public:local",
- "public:media",
- "public:local:media",
- "user",
- "user:notification",
- "direct",
- "list",
- "hashtag"
- ]
- @anonymous_streams ["public", "public:local", "hashtag"]
-
- # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
- @timeout :infinity
+ # Client ping period.
+ @tick :timer.seconds(30)
+ # Cowboy timeout period.
+ @timeout :timer.seconds(60)
+ # Hibernate every X messages
+ @hibernate_every 100
def init(%{qs: qs} = req, state) do
- with params <- :cow_qs.parse_qs(qs),
+ with params <- Enum.into(:cow_qs.parse_qs(qs), %{}),
sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil),
- access_token <- List.keyfind(params, "access_token", 0),
- {_, stream} <- List.keyfind(params, "stream", 0),
- {:ok, user} <- allow_request(stream, [access_token, sec_websocket]),
- topic when is_binary(topic) <- expand_topic(stream, params) do
+ access_token <- Map.get(params, "access_token"),
+ {:ok, user} <- authenticate_request(access_token, sec_websocket),
+ {:ok, topic} <- Streamer.get_topic(Map.get(params, "stream"), user, params) do
req =
if sec_websocket do
:cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req)
@@ -42,43 +32,70 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
req
end
- {:cowboy_websocket, req, %{user: user, topic: topic}, %{idle_timeout: @timeout}}
+ {:cowboy_websocket, req, %{user: user, topic: topic, count: 0, timer: nil},
+ %{idle_timeout: @timeout}}
else
- {:error, code} ->
- Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}")
- {:ok, req} = :cowboy_req.reply(code, req)
+ {:error, :bad_topic} ->
+ Logger.debug("#{__MODULE__} bad topic #{inspect(req)}")
+ {:ok, req} = :cowboy_req.reply(404, req)
{:ok, req, state}
- error ->
- Logger.debug("#{__MODULE__} denied connection: #{inspect(error)} - #{inspect(req)}")
- {:ok, req} = :cowboy_req.reply(400, req)
+ {:error, :unauthorized} ->
+ Logger.debug("#{__MODULE__} authentication error: #{inspect(req)}")
+ {:ok, req} = :cowboy_req.reply(401, req)
{:ok, req, state}
end
end
def websocket_init(state) do
- send(self(), :subscribe)
- {:ok, state}
- end
-
- # We never receive messages.
- def websocket_handle(_frame, state) do
- {:ok, state}
- end
-
- def websocket_info(:subscribe, state) do
Logger.debug(
"#{__MODULE__} accepted websocket connection for user #{
(state.user || %{id: "anonymous"}).id
}, topic #{state.topic}"
)
- Streamer.add_socket(state.topic, streamer_socket(state))
+ Streamer.add_socket(state.topic, state.user)
+ {:ok, %{state | timer: timer()}}
+ end
+
+ # Client's Pong frame.
+ def websocket_handle(:pong, state) do
+ if state.timer, do: Process.cancel_timer(state.timer)
+ {:ok, %{state | timer: timer()}}
+ end
+
+ # We never receive messages.
+ def websocket_handle(frame, state) do
+ Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
{:ok, state}
end
+ def websocket_info({:render_with_user, view, template, item}, state) do
+ user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
+
+ unless Streamer.filtered_by_user?(user, item) do
+ websocket_info({:text, view.render(template, item, user)}, %{state | user: user})
+ else
+ {:ok, state}
+ end
+ end
+
def websocket_info({:text, message}, state) do
- {:reply, {:text, message}, state}
+ # If the websocket processed X messages, force an hibernate/GC.
+ # We don't hibernate at every message to balance CPU usage/latency with RAM usage.
+ if state.count > @hibernate_every do
+ {:reply, {:text, message}, %{state | count: 0}, :hibernate}
+ else
+ {:reply, {:text, message}, %{state | count: state.count + 1}}
+ end
+ end
+
+ # Ping tick. We don't re-queue a timer there, it is instead queued when :pong is received.
+ # As we hibernate there, reset the count to 0.
+ # If the client misses :pong, Cowboy will automatically timeout the connection after
+ # `@idle_timeout`.
+ def websocket_info(:tick, state) do
+ {:reply, :ping, %{state | timer: nil, count: 0}, :hibernate}
end
def terminate(reason, _req, state) do
@@ -88,56 +105,29 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
}, topic #{state.topic || "?"}: #{inspect(reason)}"
)
- Streamer.remove_socket(state.topic, streamer_socket(state))
+ Streamer.remove_socket(state.topic)
:ok
end
# Public streams without authentication.
- defp allow_request(stream, [nil, nil]) when stream in @anonymous_streams do
+ defp authenticate_request(nil, nil) do
{:ok, nil}
end
# Authenticated streams.
- defp allow_request(stream, [access_token, sec_websocket]) when stream in @streams do
- token =
- with {"access_token", token} <- access_token do
- token
- else
- _ -> sec_websocket
- end
+ defp authenticate_request(access_token, sec_websocket) do
+ token = access_token || sec_websocket
with true <- is_bitstring(token),
%Token{user_id: user_id} <- Repo.get_by(Token, token: token),
user = %User{} <- User.get_cached_by_id(user_id) do
{:ok, user}
else
- _ -> {:error, 403}
+ _ -> {:error, :unauthorized}
end
end
- # Not authenticated.
- defp allow_request(stream, _) when stream in @streams, do: {:error, 403}
-
- # No matching stream.
- defp allow_request(_, _), do: {:error, 404}
-
- defp expand_topic("hashtag", params) do
- case List.keyfind(params, "tag", 0) do
- {_, tag} -> "hashtag:#{tag}"
- _ -> nil
- end
- end
-
- defp expand_topic("list", params) do
- case List.keyfind(params, "list", 0) do
- {_, list} -> "list:#{list}"
- _ -> nil
- end
- end
-
- defp expand_topic(topic, _), do: topic
-
- defp streamer_socket(state) do
- %{transport_pid: self(), assigns: state}
+ defp timer do
+ Process.send_after(self(), :tick, @tick)
end
end