aboutsummaryrefslogtreecommitdiff
path: root/lib/pleroma/web/mastodon_api/websocket_handler.ex
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pleroma/web/mastodon_api/websocket_handler.ex')
-rw-r--r--lib/pleroma/web/mastodon_api/websocket_handler.ex95
1 files changed, 38 insertions, 57 deletions
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index e2ffd02d0..94e4595d8 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -12,31 +12,19 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
@behaviour :cowboy_websocket
+ # Client ping period.
+ @tick :timer.seconds(30)
# Cowboy timeout period.
- @timeout :timer.seconds(30)
+ @timeout :timer.seconds(60)
# Hibernate every X messages
@hibernate_every 100
- @streams [
- "public",
- "public:local",
- "public:media",
- "public:local:media",
- "user",
- "user:notification",
- "direct",
- "list",
- "hashtag"
- ]
- @anonymous_streams ["public", "public:local", "hashtag"]
-
def init(%{qs: qs} = req, state) do
- with params <- :cow_qs.parse_qs(qs),
+ with params <- Enum.into(:cow_qs.parse_qs(qs), %{}),
sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil),
- access_token <- List.keyfind(params, "access_token", 0),
- {_, stream} <- List.keyfind(params, "stream", 0),
- {:ok, user} <- allow_request(stream, [access_token, sec_websocket]),
- topic when is_binary(topic) <- expand_topic(stream, params) do
+ access_token <- Map.get(params, "access_token"),
+ {:ok, user} <- authenticate_request(access_token, sec_websocket),
+ {:ok, topic} <- Streamer.get_topic(Map.get(params, "stream"), user, params) do
req =
if sec_websocket do
:cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req)
@@ -44,16 +32,17 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
req
end
- {:cowboy_websocket, req, %{user: user, topic: topic, count: 0}, %{idle_timeout: @timeout}}
+ {:cowboy_websocket, req, %{user: user, topic: topic, count: 0, timer: nil},
+ %{idle_timeout: @timeout}}
else
- {:error, code} ->
- Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}")
- {:ok, req} = :cowboy_req.reply(code, req)
+ {:error, :bad_topic} ->
+ Logger.debug("#{__MODULE__} bad topic #{inspect(req)}")
+ {:ok, req} = :cowboy_req.reply(404, req)
{:ok, req, state}
- error ->
- Logger.debug("#{__MODULE__} denied connection: #{inspect(error)} - #{inspect(req)}")
- {:ok, req} = :cowboy_req.reply(400, req)
+ {:error, :unauthorized} ->
+ Logger.debug("#{__MODULE__} authentication error: #{inspect(req)}")
+ {:ok, req} = :cowboy_req.reply(401, req)
{:ok, req, state}
end
end
@@ -66,11 +55,18 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
)
Streamer.add_socket(state.topic, state.user)
- {:ok, state}
+ {:ok, %{state | timer: timer()}}
+ end
+
+ # Client's Pong frame.
+ def websocket_handle(:pong, state) do
+ if state.timer, do: Process.cancel_timer(state.timer)
+ {:ok, %{state | timer: timer()}}
end
# We never receive messages.
- def websocket_handle(_frame, state) do
+ def websocket_handle(frame, state) do
+ Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
{:ok, state}
end
@@ -94,6 +90,14 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
end
end
+ # Ping tick. We don't re-queue a timer there, it is instead queued when :pong is received.
+ # As we hibernate there, reset the count to 0.
+ # If the client misses :pong, Cowboy will automatically timeout the connection after
+ # `@idle_timeout`.
+ def websocket_info(:tick, state) do
+ {:reply, :ping, %{state | timer: nil, count: 0}, :hibernate}
+ end
+
def terminate(reason, _req, state) do
Logger.debug(
"#{__MODULE__} terminating websocket connection for user #{
@@ -106,47 +110,24 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
end
# Public streams without authentication.
- defp allow_request(stream, [nil, nil]) when stream in @anonymous_streams do
+ defp authenticate_request(nil, nil) do
{:ok, nil}
end
# Authenticated streams.
- defp allow_request(stream, [access_token, sec_websocket]) when stream in @streams do
- token =
- with {"access_token", token} <- access_token do
- token
- else
- _ -> sec_websocket
- end
+ defp authenticate_request(access_token, sec_websocket) do
+ token = access_token || sec_websocket
with true <- is_bitstring(token),
%Token{user_id: user_id} <- Repo.get_by(Token, token: token),
user = %User{} <- User.get_cached_by_id(user_id) do
{:ok, user}
else
- _ -> {:error, 403}
+ _ -> {:error, :unauthorized}
end
end
- # Not authenticated.
- defp allow_request(stream, _) when stream in @streams, do: {:error, 403}
-
- # No matching stream.
- defp allow_request(_, _), do: {:error, 404}
-
- defp expand_topic("hashtag", params) do
- case List.keyfind(params, "tag", 0) do
- {_, tag} -> "hashtag:#{tag}"
- _ -> nil
- end
- end
-
- defp expand_topic("list", params) do
- case List.keyfind(params, "list", 0) do
- {_, list} -> "list:#{list}"
- _ -> nil
- end
+ defp timer do
+ Process.send_after(self(), :tick, @tick)
end
-
- defp expand_topic(topic, _), do: topic
end