diff options
Diffstat (limited to 'lib/pleroma/gun')
-rw-r--r-- | lib/pleroma/gun/api.ex | 2 | ||||
-rw-r--r-- | lib/pleroma/gun/conn.ex | 24 | ||||
-rw-r--r-- | lib/pleroma/gun/connection_pool.ex | 4 | ||||
-rw-r--r-- | lib/pleroma/gun/connection_pool/reclaimer.ex | 10 | ||||
-rw-r--r-- | lib/pleroma/gun/connection_pool/worker.ex | 59 | ||||
-rw-r--r-- | lib/pleroma/gun/connection_pool/worker_supervisor.ex | 4 | ||||
-rw-r--r-- | lib/pleroma/gun/gun.ex | 31 |
7 files changed, 59 insertions, 75 deletions
diff --git a/lib/pleroma/gun/api.ex b/lib/pleroma/gun/api.ex index 09be74392..24d542781 100644 --- a/lib/pleroma/gun/api.ex +++ b/lib/pleroma/gun/api.ex @@ -1,5 +1,5 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Gun.API do diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex index a3f75a4bb..a1210eabf 100644 --- a/lib/pleroma/gun/conn.ex +++ b/lib/pleroma/gun/conn.ex @@ -1,5 +1,5 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Gun.Conn do @@ -13,7 +13,7 @@ defmodule Pleroma.Gun.Conn do opts = opts |> Enum.into(%{}) - |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000) + |> Map.put_new(:connect_timeout, pool_opts[:connect_timeout] || 5_000) |> Map.put_new(:supervise, false) |> maybe_add_tls_opts(uri) @@ -50,16 +50,14 @@ defmodule Pleroma.Gun.Conn do with open_opts <- Map.delete(opts, :tls_opts), {:ok, conn} <- Gun.open(proxy_host, proxy_port, open_opts), - {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]), + {:ok, protocol} <- Gun.await_up(conn, opts[:connect_timeout]), stream <- Gun.connect(conn, connect_opts), {:response, :fin, 200, _} <- Gun.await(conn, stream) do - {:ok, conn} + {:ok, conn, protocol} else error -> Logger.warn( - "Opening proxied connection to #{compose_uri_log(uri)} failed with error #{ - inspect(error) - }" + "Opening proxied connection to #{compose_uri_log(uri)} failed with error #{inspect(error)}" ) error @@ -88,14 +86,12 @@ defmodule Pleroma.Gun.Conn do |> Map.put(:socks_opts, socks_opts) with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts), - {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do - {:ok, conn} + {:ok, protocol} <- Gun.await_up(conn, opts[:connect_timeout]) do + {:ok, conn, protocol} else error -> Logger.warn( - "Opening socks proxied connection to #{compose_uri_log(uri)} failed with error #{ - inspect(error) - }" + "Opening socks proxied connection to #{compose_uri_log(uri)} failed with error #{inspect(error)}" ) error @@ -106,8 +102,8 @@ defmodule Pleroma.Gun.Conn do host = Pleroma.HTTP.AdapterHelper.parse_host(host) with {:ok, conn} <- Gun.open(host, port, opts), - {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do - {:ok, conn} + {:ok, protocol} <- Gun.await_up(conn, opts[:connect_timeout]) do + {:ok, conn, protocol} else error -> Logger.warn( diff --git a/lib/pleroma/gun/connection_pool.ex b/lib/pleroma/gun/connection_pool.ex index f34602b73..f9fd77ade 100644 --- a/lib/pleroma/gun/connection_pool.ex +++ b/lib/pleroma/gun/connection_pool.ex @@ -1,3 +1,7 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + defmodule Pleroma.Gun.ConnectionPool do @registry __MODULE__ diff --git a/lib/pleroma/gun/connection_pool/reclaimer.ex b/lib/pleroma/gun/connection_pool/reclaimer.ex index cea800882..4c643d7cb 100644 --- a/lib/pleroma/gun/connection_pool/reclaimer.ex +++ b/lib/pleroma/gun/connection_pool/reclaimer.ex @@ -1,11 +1,15 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + defmodule Pleroma.Gun.ConnectionPool.Reclaimer do use GenServer, restart: :temporary - @registry Pleroma.Gun.ConnectionPool + defp registry, do: Pleroma.Gun.ConnectionPool def start_monitor do pid = - case :gen_server.start(__MODULE__, [], name: {:via, Registry, {@registry, "reclaimer"}}) do + case :gen_server.start(__MODULE__, [], name: {:via, Registry, {registry(), "reclaimer"}}) do {:ok, pid} -> pid @@ -42,7 +46,7 @@ defmodule Pleroma.Gun.ConnectionPool.Reclaimer do # {worker_pid, crf, last_reference} end) unused_conns = Registry.select( - @registry, + registry(), [ {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}], [{{:"$1", :"$3", :"$4"}}]} ] diff --git a/lib/pleroma/gun/connection_pool/worker.ex b/lib/pleroma/gun/connection_pool/worker.ex index c36332817..a3fa75386 100644 --- a/lib/pleroma/gun/connection_pool/worker.ex +++ b/lib/pleroma/gun/connection_pool/worker.ex @@ -1,11 +1,15 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + defmodule Pleroma.Gun.ConnectionPool.Worker do alias Pleroma.Gun use GenServer, restart: :temporary - @registry Pleroma.Gun.ConnectionPool + defp registry, do: Pleroma.Gun.ConnectionPool def start_link([key | _] = opts) do - GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {@registry, key}}) + GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {registry(), key}}) end @impl true @@ -15,20 +19,24 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do @impl true def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do - with {:ok, conn_pid} <- Gun.Conn.open(uri, opts), + with {:ok, conn_pid, protocol} <- Gun.Conn.open(uri, opts), Process.link(conn_pid) do time = :erlang.monotonic_time(:millisecond) {_, _} = - Registry.update_value(@registry, key, fn _ -> + Registry.update_value(registry(), key, fn _ -> {conn_pid, [client_pid], 1, time} end) send(client_pid, {:conn_pid, conn_pid}) {:noreply, - %{key: key, timer: nil, client_monitors: %{client_pid => Process.monitor(client_pid)}}, - :hibernate} + %{ + key: key, + timer: nil, + client_monitors: %{client_pid => Process.monitor(client_pid)}, + protocol: protocol + }, :hibernate} else err -> {:stop, {:shutdown, err}, nil} @@ -53,14 +61,20 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do end @impl true - def handle_call(:add_client, {client_pid, _}, %{key: key} = state) do + def handle_call(:add_client, {client_pid, _}, %{key: key, protocol: protocol} = state) do time = :erlang.monotonic_time(:millisecond) - {{conn_pid, _, _, _}, _} = - Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} -> + {{conn_pid, used_by, _, _}, _} = + Registry.update_value(registry(), key, fn {conn_pid, used_by, crf, last_reference} -> {conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time} end) + :telemetry.execute( + [:pleroma, :connection_pool, :client, :add], + %{client_pid: client_pid, clients: used_by}, + %{key: state.key, protocol: protocol} + ) + state = if state.timer != nil do Process.cancel_timer(state[:timer]) @@ -78,30 +92,23 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do @impl true def handle_call(:remove_client, {client_pid, _}, %{key: key} = state) do {{_conn_pid, used_by, _crf, _last_reference}, _} = - Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} -> + Registry.update_value(registry(), key, fn {conn_pid, used_by, crf, last_reference} -> {conn_pid, List.delete(used_by, client_pid), crf, last_reference} end) {ref, state} = pop_in(state.client_monitors[client_pid]) - # DOWN message can receive right after `remove_client` call and cause worker to terminate - state = - if is_nil(ref) do - state - else - Process.demonitor(ref) - timer = - if used_by == [] do - max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000) - Process.send_after(self(), :idle_close, max_idle) - else - nil - end + Process.demonitor(ref, [:flush]) - %{state | timer: timer} + timer = + if used_by == [] do + max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000) + Process.send_after(self(), :idle_close, max_idle) + else + nil end - {:reply, :ok, state, :hibernate} + {:reply, :ok, %{state | timer: timer}, :hibernate} end @impl true @@ -131,7 +138,7 @@ defmodule Pleroma.Gun.ConnectionPool.Worker do @impl true def handle_info({:DOWN, _ref, :process, pid, reason}, state) do :telemetry.execute( - [:pleroma, :connection_pool, :client_death], + [:pleroma, :connection_pool, :client, :dead], %{client_pid: pid, reason: reason}, %{key: state.key} ) diff --git a/lib/pleroma/gun/connection_pool/worker_supervisor.ex b/lib/pleroma/gun/connection_pool/worker_supervisor.ex index 39615c956..016b675f4 100644 --- a/lib/pleroma/gun/connection_pool/worker_supervisor.ex +++ b/lib/pleroma/gun/connection_pool/worker_supervisor.ex @@ -1,3 +1,7 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do @moduledoc "Supervisor for pool workers. Does not do anything except enforce max connection limit" diff --git a/lib/pleroma/gun/gun.ex b/lib/pleroma/gun/gun.ex deleted file mode 100644 index 4043e4880..000000000 --- a/lib/pleroma/gun/gun.ex +++ /dev/null @@ -1,31 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Gun do - @callback open(charlist(), pos_integer(), map()) :: {:ok, pid()} - @callback info(pid()) :: map() - @callback close(pid()) :: :ok - @callback await_up(pid, pos_integer()) :: {:ok, atom()} | {:error, atom()} - @callback connect(pid(), map()) :: reference() - @callback await(pid(), reference()) :: {:response, :fin, 200, []} - @callback set_owner(pid(), pid()) :: :ok - - @api Pleroma.Config.get([Pleroma.Gun], Pleroma.Gun.API) - - defp api, do: @api - - def open(host, port, opts), do: api().open(host, port, opts) - - def info(pid), do: api().info(pid) - - def close(pid), do: api().close(pid) - - def await_up(pid, timeout \\ 5_000), do: api().await_up(pid, timeout) - - def connect(pid, opts), do: api().connect(pid, opts) - - def await(pid, ref), do: api().await(pid, ref) - - def set_owner(pid, owner), do: api().set_owner(pid, owner) -end |