diff options
author | Sean King <seanking2919@protonmail.com> | 2022-03-19 23:33:37 -0600 |
---|---|---|
committer | Sean King <seanking2919@protonmail.com> | 2022-03-19 23:33:37 -0600 |
commit | d0c1997d483f918b46bdf45cecef82d8aabcb5f1 (patch) | |
tree | 8e8ed6c41df907d154985df4ccd1a2bbc4b74289 /test/support | |
parent | 2db640632b81fb435456e4b3cc79901d02d7135c (diff) | |
download | pleroma-d0c1997d483f918b46bdf45cecef82d8aabcb5f1.tar.gz |
Rewrite integration-test websocket client with Mint.WebSocket
Diffstat (limited to 'test/support')
-rw-r--r-- | test/support/websocket_client.ex | 193 |
1 files changed, 166 insertions, 27 deletions
diff --git a/test/support/websocket_client.ex b/test/support/websocket_client.ex index d149b324e..afcd0e8c7 100644 --- a/test/support/websocket_client.ex +++ b/test/support/websocket_client.ex @@ -3,60 +3,199 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Integration.WebsocketClient do - # https://github.com/phoenixframework/phoenix/blob/master/test/support/websocket_client.exs + @moduledoc """ + A WebSocket client used to test Mastodon API streaming - @doc """ - Starts the WebSocket server for given ws URL. Received Socket.Message's - are forwarded to the sender pid + Based on Phoenix Framework's WebsocketClient + https://github.com/phoenixframework/phoenix/blob/master/test/support/websocket_client.exs """ - def start_link(sender, url, headers \\ []) do - :crypto.start() - :ssl.start() - :websocket_client.start_link( - String.to_charlist(url), - __MODULE__, - [sender], - extra_headers: headers - ) + use GenServer + import Kernel, except: [send: 2] + + defstruct [ + :conn, + :request_ref, + :websocket, + :caller, + :status, + :resp_headers, + :sender, + closing?: false + ] + + @doc """ + Starts the WebSocket client for given ws URL. `Phoenix.Socket.Message`s + received from the server are forwarded to the sender pid. + """ + def connect(sender, url, headers \\ []) do + with {:ok, socket} <- GenServer.start_link(__MODULE__, {sender}), + {:ok, :connected} <- GenServer.call(socket, {:connect, url, headers}) do + {:ok, socket} + end end @doc """ Closes the socket """ def close(socket) do - send(socket, :close) + GenServer.cast(socket, :close) end @doc """ Sends a low-level text message to the client. """ def send_text(server_pid, msg) do - send(server_pid, {:text, msg}) + GenServer.call(server_pid, {:text, msg}) end @doc false - def init([sender], _conn_state) do - {:ok, %{sender: sender}} + def init({sender}) do + state = %__MODULE__{sender: sender} + + {:ok, state} end @doc false - def websocket_handle(frame, _conn_state, state) do - send(state.sender, frame) - {:ok, state} + def handle_call({:connect, url, headers}, from, state) do + uri = URI.parse(url) + + http_scheme = + case uri.scheme do + "ws" -> :http + "wss" -> :https + end + + ws_scheme = + case uri.scheme do + "ws" -> :ws + "wss" -> :wss + end + + path = + case uri.query do + nil -> uri.path + query -> uri.path <> "?" <> query + end + + with {:ok, conn} <- Mint.HTTP.connect(http_scheme, uri.host, uri.port), + {:ok, conn, ref} <- Mint.WebSocket.upgrade(ws_scheme, conn, path, headers) do + state = %{state | conn: conn, request_ref: ref, caller: from} + {:noreply, state} + else + {:error, reason} -> + {:reply, {:error, reason}, state} + + {:error, conn, reason} -> + {:reply, {:error, reason}, put_in(state.conn, conn)} + end end @doc false - def websocket_info({:text, msg}, _conn_state, state) do - {:reply, {:text, msg}, state} + def handle_info(message, state) do + case Mint.WebSocket.stream(state.conn, message) do + {:ok, conn, responses} -> + state = put_in(state.conn, conn) |> handle_responses(responses) + if state.closing?, do: do_close(state), else: {:noreply, state} + + {:error, conn, reason, _responses} -> + state = put_in(state.conn, conn) |> reply({:error, reason}) + {:noreply, state} + + :unknown -> + {:noreply, state} + end end - def websocket_info(:close, _conn_state, _state) do - {:close, <<>>, "done"} + defp do_close(state) do + # Streaming a close frame may fail if the server has already closed + # for writing. + _ = stream_frame(state, :close) + Mint.HTTP.close(state.conn) + {:stop, :normal, state} end - @doc false - def websocket_terminate(_reason, _conn_state, _state) do - :ok + defp handle_responses(state, responses) + + defp handle_responses(%{request_ref: ref} = state, [{:status, ref, status} | rest]) do + put_in(state.status, status) + |> handle_responses(rest) + end + + defp handle_responses(%{request_ref: ref} = state, [{:headers, ref, resp_headers} | rest]) do + put_in(state.resp_headers, resp_headers) + |> handle_responses(rest) + end + + defp handle_responses(%{request_ref: ref} = state, [{:done, ref} | rest]) do + case Mint.WebSocket.new(state.conn, ref, state.status, state.resp_headers) do + {:ok, conn, websocket} -> + %{state | conn: conn, websocket: websocket, status: nil, resp_headers: nil} + |> reply({:ok, :connected}) + |> handle_responses(rest) + + {:error, conn, reason} -> + put_in(state.conn, conn) + |> reply({:error, reason}) + end + end + + defp handle_responses(%{request_ref: ref, websocket: websocket} = state, [ + {:data, ref, data} | rest + ]) + when websocket != nil do + case Mint.WebSocket.decode(websocket, data) do + {:ok, websocket, frames} -> + put_in(state.websocket, websocket) + |> handle_frames(frames) + |> handle_responses(rest) + + {:error, websocket, reason} -> + put_in(state.websocket, websocket) + |> reply({:error, reason}) + end + end + + defp handle_responses(state, [_response | rest]) do + handle_responses(state, rest) + end + + defp handle_responses(state, []), do: state + + defp handle_frames(state, frames) do + {frames, state} = + Enum.flat_map_reduce(frames, state, fn + # prepare to close the connection when a close frame is received + {:close, _code, _data}, state -> + {[], put_in(state.closing?, true)} + + frame, state -> + {[frame], state} + end) + + Enum.each(frames, &Kernel.send(state.sender, &1)) + + state + end + + defp reply(state, response) do + if state.caller, do: GenServer.reply(state.caller, response) + put_in(state.caller, nil) + end + + # Encodes a frame as a binary and sends it along the wire, keeping `conn` + # and `websocket` up to date in `state`. + defp stream_frame(state, frame) do + with {:ok, websocket, data} <- Mint.WebSocket.encode(state.websocket, frame), + state = put_in(state.websocket, websocket), + {:ok, conn} <- Mint.WebSocket.stream_request_body(state.conn, state.request_ref, data) do + {:ok, put_in(state.conn, conn)} + else + {:error, %Mint.WebSocket{} = websocket, reason} -> + {:error, put_in(state.websocket, websocket), reason} + + {:error, conn, reason} -> + {:error, put_in(state.conn, conn), reason} + end end end |