diff options
Diffstat (limited to 'lib/pleroma/http')
-rw-r--r-- | lib/pleroma/http/adapter_helper.ex | 129 | ||||
-rw-r--r-- | lib/pleroma/http/adapter_helper/default.ex | 14 | ||||
-rw-r--r-- | lib/pleroma/http/adapter_helper/gun.ex | 63 | ||||
-rw-r--r-- | lib/pleroma/http/adapter_helper/hackney.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/http/connection.ex | 124 | ||||
-rw-r--r-- | lib/pleroma/http/http.ex | 77 |
6 files changed, 195 insertions, 215 deletions
diff --git a/lib/pleroma/http/adapter_helper.ex b/lib/pleroma/http/adapter_helper.ex index 510722ff9..9ec3836b0 100644 --- a/lib/pleroma/http/adapter_helper.ex +++ b/lib/pleroma/http/adapter_helper.ex @@ -3,32 +3,30 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.HTTP.AdapterHelper do - alias Pleroma.HTTP.Connection + @moduledoc """ + Configure Tesla.Client with default and customized adapter options. + """ + @defaults [pool: :federation] + + @type proxy_type() :: :socks4 | :socks5 + @type host() :: charlist() | :inet.ip_address() + + alias Pleroma.Config + alias Pleroma.HTTP.AdapterHelper + require Logger @type proxy :: {Connection.host(), pos_integer()} | {Connection.proxy_type(), Connection.host(), pos_integer()} @callback options(keyword(), URI.t()) :: keyword() - @callback after_request(keyword()) :: :ok - - @spec options(keyword(), URI.t()) :: keyword() - def options(opts, _uri) do - proxy = Pleroma.Config.get([:http, :proxy_url], nil) - maybe_add_proxy(opts, format_proxy(proxy)) - end - - @spec maybe_get_conn(URI.t(), keyword()) :: keyword() - def maybe_get_conn(_uri, opts), do: opts - - @spec after_request(keyword()) :: :ok - def after_request(_opts), do: :ok + @callback get_conn(URI.t(), keyword()) :: {:ok, term()} | {:error, term()} @spec format_proxy(String.t() | tuple() | nil) :: proxy() | nil def format_proxy(nil), do: nil def format_proxy(proxy_url) do - case Connection.parse_proxy(proxy_url) do + case parse_proxy(proxy_url) do {:ok, host, port} -> {host, port} {:ok, type, host, port} -> {type, host, port} _ -> nil @@ -38,4 +36,105 @@ defmodule Pleroma.HTTP.AdapterHelper do @spec maybe_add_proxy(keyword(), proxy() | nil) :: keyword() def maybe_add_proxy(opts, nil), do: opts def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy) + + @doc """ + Merge default connection & adapter options with received ones. + """ + + @spec options(URI.t(), keyword()) :: keyword() + def options(%URI{} = uri, opts \\ []) do + @defaults + |> put_timeout() + |> Keyword.merge(opts) + |> adapter_helper().options(uri) + end + + # For Hackney, this is the time a connection can stay idle in the pool. + # For Gun, this is the timeout to receive a message from Gun. + defp put_timeout(opts) do + {config_key, default} = + if adapter() == Tesla.Adapter.Gun do + {:pools, Config.get([:pools, :default, :timeout], 5_000)} + else + {:hackney_pools, 10_000} + end + + timeout = Config.get([config_key, opts[:pool], :timeout], default) + + Keyword.merge(opts, timeout: timeout) + end + + def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts) + defp adapter, do: Application.get_env(:tesla, :adapter) + + defp adapter_helper do + case adapter() do + Tesla.Adapter.Gun -> AdapterHelper.Gun + Tesla.Adapter.Hackney -> AdapterHelper.Hackney + _ -> AdapterHelper.Default + end + end + + @spec parse_proxy(String.t() | tuple() | nil) :: + {:ok, host(), pos_integer()} + | {:ok, proxy_type(), host(), pos_integer()} + | {:error, atom()} + | nil + + def parse_proxy(nil), do: nil + + def parse_proxy(proxy) when is_binary(proxy) do + with [host, port] <- String.split(proxy, ":"), + {port, ""} <- Integer.parse(port) do + {:ok, parse_host(host), port} + else + {_, _} -> + Logger.warn("Parsing port failed #{inspect(proxy)}") + {:error, :invalid_proxy_port} + + :error -> + Logger.warn("Parsing port failed #{inspect(proxy)}") + {:error, :invalid_proxy_port} + + _ -> + Logger.warn("Parsing proxy failed #{inspect(proxy)}") + {:error, :invalid_proxy} + end + end + + def parse_proxy(proxy) when is_tuple(proxy) do + with {type, host, port} <- proxy do + {:ok, type, parse_host(host), port} + else + _ -> + Logger.warn("Parsing proxy failed #{inspect(proxy)}") + {:error, :invalid_proxy} + end + end + + @spec parse_host(String.t() | atom() | charlist()) :: charlist() | :inet.ip_address() + def parse_host(host) when is_list(host), do: host + def parse_host(host) when is_atom(host), do: to_charlist(host) + + def parse_host(host) when is_binary(host) do + host = to_charlist(host) + + case :inet.parse_address(host) do + {:error, :einval} -> host + {:ok, ip} -> ip + end + end + + @spec format_host(String.t()) :: charlist() + def format_host(host) do + host_charlist = to_charlist(host) + + case :inet.parse_address(host_charlist) do + {:error, :einval} -> + :idna.encode(host_charlist) + + {:ok, _ip} -> + host_charlist + end + end end diff --git a/lib/pleroma/http/adapter_helper/default.ex b/lib/pleroma/http/adapter_helper/default.ex new file mode 100644 index 000000000..e13441316 --- /dev/null +++ b/lib/pleroma/http/adapter_helper/default.ex @@ -0,0 +1,14 @@ +defmodule Pleroma.HTTP.AdapterHelper.Default do + alias Pleroma.HTTP.AdapterHelper + + @behaviour Pleroma.HTTP.AdapterHelper + + @spec options(keyword(), URI.t()) :: keyword() + def options(opts, _uri) do + proxy = Pleroma.Config.get([:http, :proxy_url], nil) + AdapterHelper.maybe_add_proxy(opts, AdapterHelper.format_proxy(proxy)) + end + + @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} + def get_conn(_uri, opts), do: {:ok, opts} +end diff --git a/lib/pleroma/http/adapter_helper/gun.ex b/lib/pleroma/http/adapter_helper/gun.ex index ead7cdc6b..b4ff8306c 100644 --- a/lib/pleroma/http/adapter_helper/gun.ex +++ b/lib/pleroma/http/adapter_helper/gun.ex @@ -5,8 +5,8 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do @behaviour Pleroma.HTTP.AdapterHelper + alias Pleroma.Gun.ConnectionPool alias Pleroma.HTTP.AdapterHelper - alias Pleroma.Pool.Connections require Logger @@ -14,7 +14,7 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do connect_timeout: 5_000, domain_lookup_timeout: 5_000, tls_handshake_timeout: 5_000, - retry: 1, + retry: 0, retry_timeout: 1000, await_up_timeout: 5_000 ] @@ -31,16 +31,7 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do |> Keyword.merge(config_opts) |> add_scheme_opts(uri) |> AdapterHelper.maybe_add_proxy(proxy) - |> maybe_get_conn(uri, incoming_opts) - end - - @spec after_request(keyword()) :: :ok - def after_request(opts) do - if opts[:conn] && opts[:body_as] != :chunks do - Connections.checkout(opts[:conn], self(), :gun_connections) - end - - :ok + |> Keyword.merge(incoming_opts) end defp add_scheme_opts(opts, %{scheme: "http"}), do: opts @@ -48,30 +39,40 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do defp add_scheme_opts(opts, %{scheme: "https"}) do opts |> Keyword.put(:certificates_verification, true) - |> Keyword.put(:tls_opts, log_level: :warning) end - defp maybe_get_conn(adapter_opts, uri, incoming_opts) do - {receive_conn?, opts} = - adapter_opts - |> Keyword.merge(incoming_opts) - |> Keyword.pop(:receive_conn, true) - - if Connections.alive?(:gun_connections) and receive_conn? do - checkin_conn(uri, opts) - else - opts + @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()} + def get_conn(uri, opts) do + case ConnectionPool.get_conn(uri, opts) do + {:ok, conn_pid} -> {:ok, Keyword.merge(opts, conn: conn_pid, close_conn: false)} + err -> err end end - defp checkin_conn(uri, opts) do - case Connections.checkin(uri, :gun_connections) do - nil -> - Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts]) - opts + @prefix Pleroma.Gun.ConnectionPool + def limiter_setup do + wait = Pleroma.Config.get([:connections_pool, :connection_acquisition_wait]) + retries = Pleroma.Config.get([:connections_pool, :connection_acquisition_retries]) - conn when is_pid(conn) -> - Keyword.merge(opts, conn: conn, close_conn: false) - end + :pools + |> Pleroma.Config.get([]) + |> Enum.each(fn {name, opts} -> + max_running = Keyword.get(opts, :size, 50) + max_waiting = Keyword.get(opts, :max_waiting, 10) + + result = + ConcurrentLimiter.new(:"#{@prefix}.#{name}", max_running, max_waiting, + wait: wait, + max_retries: retries + ) + + case result do + :ok -> :ok + {:error, :existing} -> :ok + e -> raise e + end + end) + + :ok end end diff --git a/lib/pleroma/http/adapter_helper/hackney.ex b/lib/pleroma/http/adapter_helper/hackney.ex index 3972a03a9..cd569422b 100644 --- a/lib/pleroma/http/adapter_helper/hackney.ex +++ b/lib/pleroma/http/adapter_helper/hackney.ex @@ -24,5 +24,6 @@ defmodule Pleroma.HTTP.AdapterHelper.Hackney do defp add_scheme_opts(opts, _), do: opts - def after_request(_), do: :ok + @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} + def get_conn(_uri, opts), do: {:ok, opts} end diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex deleted file mode 100644 index ebacf7902..000000000 --- a/lib/pleroma/http/connection.ex +++ /dev/null @@ -1,124 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.HTTP.Connection do - @moduledoc """ - Configure Tesla.Client with default and customized adapter options. - """ - - alias Pleroma.Config - alias Pleroma.HTTP.AdapterHelper - - require Logger - - @defaults [pool: :federation] - - @type ip_address :: ipv4_address() | ipv6_address() - @type ipv4_address :: {0..255, 0..255, 0..255, 0..255} - @type ipv6_address :: - {0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535} - @type proxy_type() :: :socks4 | :socks5 - @type host() :: charlist() | ip_address() - - @doc """ - Merge default connection & adapter options with received ones. - """ - - @spec options(URI.t(), keyword()) :: keyword() - def options(%URI{} = uri, opts \\ []) do - @defaults - |> pool_timeout() - |> Keyword.merge(opts) - |> adapter_helper().options(uri) - end - - defp pool_timeout(opts) do - {config_key, default} = - if adapter() == Tesla.Adapter.Gun do - {:pools, Config.get([:pools, :default, :timeout])} - else - {:hackney_pools, 10_000} - end - - timeout = Config.get([config_key, opts[:pool], :timeout], default) - - Keyword.merge(opts, timeout: timeout) - end - - @spec after_request(keyword()) :: :ok - def after_request(opts), do: adapter_helper().after_request(opts) - - defp adapter, do: Application.get_env(:tesla, :adapter) - - defp adapter_helper do - case adapter() do - Tesla.Adapter.Gun -> AdapterHelper.Gun - Tesla.Adapter.Hackney -> AdapterHelper.Hackney - _ -> AdapterHelper - end - end - - @spec parse_proxy(String.t() | tuple() | nil) :: - {:ok, host(), pos_integer()} - | {:ok, proxy_type(), host(), pos_integer()} - | {:error, atom()} - | nil - - def parse_proxy(nil), do: nil - - def parse_proxy(proxy) when is_binary(proxy) do - with [host, port] <- String.split(proxy, ":"), - {port, ""} <- Integer.parse(port) do - {:ok, parse_host(host), port} - else - {_, _} -> - Logger.warn("Parsing port failed #{inspect(proxy)}") - {:error, :invalid_proxy_port} - - :error -> - Logger.warn("Parsing port failed #{inspect(proxy)}") - {:error, :invalid_proxy_port} - - _ -> - Logger.warn("Parsing proxy failed #{inspect(proxy)}") - {:error, :invalid_proxy} - end - end - - def parse_proxy(proxy) when is_tuple(proxy) do - with {type, host, port} <- proxy do - {:ok, type, parse_host(host), port} - else - _ -> - Logger.warn("Parsing proxy failed #{inspect(proxy)}") - {:error, :invalid_proxy} - end - end - - @spec parse_host(String.t() | atom() | charlist()) :: charlist() | ip_address() - def parse_host(host) when is_list(host), do: host - def parse_host(host) when is_atom(host), do: to_charlist(host) - - def parse_host(host) when is_binary(host) do - host = to_charlist(host) - - case :inet.parse_address(host) do - {:error, :einval} -> host - {:ok, ip} -> ip - end - end - - @spec format_host(String.t()) :: charlist() - def format_host(host) do - host_charlist = to_charlist(host) - - case :inet.parse_address(host_charlist) do - {:error, :einval} -> - :idna.encode(host_charlist) - - {:ok, _ip} -> - host_charlist - end - end -end diff --git a/lib/pleroma/http/http.ex b/lib/pleroma/http/http.ex index 66ca75367..6128bc4cf 100644 --- a/lib/pleroma/http/http.ex +++ b/lib/pleroma/http/http.ex @@ -7,7 +7,7 @@ defmodule Pleroma.HTTP do Wrapper for `Tesla.request/2`. """ - alias Pleroma.HTTP.Connection + alias Pleroma.HTTP.AdapterHelper alias Pleroma.HTTP.Request alias Pleroma.HTTP.RequestBuilder, as: Builder alias Tesla.Client @@ -60,49 +60,29 @@ defmodule Pleroma.HTTP do {:ok, Env.t()} | {:error, any()} def request(method, url, body, headers, options) when is_binary(url) do uri = URI.parse(url) - adapter_opts = Connection.options(uri, options[:adapter] || []) - options = put_in(options[:adapter], adapter_opts) - params = options[:params] || [] - request = build_request(method, headers, options, url, body, params) - - adapter = Application.get_env(:tesla, :adapter) - client = Tesla.client([Tesla.Middleware.FollowRedirects], adapter) - - pid = Process.whereis(adapter_opts[:pool]) - - pool_alive? = - if adapter == Tesla.Adapter.Gun && pid do - Process.alive?(pid) - else - false - end - - request_opts = - adapter_opts - |> Enum.into(%{}) - |> Map.put(:env, Pleroma.Config.get([:env])) - |> Map.put(:pool_alive?, pool_alive?) - - response = request(client, request, request_opts) - - Connection.after_request(adapter_opts) - - response - end - - @spec request(Client.t(), keyword(), map()) :: {:ok, Env.t()} | {:error, any()} - def request(%Client{} = client, request, %{env: :test}), do: request(client, request) - - def request(%Client{} = client, request, %{body_as: :chunks}), do: request(client, request) - - def request(%Client{} = client, request, %{pool_alive?: false}), do: request(client, request) - - def request(%Client{} = client, request, %{pool: pool, timeout: timeout}) do - :poolboy.transaction( - pool, - &Pleroma.Pool.Request.execute(&1, client, request, timeout), - timeout - ) + adapter_opts = AdapterHelper.options(uri, options[:adapter] || []) + + case AdapterHelper.get_conn(uri, adapter_opts) do + {:ok, adapter_opts} -> + options = put_in(options[:adapter], adapter_opts) + params = options[:params] || [] + request = build_request(method, headers, options, url, body, params) + + adapter = Application.get_env(:tesla, :adapter) + client = Tesla.client([Pleroma.HTTP.Middleware.FollowRedirects], adapter) + + maybe_limit( + fn -> + request(client, request) + end, + adapter, + adapter_opts + ) + + # Connection release is handled in a custom FollowRedirects middleware + err -> + err + end end @spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()} @@ -118,4 +98,13 @@ defmodule Pleroma.HTTP do |> Builder.add_param(:query, :query, params) |> Builder.convert_to_keyword() end + + @prefix Pleroma.Gun.ConnectionPool + defp maybe_limit(fun, Tesla.Adapter.Gun, opts) do + ConcurrentLimiter.limit(:"#{@prefix}.#{opts[:pool] || :default}", fun) + end + + defp maybe_limit(fun, _, _) do + fun.() + end end |