diff options
Diffstat (limited to 'lib/pleroma/gun')
-rw-r--r-- | lib/pleroma/gun/api/api.ex | 26 | ||||
-rw-r--r-- | lib/pleroma/gun/api/gun.ex | 43 | ||||
-rw-r--r-- | lib/pleroma/gun/api/mock.ex | 118 | ||||
-rw-r--r-- | lib/pleroma/gun/conn.ex | 29 | ||||
-rw-r--r-- | lib/pleroma/gun/connections.ex | 304 |
5 files changed, 520 insertions, 0 deletions
diff --git a/lib/pleroma/gun/api/api.ex b/lib/pleroma/gun/api/api.ex new file mode 100644 index 000000000..43ee7f354 --- /dev/null +++ b/lib/pleroma/gun/api/api.ex @@ -0,0 +1,26 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.API do + @callback open(charlist(), pos_integer(), map()) :: {:ok, pid()} + @callback info(pid()) :: map() + @callback close(pid()) :: :ok + @callback await_up(pid) :: {:ok, atom()} | {:error, atom()} + @callback connect(pid(), map()) :: reference() + @callback await(pid(), reference()) :: {:response, :fin, 200, []} + + def open(host, port, opts), do: api().open(host, port, opts) + + def info(pid), do: api().info(pid) + + def close(pid), do: api().close(pid) + + def await_up(pid), do: api().await_up(pid) + + def connect(pid, opts), do: api().connect(pid, opts) + + def await(pid, ref), do: api().await(pid, ref) + + defp api, do: Pleroma.Config.get([Pleroma.Gun.API], Pleroma.Gun.API.Gun) +end diff --git a/lib/pleroma/gun/api/gun.ex b/lib/pleroma/gun/api/gun.ex new file mode 100644 index 000000000..603dd700e --- /dev/null +++ b/lib/pleroma/gun/api/gun.ex @@ -0,0 +1,43 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.API.Gun do + @behaviour Pleroma.Gun.API + + alias Pleroma.Gun.API + + @gun_keys [ + :connect_timeout, + :http_opts, + :http2_opts, + :protocols, + :retry, + :retry_timeout, + :trace, + :transport, + :tls_opts, + :tcp_opts, + :ws_opts + ] + + @impl API + def open(host, port, opts) do + :gun.open(host, port, Map.take(opts, @gun_keys)) + end + + @impl API + def info(pid), do: :gun.info(pid) + + @impl API + def close(pid), do: :gun.close(pid) + + @impl API + def await_up(pid), do: :gun.await_up(pid) + + @impl API + def connect(pid, opts), do: :gun.connect(pid, opts) + + @impl API + def await(pid, ref), do: :gun.await(pid, ref) +end diff --git a/lib/pleroma/gun/api/mock.ex b/lib/pleroma/gun/api/mock.ex new file mode 100644 index 000000000..5e1bb8abc --- /dev/null +++ b/lib/pleroma/gun/api/mock.ex @@ -0,0 +1,118 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.API.Mock do + @behaviour Pleroma.Gun.API + + alias Pleroma.Gun.API + + @impl API + def open(domain, 80, %{genserver_pid: genserver_pid}) + when domain in ['another-domain.com', 'some-domain.com'] do + {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + + Registry.register(API.Mock, conn_pid, %{ + origin_scheme: "http", + origin_host: domain, + origin_port: 80 + }) + + send(genserver_pid, {:gun_up, conn_pid, :http}) + {:ok, conn_pid} + end + + @impl API + def open('some-domain.com', 443, %{genserver_pid: genserver_pid}) do + {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + + Registry.register(API.Mock, conn_pid, %{ + origin_scheme: "https", + origin_host: 'some-domain.com', + origin_port: 443 + }) + + send(genserver_pid, {:gun_up, conn_pid, :http2}) + {:ok, conn_pid} + end + + @impl API + def open('gun_down.com', 80, %{genserver_pid: genserver_pid}) do + {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + + Registry.register(API.Mock, conn_pid, %{ + origin_scheme: "http", + origin_host: 'gun_down.com', + origin_port: 80 + }) + + send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil}) + {:ok, conn_pid} + end + + @impl API + def open('gun_down_and_up.com', 80, %{genserver_pid: genserver_pid}) do + {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + + Registry.register(API.Mock, conn_pid, %{ + origin_scheme: "http", + origin_host: 'gun_down_and_up.com', + origin_port: 80 + }) + + send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil}) + + {:ok, _} = + Task.start_link(fn -> + Process.sleep(500) + + send(genserver_pid, {:gun_up, conn_pid, :http}) + end) + + {:ok, conn_pid} + end + + @impl API + def open({127, 0, 0, 1}, 8123, _) do + Task.start_link(fn -> Process.sleep(1_000) end) + end + + @impl API + def open('localhost', 9050, _) do + Task.start_link(fn -> Process.sleep(1_000) end) + end + + @impl API + def await_up(_pid) do + {:ok, :http} + end + + @impl API + def connect(pid, %{host: _, port: 80}) do + ref = make_ref() + Registry.register(API.Mock, ref, pid) + ref + end + + @impl API + def connect(pid, %{host: _, port: 443, protocols: [:http2], transport: :tls}) do + ref = make_ref() + Registry.register(API.Mock, ref, pid) + ref + end + + @impl API + def await(pid, ref) do + [{_, ^pid}] = Registry.lookup(API.Mock, ref) + {:response, :fin, 200, []} + end + + @impl API + def info(pid) do + [{_, info}] = Registry.lookup(API.Mock, pid) + info + end + + @impl API + def close(_pid), do: :ok +end diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex new file mode 100644 index 000000000..906607b28 --- /dev/null +++ b/lib/pleroma/gun/conn.ex @@ -0,0 +1,29 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.Conn do + @moduledoc """ + Struct for gun connection data + """ + @type gun_state :: :open | :up | :down + @type conn_state :: :init | :active | :idle + + @type t :: %__MODULE__{ + conn: pid(), + gun_state: gun_state(), + waiting_pids: [pid()], + conn_state: conn_state(), + used_by: [pid()], + last_reference: pos_integer(), + crf: float() + } + + defstruct conn: nil, + gun_state: :open, + waiting_pids: [], + conn_state: :init, + used_by: [], + last_reference: :os.system_time(:second), + crf: 1 +end diff --git a/lib/pleroma/gun/connections.ex b/lib/pleroma/gun/connections.ex new file mode 100644 index 000000000..e3d392de7 --- /dev/null +++ b/lib/pleroma/gun/connections.ex @@ -0,0 +1,304 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.Connections do + use GenServer + require Logger + + @type domain :: String.t() + @type conn :: Pleroma.Gun.Conn.t() + + @type t :: %__MODULE__{ + conns: %{domain() => conn()}, + opts: keyword() + } + + defstruct conns: %{}, opts: [], queue: [] + + alias Pleroma.Gun.API + alias Pleroma.Gun.Conn + + @spec start_link({atom(), keyword()}) :: {:ok, pid()} | :ignore + def start_link({name, opts}) do + GenServer.start_link(__MODULE__, opts, name: name) + end + + @impl true + def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}} + + @spec checkin(String.t(), keyword(), atom()) :: pid() + def checkin(url, opts \\ [], name \\ :default) do + opts = Enum.into(opts, %{}) + + uri = URI.parse(url) + + opts = + if uri.scheme == "https" and uri.port != 443, + do: Map.put(opts, :transport, :tls), + else: opts + + opts = + if uri.scheme == "https" do + host = uri.host |> to_charlist() + + tls_opts = + Map.get(opts, :tls_opts, []) + |> Keyword.put(:server_name_indication, host) + + Map.put(opts, :tls_opts, tls_opts) + else + opts + end + + GenServer.call( + name, + {:checkin, %{opts: opts, uri: uri}} + ) + end + + @spec alive?(atom()) :: boolean() + def alive?(name \\ :default) do + pid = Process.whereis(name) + if pid, do: Process.alive?(pid), else: false + end + + @spec get_state(atom()) :: t() + def get_state(name \\ :default) do + GenServer.call(name, {:state}) + end + + def checkout(conn, pid, name \\ :default) do + GenServer.cast(name, {:checkout, conn, pid}) + end + + def process_queue(name \\ :default) do + GenServer.cast(name, {:process_queue}) + end + + @impl true + def handle_cast({:checkout, conn_pid, pid}, state) do + {key, conn} = find_conn(state.conns, conn_pid) + used_by = List.keydelete(conn.used_by, pid, 0) + conn_state = if used_by == [], do: :idle, else: conn.conn_state + state = put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by}) + {:noreply, state} + end + + @impl true + def handle_cast({:process_queue}, state) do + case state.queue do + [{from, key, uri, opts} | _queue] -> + try_to_checkin(key, uri, from, state, Map.put(opts, :from_cast, true)) + + [] -> + {:noreply, state} + end + end + + @impl true + def handle_call({:checkin, %{opts: opts, uri: uri}}, from, state) do + key = compose_key(uri) + + case state.conns[key] do + %{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up -> + time = current_time() + last_reference = time - current_conn.last_reference + + current_crf = crf(last_reference, 100, current_conn.crf) + + state = + put_in(state.conns[key], %{ + current_conn + | last_reference: time, + crf: current_crf, + conn_state: :active, + used_by: [from | current_conn.used_by] + }) + + {:reply, conn, state} + + %{gun_state: gun_state, waiting_pids: pids} when gun_state in [:open, :down] -> + state = put_in(state.conns[key].waiting_pids, [from | pids]) + {:noreply, state} + + nil -> + max_connections = state.opts[:max_connections] + + if Enum.count(state.conns) < max_connections do + open_conn(key, uri, from, state, opts) + else + try_to_checkin(key, uri, from, state, opts) + end + end + end + + @impl true + def handle_call({:state}, _from, state), do: {:reply, state, state} + + defp try_to_checkin(key, uri, from, state, opts) do + unused_conns = + state.conns + |> Enum.filter(fn {_k, v} -> + v.conn_state == :idle and v.waiting_pids == [] and v.used_by == [] + end) + |> Enum.sort(fn {_x_k, x}, {_y_k, y} -> + x.crf < y.crf and x.last_reference < y.last_reference + end) + + case unused_conns do + [{close_key, least_used} | _conns] -> + :ok = API.close(least_used.conn) + + state = + put_in( + state.conns, + Map.delete(state.conns, close_key) + ) + + open_conn(key, uri, from, state, opts) + + [] -> + queue = + if List.keymember?(state.queue, from, 0), + do: state.queue, + else: state.queue ++ [{from, key, uri, opts}] + + state = put_in(state.queue, queue) + {:noreply, state} + end + end + + @impl true + def handle_info({:gun_up, conn_pid, _protocol}, state) do + conn_key = compose_key_gun_info(conn_pid) + {key, conn} = find_conn(state.conns, conn_pid, conn_key) + + # Update state of the current connection and set waiting_pids to empty list + time = current_time() + last_reference = time - conn.last_reference + current_crf = crf(last_reference, 100, conn.crf) + + state = + put_in(state.conns[key], %{ + conn + | gun_state: :up, + waiting_pids: [], + last_reference: time, + crf: current_crf, + conn_state: :active, + used_by: conn.waiting_pids ++ conn.used_by + }) + + # Send to all waiting processes connection pid + Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end) + + {:noreply, state} + end + + @impl true + def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed, _unprocessed}, state) do + # we can't get info on this pid, because pid is dead + {key, conn} = find_conn(state.conns, conn_pid) + + Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end) + + state = put_in(state.conns[key].gun_state, :down) + {:noreply, state} + end + + defp compose_key(uri), do: "#{uri.scheme}:#{uri.host}:#{uri.port}" + + defp compose_key_gun_info(pid) do + info = API.info(pid) + "#{info.origin_scheme}:#{info.origin_host}:#{info.origin_port}" + end + + defp find_conn(conns, conn_pid) do + Enum.find(conns, fn {_key, conn} -> + conn.conn == conn_pid + end) + end + + defp find_conn(conns, conn_pid, conn_key) do + Enum.find(conns, fn {key, conn} -> + key == conn_key and conn.conn == conn_pid + end) + end + + defp open_conn(key, uri, from, state, %{proxy: {proxy_host, proxy_port}} = opts) do + host = to_charlist(uri.host) + port = uri.port + + tls_opts = Map.get(opts, :tls_opts, []) + connect_opts = %{host: host, port: port} + + connect_opts = + if uri.scheme == "https" do + Map.put(connect_opts, :protocols, [:http2]) + |> Map.put(:transport, :tls) + |> Map.put(:tls_opts, tls_opts) + else + connect_opts + end + + with open_opts <- Map.delete(opts, :tls_opts), + {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts), + {:ok, _} <- API.await_up(conn), + stream <- API.connect(conn, connect_opts), + {:response, :fin, 200, _} <- API.await(conn, stream) do + state = + put_in(state.conns[key], %Conn{ + conn: conn, + waiting_pids: [], + gun_state: :up, + conn_state: :active, + used_by: [from] + }) + + if opts[:from_cast] do + GenServer.reply(from, conn) + end + + {:reply, conn, state} + else + error -> + Logger.warn(inspect(error)) + {:reply, nil, state} + end + end + + defp open_conn(key, uri, from, state, opts) do + host = to_charlist(uri.host) + port = uri.port + + with {:ok, conn} <- API.open(host, port, opts) do + state = + if opts[:from_cast] do + put_in(state.queue, List.keydelete(state.queue, from, 0)) + else + state + end + + state = + put_in(state.conns[key], %Conn{ + conn: conn, + waiting_pids: [from] + }) + + {:noreply, state} + else + error -> + Logger.warn(inspect(error)) + {:reply, nil, state} + end + end + + defp current_time do + :os.system_time(:second) + end + + def crf(current, steps, crf) do + 1 + :math.pow(0.5, current / steps) * crf + end +end |