aboutsummaryrefslogtreecommitdiff
path: root/test/support
diff options
context:
space:
mode:
authorSean King <seanking2919@protonmail.com>2022-03-19 23:33:37 -0600
committerSean King <seanking2919@protonmail.com>2022-03-19 23:33:37 -0600
commitd0c1997d483f918b46bdf45cecef82d8aabcb5f1 (patch)
tree8e8ed6c41df907d154985df4ccd1a2bbc4b74289 /test/support
parent2db640632b81fb435456e4b3cc79901d02d7135c (diff)
downloadpleroma-d0c1997d483f918b46bdf45cecef82d8aabcb5f1.tar.gz
Rewrite integration-test websocket client with Mint.WebSocket
Diffstat (limited to 'test/support')
-rw-r--r--test/support/websocket_client.ex193
1 files changed, 166 insertions, 27 deletions
diff --git a/test/support/websocket_client.ex b/test/support/websocket_client.ex
index d149b324e..afcd0e8c7 100644
--- a/test/support/websocket_client.ex
+++ b/test/support/websocket_client.ex
@@ -3,60 +3,199 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Integration.WebsocketClient do
- # https://github.com/phoenixframework/phoenix/blob/master/test/support/websocket_client.exs
+ @moduledoc """
+ A WebSocket client used to test Mastodon API streaming
- @doc """
- Starts the WebSocket server for given ws URL. Received Socket.Message's
- are forwarded to the sender pid
+ Based on Phoenix Framework's WebsocketClient
+ https://github.com/phoenixframework/phoenix/blob/master/test/support/websocket_client.exs
"""
- def start_link(sender, url, headers \\ []) do
- :crypto.start()
- :ssl.start()
- :websocket_client.start_link(
- String.to_charlist(url),
- __MODULE__,
- [sender],
- extra_headers: headers
- )
+ use GenServer
+ import Kernel, except: [send: 2]
+
+ defstruct [
+ :conn,
+ :request_ref,
+ :websocket,
+ :caller,
+ :status,
+ :resp_headers,
+ :sender,
+ closing?: false
+ ]
+
+ @doc """
+ Starts the WebSocket client for given ws URL. `Phoenix.Socket.Message`s
+ received from the server are forwarded to the sender pid.
+ """
+ def connect(sender, url, headers \\ []) do
+ with {:ok, socket} <- GenServer.start_link(__MODULE__, {sender}),
+ {:ok, :connected} <- GenServer.call(socket, {:connect, url, headers}) do
+ {:ok, socket}
+ end
end
@doc """
Closes the socket
"""
def close(socket) do
- send(socket, :close)
+ GenServer.cast(socket, :close)
end
@doc """
Sends a low-level text message to the client.
"""
def send_text(server_pid, msg) do
- send(server_pid, {:text, msg})
+ GenServer.call(server_pid, {:text, msg})
end
@doc false
- def init([sender], _conn_state) do
- {:ok, %{sender: sender}}
+ def init({sender}) do
+ state = %__MODULE__{sender: sender}
+
+ {:ok, state}
end
@doc false
- def websocket_handle(frame, _conn_state, state) do
- send(state.sender, frame)
- {:ok, state}
+ def handle_call({:connect, url, headers}, from, state) do
+ uri = URI.parse(url)
+
+ http_scheme =
+ case uri.scheme do
+ "ws" -> :http
+ "wss" -> :https
+ end
+
+ ws_scheme =
+ case uri.scheme do
+ "ws" -> :ws
+ "wss" -> :wss
+ end
+
+ path =
+ case uri.query do
+ nil -> uri.path
+ query -> uri.path <> "?" <> query
+ end
+
+ with {:ok, conn} <- Mint.HTTP.connect(http_scheme, uri.host, uri.port),
+ {:ok, conn, ref} <- Mint.WebSocket.upgrade(ws_scheme, conn, path, headers) do
+ state = %{state | conn: conn, request_ref: ref, caller: from}
+ {:noreply, state}
+ else
+ {:error, reason} ->
+ {:reply, {:error, reason}, state}
+
+ {:error, conn, reason} ->
+ {:reply, {:error, reason}, put_in(state.conn, conn)}
+ end
end
@doc false
- def websocket_info({:text, msg}, _conn_state, state) do
- {:reply, {:text, msg}, state}
+ def handle_info(message, state) do
+ case Mint.WebSocket.stream(state.conn, message) do
+ {:ok, conn, responses} ->
+ state = put_in(state.conn, conn) |> handle_responses(responses)
+ if state.closing?, do: do_close(state), else: {:noreply, state}
+
+ {:error, conn, reason, _responses} ->
+ state = put_in(state.conn, conn) |> reply({:error, reason})
+ {:noreply, state}
+
+ :unknown ->
+ {:noreply, state}
+ end
end
- def websocket_info(:close, _conn_state, _state) do
- {:close, <<>>, "done"}
+ defp do_close(state) do
+ # Streaming a close frame may fail if the server has already closed
+ # for writing.
+ _ = stream_frame(state, :close)
+ Mint.HTTP.close(state.conn)
+ {:stop, :normal, state}
end
- @doc false
- def websocket_terminate(_reason, _conn_state, _state) do
- :ok
+ defp handle_responses(state, responses)
+
+ defp handle_responses(%{request_ref: ref} = state, [{:status, ref, status} | rest]) do
+ put_in(state.status, status)
+ |> handle_responses(rest)
+ end
+
+ defp handle_responses(%{request_ref: ref} = state, [{:headers, ref, resp_headers} | rest]) do
+ put_in(state.resp_headers, resp_headers)
+ |> handle_responses(rest)
+ end
+
+ defp handle_responses(%{request_ref: ref} = state, [{:done, ref} | rest]) do
+ case Mint.WebSocket.new(state.conn, ref, state.status, state.resp_headers) do
+ {:ok, conn, websocket} ->
+ %{state | conn: conn, websocket: websocket, status: nil, resp_headers: nil}
+ |> reply({:ok, :connected})
+ |> handle_responses(rest)
+
+ {:error, conn, reason} ->
+ put_in(state.conn, conn)
+ |> reply({:error, reason})
+ end
+ end
+
+ defp handle_responses(%{request_ref: ref, websocket: websocket} = state, [
+ {:data, ref, data} | rest
+ ])
+ when websocket != nil do
+ case Mint.WebSocket.decode(websocket, data) do
+ {:ok, websocket, frames} ->
+ put_in(state.websocket, websocket)
+ |> handle_frames(frames)
+ |> handle_responses(rest)
+
+ {:error, websocket, reason} ->
+ put_in(state.websocket, websocket)
+ |> reply({:error, reason})
+ end
+ end
+
+ defp handle_responses(state, [_response | rest]) do
+ handle_responses(state, rest)
+ end
+
+ defp handle_responses(state, []), do: state
+
+ defp handle_frames(state, frames) do
+ {frames, state} =
+ Enum.flat_map_reduce(frames, state, fn
+ # prepare to close the connection when a close frame is received
+ {:close, _code, _data}, state ->
+ {[], put_in(state.closing?, true)}
+
+ frame, state ->
+ {[frame], state}
+ end)
+
+ Enum.each(frames, &Kernel.send(state.sender, &1))
+
+ state
+ end
+
+ defp reply(state, response) do
+ if state.caller, do: GenServer.reply(state.caller, response)
+ put_in(state.caller, nil)
+ end
+
+ # Encodes a frame as a binary and sends it along the wire, keeping `conn`
+ # and `websocket` up to date in `state`.
+ defp stream_frame(state, frame) do
+ with {:ok, websocket, data} <- Mint.WebSocket.encode(state.websocket, frame),
+ state = put_in(state.websocket, websocket),
+ {:ok, conn} <- Mint.WebSocket.stream_request_body(state.conn, state.request_ref, data) do
+ {:ok, put_in(state.conn, conn)}
+ else
+ {:error, %Mint.WebSocket{} = websocket, reason} ->
+ {:error, put_in(state.websocket, websocket), reason}
+
+ {:error, conn, reason} ->
+ {:error, put_in(state.conn, conn), reason}
+ end
end
end