aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/application.ex10
-rw-r--r--lib/pleroma/gun/api.ex3
-rw-r--r--lib/pleroma/gun/conn.ex85
-rw-r--r--lib/pleroma/gun/connection_pool.ex79
-rw-r--r--lib/pleroma/gun/connection_pool/reclaimer.ex85
-rw-r--r--lib/pleroma/gun/connection_pool/worker.ex127
-rw-r--r--lib/pleroma/gun/connection_pool/worker_supervisor.ex45
-rw-r--r--lib/pleroma/http/adapter_helper.ex129
-rw-r--r--lib/pleroma/http/adapter_helper/default.ex14
-rw-r--r--lib/pleroma/http/adapter_helper/gun.ex63
-rw-r--r--lib/pleroma/http/adapter_helper/hackney.ex3
-rw-r--r--lib/pleroma/http/connection.ex124
-rw-r--r--lib/pleroma/http/http.ex84
-rw-r--r--lib/pleroma/pool/connections.ex283
-rw-r--r--lib/pleroma/pool/pool.ex22
-rw-r--r--lib/pleroma/pool/request.ex65
-rw-r--r--lib/pleroma/pool/supervisor.ex42
-rw-r--r--lib/pleroma/reverse_proxy/client/tesla.ex2
-rw-r--r--lib/pleroma/telemetry/logger.ex76
-rw-r--r--lib/pleroma/tesla/middleware/follow_redirects.ex110
-rw-r--r--lib/pleroma/web/api_spec/operations/account_operation.ex2
-rw-r--r--lib/pleroma/web/mastodon_api/views/instance_view.ex3
-rw-r--r--lib/pleroma/web/templates/o_auth/mfa/recovery.html.eex2
-rw-r--r--lib/pleroma/web/templates/o_auth/mfa/totp.html.eex2
24 files changed, 750 insertions, 710 deletions
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 956d2f4a2..1f86ab394 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -41,6 +41,7 @@ defmodule Pleroma.Application do
# every time the application is restarted, so we disable module
# conflicts at runtime
Code.compiler_options(ignore_module_conflict: true)
+ Pleroma.Telemetry.Logger.attach()
Config.Holder.save_default()
Pleroma.HTML.compile_scrubbers()
Config.DeprecationWarnings.warn()
@@ -225,9 +226,7 @@ defmodule Pleroma.Application do
# start hackney and gun pools in tests
defp http_children(_, :test) do
- hackney_options = Config.get([:hackney_pools, :federation])
- hackney_pool = :hackney_pool.child_spec(:federation, hackney_options)
- [hackney_pool, Pleroma.Pool.Supervisor]
+ http_children(Tesla.Adapter.Hackney, nil) ++ http_children(Tesla.Adapter.Gun, nil)
end
defp http_children(Tesla.Adapter.Hackney, _) do
@@ -246,7 +245,10 @@ defmodule Pleroma.Application do
end
end
- defp http_children(Tesla.Adapter.Gun, _), do: [Pleroma.Pool.Supervisor]
+ defp http_children(Tesla.Adapter.Gun, _) do
+ Pleroma.Gun.ConnectionPool.children() ++
+ [{Task, &Pleroma.HTTP.AdapterHelper.Gun.limiter_setup/0}]
+ end
defp http_children(_, _), do: []
end
diff --git a/lib/pleroma/gun/api.ex b/lib/pleroma/gun/api.ex
index f51cd7db8..09be74392 100644
--- a/lib/pleroma/gun/api.ex
+++ b/lib/pleroma/gun/api.ex
@@ -19,7 +19,8 @@ defmodule Pleroma.Gun.API do
:tls_opts,
:tcp_opts,
:socks_opts,
- :ws_opts
+ :ws_opts,
+ :supervise
]
@impl Gun
diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex
index cd25a2e74..a3f75a4bb 100644
--- a/lib/pleroma/gun/conn.ex
+++ b/lib/pleroma/gun/conn.ex
@@ -3,85 +3,33 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Gun.Conn do
- @moduledoc """
- Struct for gun connection data
- """
alias Pleroma.Gun
- alias Pleroma.Pool.Connections
require Logger
- @type gun_state :: :up | :down
- @type conn_state :: :active | :idle
-
- @type t :: %__MODULE__{
- conn: pid(),
- gun_state: gun_state(),
- conn_state: conn_state(),
- used_by: [pid()],
- last_reference: pos_integer(),
- crf: float(),
- retries: pos_integer()
- }
-
- defstruct conn: nil,
- gun_state: :open,
- conn_state: :init,
- used_by: [],
- last_reference: 0,
- crf: 1,
- retries: 0
-
- @spec open(String.t() | URI.t(), atom(), keyword()) :: :ok | nil
- def open(url, name, opts \\ [])
- def open(url, name, opts) when is_binary(url), do: open(URI.parse(url), name, opts)
-
- def open(%URI{} = uri, name, opts) do
+ def open(%URI{} = uri, opts) do
pool_opts = Pleroma.Config.get([:connections_pool], [])
opts =
opts
|> Enum.into(%{})
- |> Map.put_new(:retry, pool_opts[:retry] || 1)
- |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 1000)
|> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
+ |> Map.put_new(:supervise, false)
|> maybe_add_tls_opts(uri)
- key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
-
- max_connections = pool_opts[:max_connections] || 250
-
- conn_pid =
- if Connections.count(name) < max_connections do
- do_open(uri, opts)
- else
- close_least_used_and_do_open(name, uri, opts)
- end
-
- if is_pid(conn_pid) do
- conn = %Pleroma.Gun.Conn{
- conn: conn_pid,
- gun_state: :up,
- conn_state: :active,
- last_reference: :os.system_time(:second)
- }
-
- :ok = Gun.set_owner(conn_pid, Process.whereis(name))
- Connections.add_conn(name, key, conn)
- end
+ do_open(uri, opts)
end
defp maybe_add_tls_opts(opts, %URI{scheme: "http"}), do: opts
- defp maybe_add_tls_opts(opts, %URI{scheme: "https", host: host}) do
+ defp maybe_add_tls_opts(opts, %URI{scheme: "https"}) do
tls_opts = [
verify: :verify_peer,
cacertfile: CAStore.file_path(),
depth: 20,
reuse_sessions: false,
- verify_fun:
- {&:ssl_verify_hostname.verify_fun/3,
- [check_hostname: Pleroma.HTTP.Connection.format_host(host)]}
+ log_level: :warning,
+ customize_hostname_check: [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)]
]
tls_opts =
@@ -105,7 +53,7 @@ defmodule Pleroma.Gun.Conn do
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]),
stream <- Gun.connect(conn, connect_opts),
{:response, :fin, 200, _} <- Gun.await(conn, stream) do
- conn
+ {:ok, conn}
else
error ->
Logger.warn(
@@ -141,7 +89,7 @@ defmodule Pleroma.Gun.Conn do
with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts),
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
- conn
+ {:ok, conn}
else
error ->
Logger.warn(
@@ -155,11 +103,11 @@ defmodule Pleroma.Gun.Conn do
end
defp do_open(%URI{host: host, port: port} = uri, opts) do
- host = Pleroma.HTTP.Connection.parse_host(host)
+ host = Pleroma.HTTP.AdapterHelper.parse_host(host)
with {:ok, conn} <- Gun.open(host, port, opts),
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
- conn
+ {:ok, conn}
else
error ->
Logger.warn(
@@ -171,7 +119,7 @@ defmodule Pleroma.Gun.Conn do
end
defp destination_opts(%URI{host: host, port: port}) do
- host = Pleroma.HTTP.Connection.parse_host(host)
+ host = Pleroma.HTTP.AdapterHelper.parse_host(host)
%{host: host, port: port}
end
@@ -181,17 +129,6 @@ defmodule Pleroma.Gun.Conn do
defp add_http2_opts(opts, _, _), do: opts
- defp close_least_used_and_do_open(name, uri, opts) do
- with [{key, conn} | _conns] <- Connections.get_unused_conns(name),
- :ok <- Gun.close(conn.conn) do
- Connections.remove_conn(name, key)
-
- do_open(uri, opts)
- else
- [] -> {:error, :pool_overflowed}
- end
- end
-
def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
"#{scheme}://#{host}#{path}"
end
diff --git a/lib/pleroma/gun/connection_pool.ex b/lib/pleroma/gun/connection_pool.ex
new file mode 100644
index 000000000..8b41a668c
--- /dev/null
+++ b/lib/pleroma/gun/connection_pool.ex
@@ -0,0 +1,79 @@
+defmodule Pleroma.Gun.ConnectionPool do
+ @registry __MODULE__
+
+ alias Pleroma.Gun.ConnectionPool.WorkerSupervisor
+
+ def children do
+ [
+ {Registry, keys: :unique, name: @registry},
+ Pleroma.Gun.ConnectionPool.WorkerSupervisor
+ ]
+ end
+
+ def get_conn(uri, opts) do
+ key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
+
+ case Registry.lookup(@registry, key) do
+ # The key has already been registered, but connection is not up yet
+ [{worker_pid, nil}] ->
+ get_gun_pid_from_worker(worker_pid, true)
+
+ [{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] ->
+ GenServer.cast(worker_pid, {:add_client, self(), false})
+ {:ok, gun_pid}
+
+ [] ->
+ # :gun.set_owner fails in :connected state for whatevever reason,
+ # so we open the connection in the process directly and send it's pid back
+ # We trust gun to handle timeouts by itself
+ case WorkerSupervisor.start_worker([key, uri, opts, self()]) do
+ {:ok, worker_pid} ->
+ get_gun_pid_from_worker(worker_pid, false)
+
+ {:error, {:already_started, worker_pid}} ->
+ get_gun_pid_from_worker(worker_pid, true)
+
+ err ->
+ err
+ end
+ end
+ end
+
+ defp get_gun_pid_from_worker(worker_pid, register) do
+ # GenServer.call will block the process for timeout length if
+ # the server crashes on startup (which will happen if gun fails to connect)
+ # so instead we use cast + monitor
+
+ ref = Process.monitor(worker_pid)
+ if register, do: GenServer.cast(worker_pid, {:add_client, self(), true})
+
+ receive do
+ {:conn_pid, pid} ->
+ Process.demonitor(ref)
+ {:ok, pid}
+
+ {:DOWN, ^ref, :process, ^worker_pid, reason} ->
+ case reason do
+ {:shutdown, error} -> error
+ _ -> {:error, reason}
+ end
+ end
+ end
+
+ def release_conn(conn_pid) do
+ # :ets.fun2ms(fn {_, {worker_pid, {gun_pid, _, _, _}}} when gun_pid == conn_pid ->
+ # worker_pid end)
+ query_result =
+ Registry.select(@registry, [
+ {{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]}
+ ])
+
+ case query_result do
+ [worker_pid] ->
+ GenServer.cast(worker_pid, {:remove_client, self()})
+
+ [] ->
+ :ok
+ end
+ end
+end
diff --git a/lib/pleroma/gun/connection_pool/reclaimer.ex b/lib/pleroma/gun/connection_pool/reclaimer.ex
new file mode 100644
index 000000000..cea800882
--- /dev/null
+++ b/lib/pleroma/gun/connection_pool/reclaimer.ex
@@ -0,0 +1,85 @@
+defmodule Pleroma.Gun.ConnectionPool.Reclaimer do
+ use GenServer, restart: :temporary
+
+ @registry Pleroma.Gun.ConnectionPool
+
+ def start_monitor do
+ pid =
+ case :gen_server.start(__MODULE__, [], name: {:via, Registry, {@registry, "reclaimer"}}) do
+ {:ok, pid} ->
+ pid
+
+ {:error, {:already_registered, pid}} ->
+ pid
+ end
+
+ {pid, Process.monitor(pid)}
+ end
+
+ @impl true
+ def init(_) do
+ {:ok, nil, {:continue, :reclaim}}
+ end
+
+ @impl true
+ def handle_continue(:reclaim, _) do
+ max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
+
+ reclaim_max =
+ [:connections_pool, :reclaim_multiplier]
+ |> Pleroma.Config.get()
+ |> Kernel.*(max_connections)
+ |> round
+ |> max(1)
+
+ :telemetry.execute([:pleroma, :connection_pool, :reclaim, :start], %{}, %{
+ max_connections: max_connections,
+ reclaim_max: reclaim_max
+ })
+
+ # :ets.fun2ms(
+ # fn {_, {worker_pid, {_, used_by, crf, last_reference}}} when used_by == [] ->
+ # {worker_pid, crf, last_reference} end)
+ unused_conns =
+ Registry.select(
+ @registry,
+ [
+ {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}], [{{:"$1", :"$3", :"$4"}}]}
+ ]
+ )
+
+ case unused_conns do
+ [] ->
+ :telemetry.execute(
+ [:pleroma, :connection_pool, :reclaim, :stop],
+ %{reclaimed_count: 0},
+ %{
+ max_connections: max_connections
+ }
+ )
+
+ {:stop, :no_unused_conns, nil}
+
+ unused_conns ->
+ reclaimed =
+ unused_conns
+ |> Enum.sort(fn {_pid1, crf1, last_reference1}, {_pid2, crf2, last_reference2} ->
+ crf1 <= crf2 and last_reference1 <= last_reference2
+ end)
+ |> Enum.take(reclaim_max)
+
+ reclaimed
+ |> Enum.each(fn {pid, _, _} ->
+ DynamicSupervisor.terminate_child(Pleroma.Gun.ConnectionPool.WorkerSupervisor, pid)
+ end)
+
+ :telemetry.execute(
+ [:pleroma, :connection_pool, :reclaim, :stop],
+ %{reclaimed_count: Enum.count(reclaimed)},
+ %{max_connections: max_connections}
+ )
+
+ {:stop, :normal, nil}
+ end
+ end
+end
diff --git a/lib/pleroma/gun/connection_pool/worker.ex b/lib/pleroma/gun/connection_pool/worker.ex
new file mode 100644
index 000000000..f33447cb6
--- /dev/null
+++ b/lib/pleroma/gun/connection_pool/worker.ex
@@ -0,0 +1,127 @@
+defmodule Pleroma.Gun.ConnectionPool.Worker do
+ alias Pleroma.Gun
+ use GenServer, restart: :temporary
+
+ @registry Pleroma.Gun.ConnectionPool
+
+ def start_link([key | _] = opts) do
+ GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {@registry, key}})
+ end
+
+ @impl true
+ def init([_key, _uri, _opts, _client_pid] = opts) do
+ {:ok, nil, {:continue, {:connect, opts}}}
+ end
+
+ @impl true
+ def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do
+ with {:ok, conn_pid} <- Gun.Conn.open(uri, opts),
+ Process.link(conn_pid) do
+ time = :erlang.monotonic_time(:millisecond)
+
+ {_, _} =
+ Registry.update_value(@registry, key, fn _ ->
+ {conn_pid, [client_pid], 1, time}
+ end)
+
+ send(client_pid, {:conn_pid, conn_pid})
+
+ {:noreply,
+ %{key: key, timer: nil, client_monitors: %{client_pid => Process.monitor(client_pid)}},
+ :hibernate}
+ else
+ err ->
+ {:stop, {:shutdown, err}, nil}
+ end
+ end
+
+ @impl true
+ def handle_cast({:add_client, client_pid, send_pid_back}, %{key: key} = state) do
+ time = :erlang.monotonic_time(:millisecond)
+
+ {{conn_pid, _, _, _}, _} =
+ Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
+ {conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
+ end)
+
+ if send_pid_back, do: send(client_pid, {:conn_pid, conn_pid})
+
+ state =
+ if state.timer != nil do
+ Process.cancel_timer(state[:timer])
+ %{state | timer: nil}
+ else
+ state
+ end
+
+ ref = Process.monitor(client_pid)
+
+ state = put_in(state.client_monitors[client_pid], ref)
+ {:noreply, state, :hibernate}
+ end
+
+ @impl true
+ def handle_cast({:remove_client, client_pid}, %{key: key} = state) do
+ {{_conn_pid, used_by, _crf, _last_reference}, _} =
+ Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
+ {conn_pid, List.delete(used_by, client_pid), crf, last_reference}
+ end)
+
+ {ref, state} = pop_in(state.client_monitors[client_pid])
+ Process.demonitor(ref)
+
+ timer =
+ if used_by == [] do
+ max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000)
+ Process.send_after(self(), :idle_close, max_idle)
+ else
+ nil
+ end
+
+ {:noreply, %{state | timer: timer}, :hibernate}
+ end
+
+ @impl true
+ def handle_info(:idle_close, state) do
+ # Gun monitors the owner process, and will close the connection automatically
+ # when it's terminated
+ {:stop, :normal, state}
+ end
+
+ # Gracefully shutdown if the connection got closed without any streams left
+ @impl true
+ def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do
+ {:stop, :normal, state}
+ end
+
+ # Otherwise, shutdown with an error
+ @impl true
+ def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_message, state) do
+ {:stop, {:error, down_message}, state}
+ end
+
+ @impl true
+ def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
+ # Sometimes the client is dead before we demonitor it in :remove_client, so the message
+ # arrives anyway
+
+ case state.client_monitors[pid] do
+ nil ->
+ {:noreply, state, :hibernate}
+
+ _ref ->
+ :telemetry.execute(
+ [:pleroma, :connection_pool, :client_death],
+ %{client_pid: pid, reason: reason},
+ %{key: state.key}
+ )
+
+ handle_cast({:remove_client, pid}, state)
+ end
+ end
+
+ # LRFU policy: https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.55.1478
+ defp crf(time_delta, prev_crf) do
+ 1 + :math.pow(0.5, 0.0001 * time_delta) * prev_crf
+ end
+end
diff --git a/lib/pleroma/gun/connection_pool/worker_supervisor.ex b/lib/pleroma/gun/connection_pool/worker_supervisor.ex
new file mode 100644
index 000000000..39615c956
--- /dev/null
+++ b/lib/pleroma/gun/connection_pool/worker_supervisor.ex
@@ -0,0 +1,45 @@
+defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
+ @moduledoc "Supervisor for pool workers. Does not do anything except enforce max connection limit"
+
+ use DynamicSupervisor
+
+ def start_link(opts) do
+ DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
+ end
+
+ def init(_opts) do
+ DynamicSupervisor.init(
+ strategy: :one_for_one,
+ max_children: Pleroma.Config.get([:connections_pool, :max_connections])
+ )
+ end
+
+ def start_worker(opts, retry \\ false) do
+ case DynamicSupervisor.start_child(__MODULE__, {Pleroma.Gun.ConnectionPool.Worker, opts}) do
+ {:error, :max_children} ->
+ if retry or free_pool() == :error do
+ :telemetry.execute([:pleroma, :connection_pool, :provision_failure], %{opts: opts})
+ {:error, :pool_full}
+ else
+ start_worker(opts, true)
+ end
+
+ res ->
+ res
+ end
+ end
+
+ defp free_pool do
+ wait_for_reclaimer_finish(Pleroma.Gun.ConnectionPool.Reclaimer.start_monitor())
+ end
+
+ defp wait_for_reclaimer_finish({pid, mon}) do
+ receive do
+ {:DOWN, ^mon, :process, ^pid, :no_unused_conns} ->
+ :error
+
+ {:DOWN, ^mon, :process, ^pid, :normal} ->
+ :ok
+ end
+ end
+end
diff --git a/lib/pleroma/http/adapter_helper.ex b/lib/pleroma/http/adapter_helper.ex
index 510722ff9..9ec3836b0 100644
--- a/lib/pleroma/http/adapter_helper.ex
+++ b/lib/pleroma/http/adapter_helper.ex
@@ -3,32 +3,30 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.HTTP.AdapterHelper do
- alias Pleroma.HTTP.Connection
+ @moduledoc """
+ Configure Tesla.Client with default and customized adapter options.
+ """
+ @defaults [pool: :federation]
+
+ @type proxy_type() :: :socks4 | :socks5
+ @type host() :: charlist() | :inet.ip_address()
+
+ alias Pleroma.Config
+ alias Pleroma.HTTP.AdapterHelper
+ require Logger
@type proxy ::
{Connection.host(), pos_integer()}
| {Connection.proxy_type(), Connection.host(), pos_integer()}
@callback options(keyword(), URI.t()) :: keyword()
- @callback after_request(keyword()) :: :ok
-
- @spec options(keyword(), URI.t()) :: keyword()
- def options(opts, _uri) do
- proxy = Pleroma.Config.get([:http, :proxy_url], nil)
- maybe_add_proxy(opts, format_proxy(proxy))
- end
-
- @spec maybe_get_conn(URI.t(), keyword()) :: keyword()
- def maybe_get_conn(_uri, opts), do: opts
-
- @spec after_request(keyword()) :: :ok
- def after_request(_opts), do: :ok
+ @callback get_conn(URI.t(), keyword()) :: {:ok, term()} | {:error, term()}
@spec format_proxy(String.t() | tuple() | nil) :: proxy() | nil
def format_proxy(nil), do: nil
def format_proxy(proxy_url) do
- case Connection.parse_proxy(proxy_url) do
+ case parse_proxy(proxy_url) do
{:ok, host, port} -> {host, port}
{:ok, type, host, port} -> {type, host, port}
_ -> nil
@@ -38,4 +36,105 @@ defmodule Pleroma.HTTP.AdapterHelper do
@spec maybe_add_proxy(keyword(), proxy() | nil) :: keyword()
def maybe_add_proxy(opts, nil), do: opts
def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy)
+
+ @doc """
+ Merge default connection & adapter options with received ones.
+ """
+
+ @spec options(URI.t(), keyword()) :: keyword()
+ def options(%URI{} = uri, opts \\ []) do
+ @defaults
+ |> put_timeout()
+ |> Keyword.merge(opts)
+ |> adapter_helper().options(uri)
+ end
+
+ # For Hackney, this is the time a connection can stay idle in the pool.
+ # For Gun, this is the timeout to receive a message from Gun.
+ defp put_timeout(opts) do
+ {config_key, default} =
+ if adapter() == Tesla.Adapter.Gun do
+ {:pools, Config.get([:pools, :default, :timeout], 5_000)}
+ else
+ {:hackney_pools, 10_000}
+ end
+
+ timeout = Config.get([config_key, opts[:pool], :timeout], default)
+
+ Keyword.merge(opts, timeout: timeout)
+ end
+
+ def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts)
+ defp adapter, do: Application.get_env(:tesla, :adapter)
+
+ defp adapter_helper do
+ case adapter() do
+ Tesla.Adapter.Gun -> AdapterHelper.Gun
+ Tesla.Adapter.Hackney -> AdapterHelper.Hackney
+ _ -> AdapterHelper.Default
+ end
+ end
+
+ @spec parse_proxy(String.t() | tuple() | nil) ::
+ {:ok, host(), pos_integer()}
+ | {:ok, proxy_type(), host(), pos_integer()}
+ | {:error, atom()}
+ | nil
+
+ def parse_proxy(nil), do: nil
+
+ def parse_proxy(proxy) when is_binary(proxy) do
+ with [host, port] <- String.split(proxy, ":"),
+ {port, ""} <- Integer.parse(port) do
+ {:ok, parse_host(host), port}
+ else
+ {_, _} ->
+ Logger.warn("Parsing port failed #{inspect(proxy)}")
+ {:error, :invalid_proxy_port}
+
+ :error ->
+ Logger.warn("Parsing port failed #{inspect(proxy)}")
+ {:error, :invalid_proxy_port}
+
+ _ ->
+ Logger.warn("Parsing proxy failed #{inspect(proxy)}")
+ {:error, :invalid_proxy}
+ end
+ end
+
+ def parse_proxy(proxy) when is_tuple(proxy) do
+ with {type, host, port} <- proxy do
+ {:ok, type, parse_host(host), port}
+ else
+ _ ->
+ Logger.warn("Parsing proxy failed #{inspect(proxy)}")
+ {:error, :invalid_proxy}
+ end
+ end
+
+ @spec parse_host(String.t() | atom() | charlist()) :: charlist() | :inet.ip_address()
+ def parse_host(host) when is_list(host), do: host
+ def parse_host(host) when is_atom(host), do: to_charlist(host)
+
+ def parse_host(host) when is_binary(host) do
+ host = to_charlist(host)
+
+ case :inet.parse_address(host) do
+ {:error, :einval} -> host
+ {:ok, ip} -> ip
+ end
+ end
+
+ @spec format_host(String.t()) :: charlist()
+ def format_host(host) do
+ host_charlist = to_charlist(host)
+
+ case :inet.parse_address(host_charlist) do
+ {:error, :einval} ->
+ :idna.encode(host_charlist)
+
+ {:ok, _ip} ->
+ host_charlist
+ end
+ end
end
diff --git a/lib/pleroma/http/adapter_helper/default.ex b/lib/pleroma/http/adapter_helper/default.ex
new file mode 100644
index 000000000..e13441316
--- /dev/null
+++ b/lib/pleroma/http/adapter_helper/default.ex
@@ -0,0 +1,14 @@
+defmodule Pleroma.HTTP.AdapterHelper.Default do
+ alias Pleroma.HTTP.AdapterHelper
+
+ @behaviour Pleroma.HTTP.AdapterHelper
+
+ @spec options(keyword(), URI.t()) :: keyword()
+ def options(opts, _uri) do
+ proxy = Pleroma.Config.get([:http, :proxy_url], nil)
+ AdapterHelper.maybe_add_proxy(opts, AdapterHelper.format_proxy(proxy))
+ end
+
+ @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()}
+ def get_conn(_uri, opts), do: {:ok, opts}
+end
diff --git a/lib/pleroma/http/adapter_helper/gun.ex b/lib/pleroma/http/adapter_helper/gun.ex
index ead7cdc6b..b4ff8306c 100644
--- a/lib/pleroma/http/adapter_helper/gun.ex
+++ b/lib/pleroma/http/adapter_helper/gun.ex
@@ -5,8 +5,8 @@
defmodule Pleroma.HTTP.AdapterHelper.Gun do
@behaviour Pleroma.HTTP.AdapterHelper
+ alias Pleroma.Gun.ConnectionPool
alias Pleroma.HTTP.AdapterHelper
- alias Pleroma.Pool.Connections
require Logger
@@ -14,7 +14,7 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
connect_timeout: 5_000,
domain_lookup_timeout: 5_000,
tls_handshake_timeout: 5_000,
- retry: 1,
+ retry: 0,
retry_timeout: 1000,
await_up_timeout: 5_000
]
@@ -31,16 +31,7 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
|> Keyword.merge(config_opts)
|> add_scheme_opts(uri)
|> AdapterHelper.maybe_add_proxy(proxy)
- |> maybe_get_conn(uri, incoming_opts)
- end
-
- @spec after_request(keyword()) :: :ok
- def after_request(opts) do
- if opts[:conn] && opts[:body_as] != :chunks do
- Connections.checkout(opts[:conn], self(), :gun_connections)
- end
-
- :ok
+ |> Keyword.merge(incoming_opts)
end
defp add_scheme_opts(opts, %{scheme: "http"}), do: opts
@@ -48,30 +39,40 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
defp add_scheme_opts(opts, %{scheme: "https"}) do
opts
|> Keyword.put(:certificates_verification, true)
- |> Keyword.put(:tls_opts, log_level: :warning)
end
- defp maybe_get_conn(adapter_opts, uri, incoming_opts) do
- {receive_conn?, opts} =
- adapter_opts
- |> Keyword.merge(incoming_opts)
- |> Keyword.pop(:receive_conn, true)
-
- if Connections.alive?(:gun_connections) and receive_conn? do
- checkin_conn(uri, opts)
- else
- opts
+ @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()}
+ def get_conn(uri, opts) do
+ case ConnectionPool.get_conn(uri, opts) do
+ {:ok, conn_pid} -> {:ok, Keyword.merge(opts, conn: conn_pid, close_conn: false)}
+ err -> err
end
end
- defp checkin_conn(uri, opts) do
- case Connections.checkin(uri, :gun_connections) do
- nil ->
- Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts])
- opts
+ @prefix Pleroma.Gun.ConnectionPool
+ def limiter_setup do
+ wait = Pleroma.Config.get([:connections_pool, :connection_acquisition_wait])
+ retries = Pleroma.Config.get([:connections_pool, :connection_acquisition_retries])
- conn when is_pid(conn) ->
- Keyword.merge(opts, conn: conn, close_conn: false)
- end
+ :pools
+ |> Pleroma.Config.get([])
+ |> Enum.each(fn {name, opts} ->
+ max_running = Keyword.get(opts, :size, 50)
+ max_waiting = Keyword.get(opts, :max_waiting, 10)
+
+ result =
+ ConcurrentLimiter.new(:"#{@prefix}.#{name}", max_running, max_waiting,
+ wait: wait,
+ max_retries: retries
+ )
+
+ case result do
+ :ok -> :ok
+ {:error, :existing} -> :ok
+ e -> raise e
+ end
+ end)
+
+ :ok
end
end
diff --git a/lib/pleroma/http/adapter_helper/hackney.ex b/lib/pleroma/http/adapter_helper/hackney.ex
index 3972a03a9..cd569422b 100644
--- a/lib/pleroma/http/adapter_helper/hackney.ex
+++ b/lib/pleroma/http/adapter_helper/hackney.ex
@@ -24,5 +24,6 @@ defmodule Pleroma.HTTP.AdapterHelper.Hackney do
defp add_scheme_opts(opts, _), do: opts
- def after_request(_), do: :ok
+ @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()}
+ def get_conn(_uri, opts), do: {:ok, opts}
end
diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex
deleted file mode 100644
index ebacf7902..000000000
--- a/lib/pleroma/http/connection.ex
+++ /dev/null
@@ -1,124 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.HTTP.Connection do
- @moduledoc """
- Configure Tesla.Client with default and customized adapter options.
- """
-
- alias Pleroma.Config
- alias Pleroma.HTTP.AdapterHelper
-
- require Logger
-
- @defaults [pool: :federation]
-
- @type ip_address :: ipv4_address() | ipv6_address()
- @type ipv4_address :: {0..255, 0..255, 0..255, 0..255}
- @type ipv6_address ::
- {0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535}
- @type proxy_type() :: :socks4 | :socks5
- @type host() :: charlist() | ip_address()
-
- @doc """
- Merge default connection & adapter options with received ones.
- """
-
- @spec options(URI.t(), keyword()) :: keyword()
- def options(%URI{} = uri, opts \\ []) do
- @defaults
- |> pool_timeout()
- |> Keyword.merge(opts)
- |> adapter_helper().options(uri)
- end
-
- defp pool_timeout(opts) do
- {config_key, default} =
- if adapter() == Tesla.Adapter.Gun do
- {:pools, Config.get([:pools, :default, :timeout])}
- else
- {:hackney_pools, 10_000}
- end
-
- timeout = Config.get([config_key, opts[:pool], :timeout], default)
-
- Keyword.merge(opts, timeout: timeout)
- end
-
- @spec after_request(keyword()) :: :ok
- def after_request(opts), do: adapter_helper().after_request(opts)
-
- defp adapter, do: Application.get_env(:tesla, :adapter)
-
- defp adapter_helper do
- case adapter() do
- Tesla.Adapter.Gun -> AdapterHelper.Gun
- Tesla.Adapter.Hackney -> AdapterHelper.Hackney
- _ -> AdapterHelper
- end
- end
-
- @spec parse_proxy(String.t() | tuple() | nil) ::
- {:ok, host(), pos_integer()}
- | {:ok, proxy_type(), host(), pos_integer()}
- | {:error, atom()}
- | nil
-
- def parse_proxy(nil), do: nil
-
- def parse_proxy(proxy) when is_binary(proxy) do
- with [host, port] <- String.split(proxy, ":"),
- {port, ""} <- Integer.parse(port) do
- {:ok, parse_host(host), port}
- else
- {_, _} ->
- Logger.warn("Parsing port failed #{inspect(proxy)}")
- {:error, :invalid_proxy_port}
-
- :error ->
- Logger.warn("Parsing port failed #{inspect(proxy)}")
- {:error, :invalid_proxy_port}
-
- _ ->
- Logger.warn("Parsing proxy failed #{inspect(proxy)}")
- {:error, :invalid_proxy}
- end
- end
-
- def parse_proxy(proxy) when is_tuple(proxy) do
- with {type, host, port} <- proxy do
- {:ok, type, parse_host(host), port}
- else
- _ ->
- Logger.warn("Parsing proxy failed #{inspect(proxy)}")
- {:error, :invalid_proxy}
- end
- end
-
- @spec parse_host(String.t() | atom() | charlist()) :: charlist() | ip_address()
- def parse_host(host) when is_list(host), do: host
- def parse_host(host) when is_atom(host), do: to_charlist(host)
-
- def parse_host(host) when is_binary(host) do
- host = to_charlist(host)
-
- case :inet.parse_address(host) do
- {:error, :einval} -> host
- {:ok, ip} -> ip
- end
- end
-
- @spec format_host(String.t()) :: charlist()
- def format_host(host) do
- host_charlist = to_charlist(host)
-
- case :inet.parse_address(host_charlist) do
- {:error, :einval} ->
- :idna.encode(host_charlist)
-
- {:ok, _ip} ->
- host_charlist
- end
- end
-end
diff --git a/lib/pleroma/http/http.ex b/lib/pleroma/http/http.ex
index 66ca75367..b37b3fa89 100644
--- a/lib/pleroma/http/http.ex
+++ b/lib/pleroma/http/http.ex
@@ -7,7 +7,7 @@ defmodule Pleroma.HTTP do
Wrapper for `Tesla.request/2`.
"""
- alias Pleroma.HTTP.Connection
+ alias Pleroma.HTTP.AdapterHelper
alias Pleroma.HTTP.Request
alias Pleroma.HTTP.RequestBuilder, as: Builder
alias Tesla.Client
@@ -60,49 +60,30 @@ defmodule Pleroma.HTTP do
{:ok, Env.t()} | {:error, any()}
def request(method, url, body, headers, options) when is_binary(url) do
uri = URI.parse(url)
- adapter_opts = Connection.options(uri, options[:adapter] || [])
- options = put_in(options[:adapter], adapter_opts)
- params = options[:params] || []
- request = build_request(method, headers, options, url, body, params)
-
- adapter = Application.get_env(:tesla, :adapter)
- client = Tesla.client([Tesla.Middleware.FollowRedirects], adapter)
-
- pid = Process.whereis(adapter_opts[:pool])
-
- pool_alive? =
- if adapter == Tesla.Adapter.Gun && pid do
- Process.alive?(pid)
- else
- false
- end
-
- request_opts =
- adapter_opts
- |> Enum.into(%{})
- |> Map.put(:env, Pleroma.Config.get([:env]))
- |> Map.put(:pool_alive?, pool_alive?)
-
- response = request(client, request, request_opts)
-
- Connection.after_request(adapter_opts)
-
- response
- end
-
- @spec request(Client.t(), keyword(), map()) :: {:ok, Env.t()} | {:error, any()}
- def request(%Client{} = client, request, %{env: :test}), do: request(client, request)
-
- def request(%Client{} = client, request, %{body_as: :chunks}), do: request(client, request)
-
- def request(%Client{} = client, request, %{pool_alive?: false}), do: request(client, request)
-
- def request(%Client{} = client, request, %{pool: pool, timeout: timeout}) do
- :poolboy.transaction(
- pool,
- &Pleroma.Pool.Request.execute(&1, client, request, timeout),
- timeout
- )
+ adapter_opts = AdapterHelper.options(uri, options[:adapter] || [])
+
+ case AdapterHelper.get_conn(uri, adapter_opts) do
+ {:ok, adapter_opts} ->
+ options = put_in(options[:adapter], adapter_opts)
+ params = options[:params] || []
+ request = build_request(method, headers, options, url, body, params)
+
+ adapter = Application.get_env(:tesla, :adapter)
+
+ client = Tesla.client(adapter_middlewares(adapter), adapter)
+
+ maybe_limit(
+ fn ->
+ request(client, request)
+ end,
+ adapter,
+ adapter_opts
+ )
+
+ # Connection release is handled in a custom FollowRedirects middleware
+ err ->
+ err
+ end
end
@spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()}
@@ -118,4 +99,19 @@ defmodule Pleroma.HTTP do
|> Builder.add_param(:query, :query, params)
|> Builder.convert_to_keyword()
end
+
+ @prefix Pleroma.Gun.ConnectionPool
+ defp maybe_limit(fun, Tesla.Adapter.Gun, opts) do
+ ConcurrentLimiter.limit(:"#{@prefix}.#{opts[:pool] || :default}", fun)
+ end
+
+ defp maybe_limit(fun, _, _) do
+ fun.()
+ end
+
+ defp adapter_middlewares(Tesla.Adapter.Gun) do
+ [Pleroma.HTTP.Middleware.FollowRedirects]
+ end
+
+ defp adapter_middlewares(_), do: []
end
diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex
deleted file mode 100644
index acafe1bea..000000000
--- a/lib/pleroma/pool/connections.ex
+++ /dev/null
@@ -1,283 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool.Connections do
- use GenServer
-
- alias Pleroma.Config
- alias Pleroma.Gun
-
- require Logger
-
- @type domain :: String.t()
- @type conn :: Pleroma.Gun.Conn.t()
-
- @type t :: %__MODULE__{
- conns: %{domain() => conn()},
- opts: keyword()
- }
-
- defstruct conns: %{}, opts: []
-
- @spec start_link({atom(), keyword()}) :: {:ok, pid()}
- def start_link({name, opts}) do
- GenServer.start_link(__MODULE__, opts, name: name)
- end
-
- @impl true
- def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
-
- @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
- def checkin(url, name)
- def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
-
- def checkin(%URI{} = uri, name) do
- timeout = Config.get([:connections_pool, :checkin_timeout], 250)
-
- GenServer.call(name, {:checkin, uri}, timeout)
- end
-
- @spec alive?(atom()) :: boolean()
- def alive?(name) do
- if pid = Process.whereis(name) do
- Process.alive?(pid)
- else
- false
- end
- end
-
- @spec get_state(atom()) :: t()
- def get_state(name) do
- GenServer.call(name, :state)
- end
-
- @spec count(atom()) :: pos_integer()
- def count(name) do
- GenServer.call(name, :count)
- end
-
- @spec get_unused_conns(atom()) :: [{domain(), conn()}]
- def get_unused_conns(name) do
- GenServer.call(name, :unused_conns)
- end
-
- @spec checkout(pid(), pid(), atom()) :: :ok
- def checkout(conn, pid, name) do
- GenServer.cast(name, {:checkout, conn, pid})
- end
-
- @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
- def add_conn(name, key, conn) do
- GenServer.cast(name, {:add_conn, key, conn})
- end
-
- @spec remove_conn(atom(), String.t()) :: :ok
- def remove_conn(name, key) do
- GenServer.cast(name, {:remove_conn, key})
- end
-
- @impl true
- def handle_cast({:add_conn, key, conn}, state) do
- state = put_in(state.conns[key], conn)
-
- Process.monitor(conn.conn)
- {:noreply, state}
- end
-
- @impl true
- def handle_cast({:checkout, conn_pid, pid}, state) do
- state =
- with true <- Process.alive?(conn_pid),
- {key, conn} <- find_conn(state.conns, conn_pid),
- used_by <- List.keydelete(conn.used_by, pid, 0) do
- conn_state = if used_by == [], do: :idle, else: conn.conn_state
-
- put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
- else
- false ->
- Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
- state
-
- nil ->
- Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
- state
- end
-
- {:noreply, state}
- end
-
- @impl true
- def handle_cast({:remove_conn, key}, state) do
- state = put_in(state.conns, Map.delete(state.conns, key))
- {:noreply, state}
- end
-
- @impl true
- def handle_call({:checkin, uri}, from, state) do
- key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
-
- case state.conns[key] do
- %{conn: pid, gun_state: :up} = conn ->
- time = :os.system_time(:second)
- last_reference = time - conn.last_reference
- crf = crf(last_reference, 100, conn.crf)
-
- state =
- put_in(state.conns[key], %{
- conn
- | last_reference: time,
- crf: crf,
- conn_state: :active,
- used_by: [from | conn.used_by]
- })
-
- {:reply, pid, state}
-
- %{gun_state: :down} ->
- {:reply, nil, state}
-
- nil ->
- {:reply, nil, state}
- end
- end
-
- @impl true
- def handle_call(:state, _from, state), do: {:reply, state, state}
-
- @impl true
- def handle_call(:count, _from, state) do
- {:reply, Enum.count(state.conns), state}
- end
-
- @impl true
- def handle_call(:unused_conns, _from, state) do
- unused_conns =
- state.conns
- |> Enum.filter(&filter_conns/1)
- |> Enum.sort(&sort_conns/2)
-
- {:reply, unused_conns, state}
- end
-
- defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true
- defp filter_conns(_), do: false
-
- defp sort_conns({_, c1}, {_, c2}) do
- c1.crf <= c2.crf and c1.last_reference <= c2.last_reference
- end
-
- @impl true
- def handle_info({:gun_up, conn_pid, _protocol}, state) do
- %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid)
-
- host =
- case :inet.ntoa(host) do
- {:error, :einval} -> host
- ip -> ip
- end
-
- key = "#{scheme}:#{host}:#{port}"
-
- state =
- with {key, conn} <- find_conn(state.conns, conn_pid, key),
- {true, key} <- {Process.alive?(conn_pid), key} do
- put_in(state.conns[key], %{
- conn
- | gun_state: :up,
- conn_state: :active,
- retries: 0
- })
- else
- {false, key} ->
- put_in(
- state.conns,
- Map.delete(state.conns, key)
- )
-
- nil ->
- :ok = Gun.close(conn_pid)
-
- state
- end
-
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
- retries = Config.get([:connections_pool, :retry], 1)
- # we can't get info on this pid, because pid is dead
- state =
- with {key, conn} <- find_conn(state.conns, conn_pid),
- {true, key} <- {Process.alive?(conn_pid), key} do
- if conn.retries == retries do
- :ok = Gun.close(conn.conn)
-
- put_in(
- state.conns,
- Map.delete(state.conns, key)
- )
- else
- put_in(state.conns[key], %{
- conn
- | gun_state: :down,
- retries: conn.retries + 1
- })
- end
- else
- {false, key} ->
- put_in(
- state.conns,
- Map.delete(state.conns, key)
- )
-
- nil ->
- Logger.debug(":gun_down for conn which isn't found in state")
-
- state
- end
-
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
- Logger.debug("received DOWN message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")
-
- state =
- with {key, conn} <- find_conn(state.conns, conn_pid) do
- Enum.each(conn.used_by, fn {pid, _ref} ->
- Process.exit(pid, reason)
- end)
-
- put_in(
- state.conns,
- Map.delete(state.conns, key)
- )
- else
- nil ->
- Logger.debug(":DOWN for conn which isn't found in state")
-
- state
- end
-
- {:noreply, state}
- end
-
- defp find_conn(conns, conn_pid) do
- Enum.find(conns, fn {_key, conn} ->
- conn.conn == conn_pid
- end)
- end
-
- defp find_conn(conns, conn_pid, conn_key) do
- Enum.find(conns, fn {key, conn} ->
- key == conn_key and conn.conn == conn_pid
- end)
- end
-
- def crf(current, steps, crf) do
- 1 + :math.pow(0.5, current / steps) * crf
- end
-end
diff --git a/lib/pleroma/pool/pool.ex b/lib/pleroma/pool/pool.ex
deleted file mode 100644
index 21a6fbbc5..000000000
--- a/lib/pleroma/pool/pool.ex
+++ /dev/null
@@ -1,22 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool do
- def child_spec(opts) do
- poolboy_opts =
- opts
- |> Keyword.put(:worker_module, Pleroma.Pool.Request)
- |> Keyword.put(:name, {:local, opts[:name]})
- |> Keyword.put(:size, opts[:size])
- |> Keyword.put(:max_overflow, opts[:max_overflow])
-
- %{
- id: opts[:id] || {__MODULE__, make_ref()},
- start: {:poolboy, :start_link, [poolboy_opts, [name: opts[:name]]]},
- restart: :permanent,
- shutdown: 5000,
- type: :worker
- }
- end
-end
diff --git a/lib/pleroma/pool/request.ex b/lib/pleroma/pool/request.ex
deleted file mode 100644
index 3fb930db7..000000000
--- a/lib/pleroma/pool/request.ex
+++ /dev/null
@@ -1,65 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool.Request do
- use GenServer
-
- require Logger
-
- def start_link(args) do
- GenServer.start_link(__MODULE__, args)
- end
-
- @impl true
- def init(_), do: {:ok, []}
-
- @spec execute(pid() | atom(), Tesla.Client.t(), keyword(), pos_integer()) ::
- {:ok, Tesla.Env.t()} | {:error, any()}
- def execute(pid, client, request, timeout) do
- GenServer.call(pid, {:execute, client, request}, timeout)
- end
-
- @impl true
- def handle_call({:execute, client, request}, _from, state) do
- response = Pleroma.HTTP.request(client, request)
-
- {:reply, response, state}
- end
-
- @impl true
- def handle_info({:gun_data, _conn, _stream, _, _}, state) do
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:gun_up, _conn, _protocol}, state) do
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:gun_down, _conn, _protocol, _reason, _killed}, state) do
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:gun_error, _conn, _stream, _error}, state) do
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:gun_push, _conn, _stream, _new_stream, _method, _uri, _headers}, state) do
- {:noreply, state}
- end
-
- @impl true
- def handle_info({:gun_response, _conn, _stream, _, _status, _headers}, state) do
- {:noreply, state}
- end
-
- @impl true
- def handle_info(msg, state) do
- Logger.warn("Received unexpected message #{inspect(__MODULE__)} #{inspect(msg)}")
- {:noreply, state}
- end
-end
diff --git a/lib/pleroma/pool/supervisor.ex b/lib/pleroma/pool/supervisor.ex
deleted file mode 100644
index faf646cb2..000000000
--- a/lib/pleroma/pool/supervisor.ex
+++ /dev/null
@@ -1,42 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Pool.Supervisor do
- use Supervisor
-
- alias Pleroma.Config
- alias Pleroma.Pool
-
- def start_link(args) do
- Supervisor.start_link(__MODULE__, args, name: __MODULE__)
- end
-
- def init(_) do
- conns_child = %{
- id: Pool.Connections,
- start:
- {Pool.Connections, :start_link, [{:gun_connections, Config.get([:connections_pool])}]}
- }
-
- Supervisor.init([conns_child | pools()], strategy: :one_for_one)
- end
-
- defp pools do
- pools = Config.get(:pools)
-
- pools =
- if Config.get([Pleroma.Upload, :proxy_remote]) == false do
- Keyword.delete(pools, :upload)
- else
- pools
- end
-
- for {pool_name, pool_opts} <- pools do
- pool_opts
- |> Keyword.put(:id, {Pool, pool_name})
- |> Keyword.put(:name, pool_name)
- |> Pool.child_spec()
- end
- end
-end
diff --git a/lib/pleroma/reverse_proxy/client/tesla.ex b/lib/pleroma/reverse_proxy/client/tesla.ex
index e81ea8bde..65785445d 100644
--- a/lib/pleroma/reverse_proxy/client/tesla.ex
+++ b/lib/pleroma/reverse_proxy/client/tesla.ex
@@ -48,7 +48,7 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do
# if there were redirects we need to checkout old conn
conn = opts[:old_conn] || opts[:conn]
- if conn, do: :ok = Pleroma.Pool.Connections.checkout(conn, self(), :gun_connections)
+ if conn, do: :ok = Pleroma.Gun.ConnectionPool.release_conn(conn)
:done
end
diff --git a/lib/pleroma/telemetry/logger.ex b/lib/pleroma/telemetry/logger.ex
new file mode 100644
index 000000000..4cacae02f
--- /dev/null
+++ b/lib/pleroma/telemetry/logger.ex
@@ -0,0 +1,76 @@
+defmodule Pleroma.Telemetry.Logger do
+ @moduledoc "Transforms Pleroma telemetry events to logs"
+
+ require Logger
+
+ @events [
+ [:pleroma, :connection_pool, :reclaim, :start],
+ [:pleroma, :connection_pool, :reclaim, :stop],
+ [:pleroma, :connection_pool, :provision_failure],
+ [:pleroma, :connection_pool, :client_death]
+ ]
+ def attach do
+ :telemetry.attach_many("pleroma-logger", @events, &handle_event/4, [])
+ end
+
+ # Passing anonymous functions instead of strings to logger is intentional,
+ # that way strings won't be concatenated if the message is going to be thrown
+ # out anyway due to higher log level configured
+
+ def handle_event(
+ [:pleroma, :connection_pool, :reclaim, :start],
+ _,
+ %{max_connections: max_connections, reclaim_max: reclaim_max},
+ _
+ ) do
+ Logger.debug(fn ->
+ "Connection pool is exhausted (reached #{max_connections} connections). Starting idle connection cleanup to reclaim as much as #{
+ reclaim_max
+ } connections"
+ end)
+ end
+
+ def handle_event(
+ [:pleroma, :connection_pool, :reclaim, :stop],
+ %{reclaimed_count: 0},
+ _,
+ _
+ ) do
+ Logger.error(fn ->
+ "Connection pool failed to reclaim any connections due to all of them being in use. It will have to drop requests for opening connections to new hosts"
+ end)
+ end
+
+ def handle_event(
+ [:pleroma, :connection_pool, :reclaim, :stop],
+ %{reclaimed_count: reclaimed_count},
+ _,
+ _
+ ) do
+ Logger.debug(fn -> "Connection pool cleaned up #{reclaimed_count} idle connections" end)
+ end
+
+ def handle_event(
+ [:pleroma, :connection_pool, :provision_failure],
+ %{opts: [key | _]},
+ _,
+ _
+ ) do
+ Logger.error(fn ->
+ "Connection pool had to refuse opening a connection to #{key} due to connection limit exhaustion"
+ end)
+ end
+
+ def handle_event(
+ [:pleroma, :connection_pool, :client_death],
+ %{client_pid: client_pid, reason: reason},
+ %{key: key},
+ _
+ ) do
+ Logger.warn(fn ->
+ "Pool worker for #{key}: Client #{inspect(client_pid)} died before releasing the connection with #{
+ inspect(reason)
+ }"
+ end)
+ end
+end
diff --git a/lib/pleroma/tesla/middleware/follow_redirects.ex b/lib/pleroma/tesla/middleware/follow_redirects.ex
new file mode 100644
index 000000000..5a7032215
--- /dev/null
+++ b/lib/pleroma/tesla/middleware/follow_redirects.ex
@@ -0,0 +1,110 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2015-2020 Tymon Tobolski <https://github.com/teamon/tesla/blob/master/lib/tesla/middleware/follow_redirects.ex>
+# Copyright © 2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.HTTP.Middleware.FollowRedirects do
+ @moduledoc """
+ Pool-aware version of https://github.com/teamon/tesla/blob/master/lib/tesla/middleware/follow_redirects.ex
+
+ Follow 3xx redirects
+ ## Options
+ - `:max_redirects` - limit number of redirects (default: `5`)
+ """
+
+ alias Pleroma.Gun.ConnectionPool
+
+ @behaviour Tesla.Middleware
+
+ @max_redirects 5
+ @redirect_statuses [301, 302, 303, 307, 308]
+
+ @impl Tesla.Middleware
+ def call(env, next, opts \\ []) do
+ max = Keyword.get(opts, :max_redirects, @max_redirects)
+
+ redirect(env, next, max)
+ end
+
+ defp redirect(env, next, left) do
+ opts = env.opts[:adapter]
+
+ case Tesla.run(env, next) do
+ {:ok, %{status: status} = res} when status in @redirect_statuses and left > 0 ->
+ release_conn(opts)
+
+ case Tesla.get_header(res, "location") do
+ nil ->
+ {:ok, res}
+
+ location ->
+ location = parse_location(location, res)
+
+ case get_conn(location, opts) do
+ {:ok, opts} ->
+ %{env | opts: Keyword.put(env.opts, :adapter, opts)}
+ |> new_request(res.status, location)
+ |> redirect(next, left - 1)
+
+ e ->
+ e
+ end
+ end
+
+ {:ok, %{status: status}} when status in @redirect_statuses ->
+ release_conn(opts)
+ {:error, {__MODULE__, :too_many_redirects}}
+
+ {:error, _} = e ->
+ release_conn(opts)
+ e
+
+ other ->
+ unless opts[:body_as] == :chunks do
+ release_conn(opts)
+ end
+
+ other
+ end
+ end
+
+ defp get_conn(location, opts) do
+ uri = URI.parse(location)
+
+ case ConnectionPool.get_conn(uri, opts) do
+ {:ok, conn} ->
+ {:ok, Keyword.merge(opts, conn: conn)}
+
+ e ->
+ e
+ end
+ end
+
+ defp release_conn(opts) do
+ ConnectionPool.release_conn(opts[:conn])
+ end
+
+ # The 303 (See Other) redirect was added in HTTP/1.1 to indicate that the originally
+ # requested resource is not available, however a related resource (or another redirect)
+ # available via GET is available at the specified location.
+ # https://tools.ietf.org/html/rfc7231#section-6.4.4
+ defp new_request(env, 303, location), do: %{env | url: location, method: :get, query: []}
+
+ # The 307 (Temporary Redirect) status code indicates that the target
+ # resource resides temporarily under a different URI and the user agent
+ # MUST NOT change the request method (...)
+ # https://tools.ietf.org/html/rfc7231#section-6.4.7
+ defp new_request(env, 307, location), do: %{env | url: location}
+
+ defp new_request(env, _, location), do: %{env | url: location, query: []}
+
+ defp parse_location("https://" <> _rest = location, _env), do: location
+ defp parse_location("http://" <> _rest = location, _env), do: location
+
+ defp parse_location(location, env) do
+ env.url
+ |> URI.parse()
+ |> URI.merge(location)
+ |> URI.to_string()
+ end
+end
diff --git a/lib/pleroma/web/api_spec/operations/account_operation.ex b/lib/pleroma/web/api_spec/operations/account_operation.ex
index 952d9347b..50c8e0242 100644
--- a/lib/pleroma/web/api_spec/operations/account_operation.ex
+++ b/lib/pleroma/web/api_spec/operations/account_operation.ex
@@ -159,6 +159,7 @@ defmodule Pleroma.Web.ApiSpec.AccountOperation do
"Accounts which follow the given account, if network is not hidden by the account owner.",
parameters: [
%Reference{"$ref": "#/components/parameters/accountIdOrNickname"},
+ Operation.parameter(:id, :query, :string, "ID of the resource owner"),
with_relationships_param() | pagination_params()
],
responses: %{
@@ -177,6 +178,7 @@ defmodule Pleroma.Web.ApiSpec.AccountOperation do
"Accounts which the given account is following, if network is not hidden by the account owner.",
parameters: [
%Reference{"$ref": "#/components/parameters/accountIdOrNickname"},
+ Operation.parameter(:id, :query, :string, "ID of the resource owner"),
with_relationships_param() | pagination_params()
],
responses: %{200 => Operation.response("Accounts", "application/json", array_of_accounts())}
diff --git a/lib/pleroma/web/mastodon_api/views/instance_view.ex b/lib/pleroma/web/mastodon_api/views/instance_view.ex
index 5deb0d7ed..cd3bc7f00 100644
--- a/lib/pleroma/web/mastodon_api/views/instance_view.ex
+++ b/lib/pleroma/web/mastodon_api/views/instance_view.ex
@@ -41,7 +41,8 @@ defmodule Pleroma.Web.MastodonAPI.InstanceView do
account_activation_required: Keyword.get(instance, :account_activation_required),
features: features(),
federation: federation(),
- fields_limits: fields_limits()
+ fields_limits: fields_limits(),
+ post_formats: Config.get([:instance, :allowed_post_formats])
},
vapid_public_key: Keyword.get(Pleroma.Web.Push.vapid_config(), :public_key)
}
diff --git a/lib/pleroma/web/templates/o_auth/mfa/recovery.html.eex b/lib/pleroma/web/templates/o_auth/mfa/recovery.html.eex
index 750f65386..5ab59b57b 100644
--- a/lib/pleroma/web/templates/o_auth/mfa/recovery.html.eex
+++ b/lib/pleroma/web/templates/o_auth/mfa/recovery.html.eex
@@ -10,7 +10,7 @@
<%= form_for @conn, mfa_verify_path(@conn, :verify), [as: "mfa"], fn f -> %>
<div class="input">
<%= label f, :code, "Recovery code" %>
- <%= text_input f, :code %>
+ <%= text_input f, :code, [autocomplete: false, autocorrect: "off", autocapitalize: "off", autofocus: true, spellcheck: false] %>
<%= hidden_input f, :mfa_token, value: @mfa_token %>
<%= hidden_input f, :state, value: @state %>
<%= hidden_input f, :redirect_uri, value: @redirect_uri %>
diff --git a/lib/pleroma/web/templates/o_auth/mfa/totp.html.eex b/lib/pleroma/web/templates/o_auth/mfa/totp.html.eex
index af6e546b0..af85777eb 100644
--- a/lib/pleroma/web/templates/o_auth/mfa/totp.html.eex
+++ b/lib/pleroma/web/templates/o_auth/mfa/totp.html.eex
@@ -10,7 +10,7 @@
<%= form_for @conn, mfa_verify_path(@conn, :verify), [as: "mfa"], fn f -> %>
<div class="input">
<%= label f, :code, "Authentication code" %>
- <%= text_input f, :code %>
+ <%= text_input f, :code, [autocomplete: false, autocorrect: "off", autocapitalize: "off", autofocus: true, pattern: "[0-9]*", spellcheck: false] %>
<%= hidden_input f, :mfa_token, value: @mfa_token %>
<%= hidden_input f, :state, value: @state %>
<%= hidden_input f, :redirect_uri, value: @redirect_uri %>