aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/mix/tasks/pleroma/benchmark.ex39
-rw-r--r--lib/mix/tasks/pleroma/emoji.ex9
-rw-r--r--lib/pleroma/application.ex90
-rw-r--r--lib/pleroma/config/config_db.ex11
-rw-r--r--lib/pleroma/config/transfer_task.ex43
-rw-r--r--lib/pleroma/gun/api.ex26
-rw-r--r--lib/pleroma/gun/api/mock.ex151
-rw-r--r--lib/pleroma/gun/conn.ex29
-rw-r--r--lib/pleroma/gun/gun.ex45
-rw-r--r--lib/pleroma/http/adapter.ex64
-rw-r--r--lib/pleroma/http/adapter/gun.ex123
-rw-r--r--lib/pleroma/http/adapter/hackney.ex41
-rw-r--r--lib/pleroma/http/connection.ex113
-rw-r--r--lib/pleroma/http/http.ex154
-rw-r--r--lib/pleroma/http/request.ex23
-rw-r--r--lib/pleroma/http/request_builder.ex105
-rw-r--r--lib/pleroma/object/fetcher.ex6
-rw-r--r--lib/pleroma/otp_version.ex63
-rw-r--r--lib/pleroma/pool/connections.ex415
-rw-r--r--lib/pleroma/pool/pool.ex22
-rw-r--r--lib/pleroma/pool/request.ex72
-rw-r--r--lib/pleroma/pool/supervisor.ex36
-rw-r--r--lib/pleroma/reverse_proxy/client.ex26
-rw-r--r--lib/pleroma/reverse_proxy/client/hackney.ex24
-rw-r--r--lib/pleroma/reverse_proxy/client/tesla.ex87
-rw-r--r--lib/pleroma/reverse_proxy/reverse_proxy.ex20
-rw-r--r--lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex14
-rw-r--r--lib/pleroma/web/rel_me.ex18
-rw-r--r--lib/pleroma/web/rich_media/parser.ex18
-rw-r--r--lib/pleroma/web/web_finger/web_finger.ex2
30 files changed, 1647 insertions, 242 deletions
diff --git a/lib/mix/tasks/pleroma/benchmark.ex b/lib/mix/tasks/pleroma/benchmark.ex
index 84dccf7f3..01e079136 100644
--- a/lib/mix/tasks/pleroma/benchmark.ex
+++ b/lib/mix/tasks/pleroma/benchmark.ex
@@ -74,4 +74,43 @@ defmodule Mix.Tasks.Pleroma.Benchmark do
inputs: inputs
)
end
+
+ def run(["adapters"]) do
+ start_pleroma()
+
+ :ok =
+ Pleroma.Pool.Connections.open_conn(
+ "https://httpbin.org/stream-bytes/1500",
+ :gun_connections
+ )
+
+ Process.sleep(1_500)
+
+ Benchee.run(
+ %{
+ "Without conn and without pool" => fn ->
+ {:ok, %Tesla.Env{}} =
+ Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [],
+ adapter: [pool: :no_pool, receive_conn: false]
+ )
+ end,
+ "Without conn and with pool" => fn ->
+ {:ok, %Tesla.Env{}} =
+ Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [],
+ adapter: [receive_conn: false]
+ )
+ end,
+ "With reused conn and without pool" => fn ->
+ {:ok, %Tesla.Env{}} =
+ Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [],
+ adapter: [pool: :no_pool]
+ )
+ end,
+ "With reused conn and with pool" => fn ->
+ {:ok, %Tesla.Env{}} = Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500")
+ end
+ },
+ parallel: 10
+ )
+ end
end
diff --git a/lib/mix/tasks/pleroma/emoji.ex b/lib/mix/tasks/pleroma/emoji.ex
index 24d999707..b4e8d3a0b 100644
--- a/lib/mix/tasks/pleroma/emoji.ex
+++ b/lib/mix/tasks/pleroma/emoji.ex
@@ -4,13 +4,13 @@
defmodule Mix.Tasks.Pleroma.Emoji do
use Mix.Task
+ import Mix.Pleroma
@shortdoc "Manages emoji packs"
@moduledoc File.read!("docs/administration/CLI_tasks/emoji.md")
def run(["ls-packs" | args]) do
- Mix.Pleroma.start_pleroma()
- Application.ensure_all_started(:hackney)
+ start_pleroma()
{options, [], []} = parse_global_opts(args)
@@ -36,8 +36,7 @@ defmodule Mix.Tasks.Pleroma.Emoji do
end
def run(["get-packs" | args]) do
- Mix.Pleroma.start_pleroma()
- Application.ensure_all_started(:hackney)
+ start_pleroma()
{options, pack_names, []} = parse_global_opts(args)
@@ -135,7 +134,7 @@ defmodule Mix.Tasks.Pleroma.Emoji do
end
def run(["gen-pack", src]) do
- Application.ensure_all_started(:hackney)
+ start_pleroma()
proposed_name = Path.basename(src) |> Path.rootname()
name = String.trim(IO.gets("Pack name [#{proposed_name}]: "))
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 27758cf94..df6d3a98d 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -3,8 +3,12 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Application do
- import Cachex.Spec
use Application
+
+ import Cachex.Spec
+
+ alias Pleroma.Config
+
require Logger
@name Mix.Project.config()[:name]
@@ -18,9 +22,9 @@ defmodule Pleroma.Application do
def repository, do: @repository
def user_agent do
- case Pleroma.Config.get([:http, :user_agent], :default) do
+ case Config.get([:http, :user_agent], :default) do
:default ->
- info = "#{Pleroma.Web.base_url()} <#{Pleroma.Config.get([:instance, :email], "")}>"
+ info = "#{Pleroma.Web.base_url()} <#{Config.get([:instance, :email], "")}>"
named_version() <> "; " <> info
custom ->
@@ -32,7 +36,7 @@ defmodule Pleroma.Application do
# for more information on OTP Applications
def start(_type, _args) do
Pleroma.HTML.compile_scrubbers()
- Pleroma.Config.DeprecationWarnings.warn()
+ Config.DeprecationWarnings.warn()
Pleroma.Plugs.HTTPSecurityPlug.warn_if_disabled()
Pleroma.Repo.check_migrations_applied!()
setup_instrumenters()
@@ -42,17 +46,17 @@ defmodule Pleroma.Application do
children =
[
Pleroma.Repo,
- Pleroma.Config.TransferTask,
+ Config.TransferTask,
Pleroma.Emoji,
Pleroma.Captcha,
Pleroma.Plugs.RateLimiter.Supervisor
] ++
cachex_children() ++
- hackney_pool_children() ++
+ http_pools_children(Config.get(:env)) ++
[
Pleroma.Stats,
Pleroma.JobQueueMonitor,
- {Oban, Pleroma.Config.get(Oban)}
+ {Oban, Config.get(Oban)}
] ++
task_children(@env) ++
streamer_child(@env) ++
@@ -62,6 +66,18 @@ defmodule Pleroma.Application do
Pleroma.Gopher.Server
]
+ case Pleroma.OTPVersion.check_version() do
+ :ok -> :ok
+ {:error, version} -> raise "
+ !!!OTP VERSION WARNING!!!
+ You are using gun adapter with OTP version #{version}, which doesn't support correct handling of unordered certificates chains.
+ "
+ :undefined -> raise "
+ !!!OTP VERSION WARNING!!!
+ To support correct handling of unordered certificates chains - OTP version must be > 22.2.
+ "
+ end
+
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: Pleroma.Supervisor]
@@ -69,7 +85,7 @@ defmodule Pleroma.Application do
end
def load_custom_modules do
- dir = Pleroma.Config.get([:modules, :runtime_dir])
+ dir = Config.get([:modules, :runtime_dir])
if dir && File.exists?(dir) do
dir
@@ -110,20 +126,6 @@ defmodule Pleroma.Application do
Pleroma.Web.Endpoint.Instrumenter.setup()
end
- def enabled_hackney_pools do
- [:media] ++
- if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Hackney do
- [:federation]
- else
- []
- end ++
- if Pleroma.Config.get([Pleroma.Upload, :proxy_remote]) do
- [:upload]
- else
- []
- end
- end
-
defp cachex_children do
[
build_cachex("used_captcha", ttl_interval: seconds_valid_interval()),
@@ -145,7 +147,7 @@ defmodule Pleroma.Application do
do: expiration(default: :timer.seconds(6 * 60 * 60), interval: :timer.seconds(60))
defp seconds_valid_interval,
- do: :timer.seconds(Pleroma.Config.get!([Pleroma.Captcha, :seconds_valid]))
+ do: :timer.seconds(Config.get!([Pleroma.Captcha, :seconds_valid]))
defp build_cachex(type, opts),
do: %{
@@ -154,7 +156,7 @@ defmodule Pleroma.Application do
type: :worker
}
- defp chat_enabled?, do: Pleroma.Config.get([:chat, :enabled])
+ defp chat_enabled?, do: Config.get([:chat, :enabled])
defp streamer_child(:test), do: []
@@ -168,13 +170,6 @@ defmodule Pleroma.Application do
defp chat_child(_, _), do: []
- defp hackney_pool_children do
- for pool <- enabled_hackney_pools() do
- options = Pleroma.Config.get([:hackney_pools, pool])
- :hackney_pool.child_spec(pool, options)
- end
- end
-
defp task_children(:test) do
[
%{
@@ -199,4 +194,37 @@ defmodule Pleroma.Application do
}
]
end
+
+ # start hackney and gun pools in tests
+ defp http_pools_children(:test) do
+ hackney_options = Config.get([:hackney_pools, :federation])
+ hackney_pool = :hackney_pool.child_spec(:federation, hackney_options)
+ [hackney_pool, Pleroma.Pool.Supervisor]
+ end
+
+ defp http_pools_children(_) do
+ :tesla
+ |> Application.get_env(:adapter)
+ |> http_pools()
+ end
+
+ defp http_pools(Tesla.Adapter.Hackney) do
+ pools = [:federation, :media]
+
+ pools =
+ if Config.get([Pleroma.Upload, :proxy_remote]) do
+ [:upload | pools]
+ else
+ pools
+ end
+
+ for pool <- pools do
+ options = Config.get([:hackney_pools, pool])
+ :hackney_pool.child_spec(pool, options)
+ end
+ end
+
+ defp http_pools(Tesla.Adapter.Gun), do: [Pleroma.Pool.Supervisor]
+
+ defp http_pools(_), do: []
end
diff --git a/lib/pleroma/config/config_db.ex b/lib/pleroma/config/config_db.ex
index 119251bee..bdacefa97 100644
--- a/lib/pleroma/config/config_db.ex
+++ b/lib/pleroma/config/config_db.ex
@@ -278,8 +278,6 @@ defmodule Pleroma.ConfigDB do
}
end
- defp do_convert({:partial_chain, entity}), do: %{"tuple" => [":partial_chain", inspect(entity)]}
-
defp do_convert(entity) when is_tuple(entity) do
value =
entity
@@ -323,15 +321,6 @@ defmodule Pleroma.ConfigDB do
{:proxy_url, {do_transform_string(type), parse_host(host), port}}
end
- defp do_transform(%{"tuple" => [":partial_chain", entity]}) do
- {partial_chain, []} =
- entity
- |> String.replace(~r/[^\w|^{:,[|^,|^[|^\]^}|^\/|^\.|^"]^\s/, "")
- |> Code.eval_string()
-
- {:partial_chain, partial_chain}
- end
-
defp do_transform(%{"tuple" => entity}) do
Enum.reduce(entity, {}, fn val, acc -> Tuple.append(acc, do_transform(val)) end)
end
diff --git a/lib/pleroma/config/transfer_task.ex b/lib/pleroma/config/transfer_task.ex
index 6c5ba1f95..251074aaa 100644
--- a/lib/pleroma/config/transfer_task.ex
+++ b/lib/pleroma/config/transfer_task.ex
@@ -18,7 +18,10 @@ defmodule Pleroma.Config.TransferTask do
{:pleroma, Oban},
{:pleroma, :rate_limit},
{:pleroma, :markup},
- {:plerome, :streamer}
+ {:pleroma, :streamer},
+ {:pleroma, :pools},
+ {:pleroma, :connections_pool},
+ {:tesla, :adapter}
]
@reboot_time_subkeys [
@@ -74,6 +77,28 @@ defmodule Pleroma.Config.TransferTask do
end
end
+ defp group_for_restart(:logger, key, _, merged_value) do
+ # change logger configuration in runtime, without restart
+ if Keyword.keyword?(merged_value) and
+ key not in [:compile_time_application, :backends, :compile_time_purge_matching] do
+ Logger.configure_backend(key, merged_value)
+ else
+ Logger.configure([{key, merged_value}])
+ end
+
+ nil
+ end
+
+ defp group_for_restart(:tesla, _, _, _), do: :pleroma
+
+ defp group_for_restart(group, _, _, _) when group != :pleroma, do: group
+
+ defp group_for_restart(group, key, value, _) do
+ if pleroma_need_restart?(group, key, value) do
+ group
+ end
+ end
+
defp merge_and_update(setting) do
try do
key = ConfigDB.from_string(setting.key)
@@ -95,21 +120,7 @@ defmodule Pleroma.Config.TransferTask do
:ok = update_env(group, key, merged_value)
- if group != :logger do
- if group != :pleroma or pleroma_need_restart?(group, key, value) do
- group
- end
- else
- # change logger configuration in runtime, without restart
- if Keyword.keyword?(merged_value) and
- key not in [:compile_time_application, :backends, :compile_time_purge_matching] do
- Logger.configure_backend(key, merged_value)
- else
- Logger.configure([{key, merged_value}])
- end
-
- nil
- end
+ group_for_restart(group, key, value, merged_value)
rescue
error ->
error_msg =
diff --git a/lib/pleroma/gun/api.ex b/lib/pleroma/gun/api.ex
new file mode 100644
index 000000000..a0c3c5415
--- /dev/null
+++ b/lib/pleroma/gun/api.ex
@@ -0,0 +1,26 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Gun.API do
+ @callback open(charlist(), pos_integer(), map()) :: {:ok, pid()}
+ @callback info(pid()) :: map()
+ @callback close(pid()) :: :ok
+ @callback await_up(pid) :: {:ok, atom()} | {:error, atom()}
+ @callback connect(pid(), map()) :: reference()
+ @callback await(pid(), reference()) :: {:response, :fin, 200, []}
+
+ def open(host, port, opts), do: api().open(host, port, opts)
+
+ def info(pid), do: api().info(pid)
+
+ def close(pid), do: api().close(pid)
+
+ def await_up(pid), do: api().await_up(pid)
+
+ def connect(pid, opts), do: api().connect(pid, opts)
+
+ def await(pid, ref), do: api().await(pid, ref)
+
+ defp api, do: Pleroma.Config.get([Pleroma.Gun.API], Pleroma.Gun)
+end
diff --git a/lib/pleroma/gun/api/mock.ex b/lib/pleroma/gun/api/mock.ex
new file mode 100644
index 000000000..0134b016e
--- /dev/null
+++ b/lib/pleroma/gun/api/mock.ex
@@ -0,0 +1,151 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Gun.API.Mock do
+ @behaviour Pleroma.Gun.API
+
+ alias Pleroma.Gun.API
+
+ @impl API
+ def open('some-domain.com', 443, _) do
+ {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ Registry.register(API.Mock, conn_pid, %{
+ origin_scheme: "https",
+ origin_host: 'some-domain.com',
+ origin_port: 443
+ })
+
+ {:ok, conn_pid}
+ end
+
+ @impl API
+ def open(ip, port, _)
+ when ip in [{10_755, 10_368, 61_708, 131, 64_206, 45_068, 0, 9_694}, {127, 0, 0, 1}] and
+ port in [80, 443] do
+ {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ scheme = if port == 443, do: "https", else: "http"
+
+ Registry.register(API.Mock, conn_pid, %{
+ origin_scheme: scheme,
+ origin_host: ip,
+ origin_port: port
+ })
+
+ {:ok, conn_pid}
+ end
+
+ @impl API
+ def open('localhost', 1234, %{
+ protocols: [:socks],
+ proxy: {:socks5, 'localhost', 1234},
+ socks_opts: %{host: 'proxy-socks.com', port: 80, version: 5}
+ }) do
+ {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ Registry.register(API.Mock, conn_pid, %{
+ origin_scheme: "http",
+ origin_host: 'proxy-socks.com',
+ origin_port: 80
+ })
+
+ {:ok, conn_pid}
+ end
+
+ @impl API
+ def open('localhost', 1234, %{
+ protocols: [:socks],
+ proxy: {:socks4, 'localhost', 1234},
+ socks_opts: %{
+ host: 'proxy-socks.com',
+ port: 443,
+ protocols: [:http2],
+ tls_opts: [],
+ transport: :tls,
+ version: 4
+ }
+ }) do
+ {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ Registry.register(API.Mock, conn_pid, %{
+ origin_scheme: "https",
+ origin_host: 'proxy-socks.com',
+ origin_port: 443
+ })
+
+ {:ok, conn_pid}
+ end
+
+ @impl API
+ def open('gun-not-up.com', 80, _opts), do: {:error, :timeout}
+
+ @impl API
+ def open('example.com', port, _) when port in [443, 115] do
+ {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ Registry.register(API.Mock, conn_pid, %{
+ origin_scheme: "https",
+ origin_host: 'example.com',
+ origin_port: 443
+ })
+
+ {:ok, conn_pid}
+ end
+
+ @impl API
+ def open(domain, 80, _) do
+ {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ Registry.register(API.Mock, conn_pid, %{
+ origin_scheme: "http",
+ origin_host: domain,
+ origin_port: 80
+ })
+
+ {:ok, conn_pid}
+ end
+
+ @impl API
+ def open({127, 0, 0, 1}, 8123, _) do
+ Task.start_link(fn -> Process.sleep(1_000) end)
+ end
+
+ @impl API
+ def open('localhost', 9050, _) do
+ Task.start_link(fn -> Process.sleep(1_000) end)
+ end
+
+ @impl API
+ def await_up(_pid), do: {:ok, :http}
+
+ @impl API
+ def connect(pid, %{host: _, port: 80}) do
+ ref = make_ref()
+ Registry.register(API.Mock, ref, pid)
+ ref
+ end
+
+ @impl API
+ def connect(pid, %{host: _, port: 443, protocols: [:http2], transport: :tls}) do
+ ref = make_ref()
+ Registry.register(API.Mock, ref, pid)
+ ref
+ end
+
+ @impl API
+ def await(pid, ref) do
+ [{_, ^pid}] = Registry.lookup(API.Mock, ref)
+ {:response, :fin, 200, []}
+ end
+
+ @impl API
+ def info(pid) do
+ [{_, info}] = Registry.lookup(API.Mock, pid)
+ info
+ end
+
+ @impl API
+ def close(_pid), do: :ok
+end
diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex
new file mode 100644
index 000000000..2474829d6
--- /dev/null
+++ b/lib/pleroma/gun/conn.ex
@@ -0,0 +1,29 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Gun.Conn do
+ @moduledoc """
+ Struct for gun connection data
+ """
+ @type gun_state :: :up | :down
+ @type conn_state :: :active | :idle
+
+ @type t :: %__MODULE__{
+ conn: pid(),
+ gun_state: gun_state(),
+ conn_state: conn_state(),
+ used_by: [pid()],
+ last_reference: pos_integer(),
+ crf: float(),
+ retries: pos_integer()
+ }
+
+ defstruct conn: nil,
+ gun_state: :open,
+ conn_state: :init,
+ used_by: [],
+ last_reference: 0,
+ crf: 1,
+ retries: 0
+end
diff --git a/lib/pleroma/gun/gun.ex b/lib/pleroma/gun/gun.ex
new file mode 100644
index 000000000..4a1bbc95f
--- /dev/null
+++ b/lib/pleroma/gun/gun.ex
@@ -0,0 +1,45 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Gun do
+ @behaviour Pleroma.Gun.API
+
+ alias Pleroma.Gun.API
+
+ @gun_keys [
+ :connect_timeout,
+ :http_opts,
+ :http2_opts,
+ :protocols,
+ :retry,
+ :retry_timeout,
+ :trace,
+ :transport,
+ :tls_opts,
+ :tcp_opts,
+ :socks_opts,
+ :ws_opts
+ ]
+
+ @impl API
+ def open(host, port, opts \\ %{}), do: :gun.open(host, port, Map.take(opts, @gun_keys))
+
+ @impl API
+ defdelegate info(pid), to: :gun
+
+ @impl API
+ defdelegate close(pid), to: :gun
+
+ @impl API
+ defdelegate await_up(pid), to: :gun
+
+ @impl API
+ defdelegate connect(pid, opts), to: :gun
+
+ @impl API
+ defdelegate await(pid, ref), to: :gun
+
+ @spec flush(pid() | reference()) :: :ok
+ defdelegate flush(pid), to: :gun
+end
diff --git a/lib/pleroma/http/adapter.ex b/lib/pleroma/http/adapter.ex
new file mode 100644
index 000000000..6166a3eb4
--- /dev/null
+++ b/lib/pleroma/http/adapter.ex
@@ -0,0 +1,64 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.HTTP.Adapter do
+ alias Pleroma.HTTP.Connection
+
+ @type proxy ::
+ {Connection.host(), pos_integer()}
+ | {Connection.proxy_type(), pos_integer()}
+ @type host_type :: :domain | :ip
+
+ @callback options(keyword(), URI.t()) :: keyword()
+ @callback after_request(keyword()) :: :ok
+
+ @spec options(keyword(), URI.t()) :: keyword()
+ def options(opts, _uri) do
+ proxy = Pleroma.Config.get([:http, :proxy_url], nil)
+ maybe_add_proxy(opts, format_proxy(proxy))
+ end
+
+ @spec maybe_get_conn(URI.t(), keyword()) :: keyword()
+ def maybe_get_conn(_uri, opts), do: opts
+
+ @spec after_request(keyword()) :: :ok
+ def after_request(_opts), do: :ok
+
+ @spec format_proxy(String.t() | tuple() | nil) :: proxy() | nil
+ def format_proxy(nil), do: nil
+
+ def format_proxy(proxy_url) do
+ with {:ok, host, port} <- Connection.parse_proxy(proxy_url) do
+ {host, port}
+ else
+ {:ok, type, host, port} -> {type, host, port}
+ _ -> nil
+ end
+ end
+
+ @spec maybe_add_proxy(keyword(), proxy() | nil) :: keyword()
+ def maybe_add_proxy(opts, nil), do: opts
+ def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy)
+
+ @spec domain_or_fallback(String.t()) :: charlist()
+ def domain_or_fallback(host) do
+ case domain_or_ip(host) do
+ {:domain, domain} -> domain
+ {:ip, _ip} -> to_charlist(host)
+ end
+ end
+
+ @spec domain_or_ip(String.t()) :: {host_type(), Connection.host()}
+ def domain_or_ip(host) do
+ charlist = to_charlist(host)
+
+ case :inet.parse_address(charlist) do
+ {:error, :einval} ->
+ {:domain, :idna.encode(charlist)}
+
+ {:ok, ip} when is_tuple(ip) and tuple_size(ip) in [4, 8] ->
+ {:ip, ip}
+ end
+ end
+end
diff --git a/lib/pleroma/http/adapter/gun.ex b/lib/pleroma/http/adapter/gun.ex
new file mode 100644
index 000000000..f25afeda7
--- /dev/null
+++ b/lib/pleroma/http/adapter/gun.ex
@@ -0,0 +1,123 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.HTTP.Adapter.Gun do
+ @behaviour Pleroma.HTTP.Adapter
+
+ alias Pleroma.HTTP.Adapter
+
+ require Logger
+
+ alias Pleroma.Pool.Connections
+
+ @defaults [
+ connect_timeout: 20_000,
+ domain_lookup_timeout: 5_000,
+ tls_handshake_timeout: 5_000,
+ retry_timeout: 100,
+ await_up_timeout: 5_000
+ ]
+
+ @spec options(keyword(), URI.t()) :: keyword()
+ def options(connection_opts \\ [], %URI{} = uri) do
+ proxy = Pleroma.Config.get([:http, :proxy_url], nil)
+
+ @defaults
+ |> Keyword.merge(Pleroma.Config.get([:http, :adapter], []))
+ |> add_original(uri)
+ |> add_scheme_opts(uri)
+ |> Adapter.maybe_add_proxy(Adapter.format_proxy(proxy))
+ |> maybe_get_conn(uri, connection_opts)
+ end
+
+ @spec after_request(keyword()) :: :ok
+ def after_request(opts) do
+ with conn when not is_nil(conn) <- opts[:conn],
+ body_as when body_as != :chunks <- opts[:body_as] do
+ Connections.checkout(conn, self(), :gun_connections)
+ end
+
+ :ok
+ end
+
+ defp add_original(opts, %URI{host: host, port: port}) do
+ formatted_host = Adapter.domain_or_fallback(host)
+
+ Keyword.put(opts, :original, "#{formatted_host}:#{port}")
+ end
+
+ defp add_scheme_opts(opts, %URI{scheme: "http"}), do: opts
+
+ defp add_scheme_opts(opts, %URI{scheme: "https", host: host, port: port}) do
+ adapter_opts = [
+ certificates_verification: true,
+ tls_opts: [
+ verify: :verify_peer,
+ cacertfile: CAStore.file_path(),
+ depth: 20,
+ reuse_sessions: false,
+ verify_fun:
+ {&:ssl_verify_hostname.verify_fun/3, [check_hostname: Adapter.domain_or_fallback(host)]}
+ ]
+ ]
+
+ adapter_opts =
+ if port != 443 do
+ Keyword.put(adapter_opts, :transport, :tls)
+ else
+ adapter_opts
+ end
+
+ Keyword.merge(opts, adapter_opts)
+ end
+
+ defp maybe_get_conn(adapter_opts, uri, connection_opts) do
+ {receive_conn?, opts} =
+ adapter_opts
+ |> Keyword.merge(connection_opts)
+ |> Keyword.pop(:receive_conn, true)
+
+ if Connections.alive?(:gun_connections) and receive_conn? do
+ try_to_get_conn(uri, opts)
+ else
+ opts
+ end
+ end
+
+ defp try_to_get_conn(uri, opts) do
+ try do
+ case Connections.checkin(uri, :gun_connections) do
+ nil ->
+ Logger.info(
+ "Gun connections pool checkin was not succesfull. Trying to open conn for next request."
+ )
+
+ :ok = Connections.open_conn(uri, :gun_connections, opts)
+ opts
+
+ conn when is_pid(conn) ->
+ Logger.debug("received conn #{inspect(conn)} #{Connections.compose_uri(uri)}")
+
+ opts
+ |> Keyword.put(:conn, conn)
+ |> Keyword.put(:close_conn, false)
+ end
+ rescue
+ error ->
+ Logger.warn("Gun connections pool checkin caused error #{inspect(error)}")
+ opts
+ catch
+ :exit, {:timeout, _} ->
+ Logger.info(
+ "Gun connections pool checkin with timeout error #{Connections.compose_uri(uri)}"
+ )
+
+ opts
+
+ :exit, error ->
+ Logger.warn("Gun pool checkin exited with error #{inspect(error)}")
+ opts
+ end
+ end
+end
diff --git a/lib/pleroma/http/adapter/hackney.ex b/lib/pleroma/http/adapter/hackney.ex
new file mode 100644
index 000000000..00db30083
--- /dev/null
+++ b/lib/pleroma/http/adapter/hackney.ex
@@ -0,0 +1,41 @@
+defmodule Pleroma.HTTP.Adapter.Hackney do
+ @behaviour Pleroma.HTTP.Adapter
+
+ @defaults [
+ connect_timeout: 10_000,
+ recv_timeout: 20_000,
+ follow_redirect: true,
+ force_redirect: true,
+ pool: :federation
+ ]
+
+ @spec options(keyword(), URI.t()) :: keyword()
+ def options(connection_opts \\ [], %URI{} = uri) do
+ proxy = Pleroma.Config.get([:http, :proxy_url], nil)
+
+ @defaults
+ |> Keyword.merge(Pleroma.Config.get([:http, :adapter], []))
+ |> Keyword.merge(connection_opts)
+ |> add_scheme_opts(uri)
+ |> Pleroma.HTTP.Adapter.maybe_add_proxy(proxy)
+ end
+
+ defp add_scheme_opts(opts, %URI{scheme: "http"}), do: opts
+
+ defp add_scheme_opts(opts, %URI{scheme: "https", host: host}) do
+ ssl_opts = [
+ ssl_options: [
+ # Workaround for remote server certificate chain issues
+ partial_chain: &:hackney_connect.partial_chain/1,
+
+ # We don't support TLS v1.3 yet
+ versions: [:tlsv1, :"tlsv1.1", :"tlsv1.2"],
+ server_name_indication: to_charlist(host)
+ ]
+ ]
+
+ Keyword.merge(opts, ssl_opts)
+ end
+
+ def after_request(_), do: :ok
+end
diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex
index 7e2c6f5e8..85918341a 100644
--- a/lib/pleroma/http/connection.ex
+++ b/lib/pleroma/http/connection.ex
@@ -4,40 +4,99 @@
defmodule Pleroma.HTTP.Connection do
@moduledoc """
- Connection for http-requests.
+ Configure Tesla.Client with default and customized adapter options.
"""
+ @type ip_address :: ipv4_address() | ipv6_address()
+ @type ipv4_address :: {0..255, 0..255, 0..255, 0..255}
+ @type ipv6_address ::
+ {0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535}
+ @type proxy_type() :: :socks4 | :socks5
+ @type host() :: charlist() | ip_address()
- @hackney_options [
- connect_timeout: 10_000,
- recv_timeout: 20_000,
- follow_redirect: true,
- force_redirect: true,
- pool: :federation
- ]
- @adapter Application.get_env(:tesla, :adapter)
+ @defaults [pool: :federation]
- @doc """
- Configure a client connection
+ require Logger
- # Returns
+ alias Pleroma.Config
+ alias Pleroma.HTTP.Adapter
- Tesla.Env.client
+ @doc """
+ Merge default connection & adapter options with received ones.
"""
- @spec new(Keyword.t()) :: Tesla.Env.client()
- def new(opts \\ []) do
- Tesla.client([], {@adapter, hackney_options(opts)})
+
+ @spec options(URI.t(), keyword()) :: keyword()
+ def options(%URI{} = uri, opts \\ []) do
+ @defaults
+ |> pool_timeout()
+ |> Keyword.merge(opts)
+ |> adapter().options(uri)
+ end
+
+ defp pool_timeout(opts) do
+ timeout =
+ Config.get([:pools, opts[:pool], :timeout]) || Config.get([:pools, :default, :timeout])
+
+ Keyword.merge(opts, timeout: timeout)
end
- # fetch Hackney options
- #
- def hackney_options(opts) do
- options = Keyword.get(opts, :adapter, [])
- adapter_options = Pleroma.Config.get([:http, :adapter], [])
- proxy_url = Pleroma.Config.get([:http, :proxy_url], nil)
-
- @hackney_options
- |> Keyword.merge(adapter_options)
- |> Keyword.merge(options)
- |> Keyword.merge(proxy: proxy_url)
+ @spec after_request(keyword()) :: :ok
+ def after_request(opts), do: adapter().after_request(opts)
+
+ defp adapter do
+ case Application.get_env(:tesla, :adapter) do
+ Tesla.Adapter.Gun -> Adapter.Gun
+ Tesla.Adapter.Hackney -> Adapter.Hackney
+ _ -> Adapter
+ end
+ end
+
+ @spec parse_proxy(String.t() | tuple() | nil) ::
+ {:ok, host(), pos_integer()}
+ | {:ok, proxy_type(), host(), pos_integer()}
+ | {:error, atom()}
+ | nil
+
+ def parse_proxy(nil), do: nil
+
+ def parse_proxy(proxy) when is_binary(proxy) do
+ with [host, port] <- String.split(proxy, ":"),
+ {port, ""} <- Integer.parse(port) do
+ {:ok, parse_host(host), port}
+ else
+ {_, _} ->
+ Logger.warn("parsing port in proxy fail #{inspect(proxy)}")
+ {:error, :error_parsing_port_in_proxy}
+
+ :error ->
+ Logger.warn("parsing port in proxy fail #{inspect(proxy)}")
+ {:error, :error_parsing_port_in_proxy}
+
+ _ ->
+ Logger.warn("parsing proxy fail #{inspect(proxy)}")
+ {:error, :error_parsing_proxy}
+ end
+ end
+
+ def parse_proxy(proxy) when is_tuple(proxy) do
+ with {type, host, port} <- proxy do
+ {:ok, type, parse_host(host), port}
+ else
+ _ ->
+ Logger.warn("parsing proxy fail #{inspect(proxy)}")
+ {:error, :error_parsing_proxy}
+ end
+ end
+
+ @spec parse_host(String.t() | atom() | charlist()) :: charlist() | ip_address()
+ def parse_host(host) when is_list(host), do: host
+ def parse_host(host) when is_atom(host), do: to_charlist(host)
+
+ def parse_host(host) when is_binary(host) do
+ host = to_charlist(host)
+
+ case :inet.parse_address(host) do
+ {:error, :einval} -> host
+ {:ok, ip} -> ip
+ end
end
end
diff --git a/lib/pleroma/http/http.ex b/lib/pleroma/http/http.ex
index dec24458a..ad47dc936 100644
--- a/lib/pleroma/http/http.ex
+++ b/lib/pleroma/http/http.ex
@@ -4,21 +4,47 @@
defmodule Pleroma.HTTP do
@moduledoc """
-
+ Wrapper for `Tesla.request/2`.
"""
alias Pleroma.HTTP.Connection
+ alias Pleroma.HTTP.Request
alias Pleroma.HTTP.RequestBuilder, as: Builder
+ alias Tesla.Client
+ alias Tesla.Env
+
+ require Logger
@type t :: __MODULE__
@doc """
- Builds and perform http request.
+ Performs GET request.
+
+ See `Pleroma.HTTP.request/5`
+ """
+ @spec get(Request.url() | nil, Request.headers(), keyword()) ::
+ nil | {:ok, Env.t()} | {:error, any()}
+ def get(url, headers \\ [], options \\ [])
+ def get(nil, _, _), do: nil
+ def get(url, headers, options), do: request(:get, url, "", headers, options)
+
+ @doc """
+ Performs POST request.
+
+ See `Pleroma.HTTP.request/5`
+ """
+ @spec post(Request.url(), String.t(), Request.headers(), keyword()) ::
+ {:ok, Env.t()} | {:error, any()}
+ def post(url, body, headers \\ [], options \\ []),
+ do: request(:post, url, body, headers, options)
+
+ @doc """
+ Builds and performs http request.
# Arguments:
`method` - :get, :post, :put, :delete
- `url`
- `body`
+ `url` - full url
+ `body` - request body
`headers` - a keyworld list of headers, e.g. `[{"content-type", "text/plain"}]`
`options` - custom, per-request middleware or adapter options
@@ -26,61 +52,97 @@ defmodule Pleroma.HTTP do
`{:ok, %Tesla.Env{}}` or `{:error, error}`
"""
- def request(method, url, body \\ "", headers \\ [], options \\ []) do
+ @spec request(atom(), Request.url(), String.t(), Request.headers(), keyword()) ::
+ {:ok, Env.t()} | {:error, any()}
+ def request(method, url, body, headers, options) when is_binary(url) do
+ with uri <- URI.parse(url),
+ received_adapter_opts <- Keyword.get(options, :adapter, []),
+ adapter_opts <- Connection.options(uri, received_adapter_opts),
+ options <- put_in(options[:adapter], adapter_opts),
+ params <- Keyword.get(options, :params, []),
+ request <- build_request(method, headers, options, url, body, params),
+ client <- Tesla.client([Tesla.Middleware.FollowRedirects], tesla_adapter()),
+ pid <- Process.whereis(adapter_opts[:pool]) do
+ pool_alive? =
+ if tesla_adapter() == Tesla.Adapter.Gun do
+ if pid, do: Process.alive?(pid), else: false
+ else
+ false
+ end
+
+ request_opts =
+ adapter_opts
+ |> Enum.into(%{})
+ |> Map.put(:env, Pleroma.Config.get([:env]))
+ |> Map.put(:pool_alive?, pool_alive?)
+
+ response =
+ request(
+ client,
+ request,
+ request_opts
+ )
+
+ Connection.after_request(adapter_opts)
+
+ response
+ end
+ end
+
+ @spec request(Client.t(), keyword(), map()) :: {:ok, Env.t()} | {:error, any()}
+ def request(%Client{} = client, request, %{env: :test}), do: request_try(client, request)
+
+ def request(%Client{} = client, request, %{body_as: :chunks}) do
+ request_try(client, request)
+ end
+
+ def request(%Client{} = client, request, %{pool_alive?: false}) do
+ request_try(client, request)
+ end
+
+ def request(%Client{} = client, request, %{pool: pool, timeout: timeout}) do
try do
- options =
- process_request_options(options)
- |> process_sni_options(url)
-
- params = Keyword.get(options, :params, [])
-
- %{}
- |> Builder.method(method)
- |> Builder.headers(headers)
- |> Builder.opts(options)
- |> Builder.url(url)
- |> Builder.add_param(:body, :body, body)
- |> Builder.add_param(:query, :query, params)
- |> Enum.into([])
- |> (&Tesla.request(Connection.new(options), &1)).()
+ :poolboy.transaction(
+ pool,
+ &Pleroma.Pool.Request.execute(&1, client, request, timeout + 500),
+ timeout + 1_000
+ )
rescue
e ->
{:error, e}
catch
+ :exit, {:timeout, _} ->
+ Logger.warn("Receive response from pool failed #{request[:url]}")
+ {:error, :recv_pool_timeout}
+
:exit, e ->
{:error, e}
end
end
- defp process_sni_options(options, nil), do: options
-
- defp process_sni_options(options, url) do
- uri = URI.parse(url)
- host = uri.host |> to_charlist()
-
- case uri.scheme do
- "https" -> options ++ [ssl: [server_name_indication: host]]
- _ -> options
+ @spec request_try(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()}
+ def request_try(client, request) do
+ try do
+ Tesla.request(client, request)
+ rescue
+ e ->
+ {:error, e}
+ catch
+ :exit, e ->
+ {:error, e}
end
end
- def process_request_options(options) do
- Keyword.merge(Pleroma.HTTP.Connection.hackney_options([]), options)
+ defp build_request(method, headers, options, url, body, params) do
+ Builder.new()
+ |> Builder.method(method)
+ |> Builder.headers(headers)
+ |> Builder.opts(options)
+ |> Builder.url(url)
+ |> Builder.add_param(:body, :body, body)
+ |> Builder.add_param(:query, :query, params)
+ |> Builder.convert_to_keyword()
end
- @doc """
- Performs GET request.
-
- See `Pleroma.HTTP.request/5`
- """
- def get(url, headers \\ [], options \\ []),
- do: request(:get, url, "", headers, options)
-
- @doc """
- Performs POST request.
-
- See `Pleroma.HTTP.request/5`
- """
- def post(url, body, headers \\ [], options \\ []),
- do: request(:post, url, body, headers, options)
+ defp tesla_adapter, do: Application.get_env(:tesla, :adapter)
end
diff --git a/lib/pleroma/http/request.ex b/lib/pleroma/http/request.ex
new file mode 100644
index 000000000..891d88d53
--- /dev/null
+++ b/lib/pleroma/http/request.ex
@@ -0,0 +1,23 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.HTTP.Request do
+ @moduledoc """
+ Request struct.
+ """
+ defstruct method: :get, url: "", query: [], headers: [], body: "", opts: []
+
+ @type method :: :head | :get | :delete | :trace | :options | :post | :put | :patch
+ @type url :: String.t()
+ @type headers :: [{String.t(), String.t()}]
+
+ @type t :: %__MODULE__{
+ method: method(),
+ url: url(),
+ query: keyword(),
+ headers: headers(),
+ body: String.t(),
+ opts: keyword()
+ }
+end
diff --git a/lib/pleroma/http/request_builder.ex b/lib/pleroma/http/request_builder.ex
index e23457999..491acd0f9 100644
--- a/lib/pleroma/http/request_builder.ex
+++ b/lib/pleroma/http/request_builder.ex
@@ -7,77 +7,54 @@ defmodule Pleroma.HTTP.RequestBuilder do
Helper functions for building Tesla requests
"""
- @doc """
- Specify the request method when building a request
-
- ## Parameters
-
- - request (Map) - Collected request options
- - m (atom) - Request method
+ alias Pleroma.HTTP.Request
+ alias Tesla.Multipart
- ## Returns
-
- Map
+ @doc """
+ Creates new request
"""
- @spec method(map(), atom) :: map()
- def method(request, m) do
- Map.put_new(request, :method, m)
- end
+ @spec new(Request.t()) :: Request.t()
+ def new(%Request{} = request \\ %Request{}), do: request
@doc """
Specify the request method when building a request
+ """
+ @spec method(Request.t(), Request.method()) :: Request.t()
+ def method(request, m), do: %{request | method: m}
- ## Parameters
-
- - request (Map) - Collected request options
- - u (String) - Request URL
-
- ## Returns
-
- Map
+ @doc """
+ Specify the request method when building a request
"""
- @spec url(map(), String.t()) :: map()
- def url(request, u) do
- Map.put_new(request, :url, u)
- end
+ @spec url(Request.t(), Request.url()) :: Request.t()
+ def url(request, u), do: %{request | url: u}
@doc """
Add headers to the request
"""
- @spec headers(map(), list(tuple)) :: map()
- def headers(request, header_list) do
- header_list =
+ @spec headers(Request.t(), Request.headers()) :: Request.t()
+ def headers(request, headers) do
+ headers_list =
if Pleroma.Config.get([:http, :send_user_agent]) do
- header_list ++ [{"User-Agent", Pleroma.Application.user_agent()}]
+ headers ++ [{"user-agent", Pleroma.Application.user_agent()}]
else
- header_list
+ headers
end
- Map.put_new(request, :headers, header_list)
+ %{request | headers: headers_list}
end
@doc """
Add custom, per-request middleware or adapter options to the request
"""
- @spec opts(map(), Keyword.t()) :: map()
- def opts(request, options) do
- Map.put_new(request, :opts, options)
- end
+ @spec opts(Request.t(), keyword()) :: Request.t()
+ def opts(request, options), do: %{request | opts: options}
+ # NOTE: isn't used anywhere
@doc """
Add optional parameters to the request
- ## Parameters
-
- - request (Map) - Collected request options
- - definitions (Map) - Map of parameter name to parameter location.
- - options (KeywordList) - The provided optional parameters
-
- ## Returns
-
- Map
"""
- @spec add_optional_params(map(), %{optional(atom) => atom}, keyword()) :: map()
+ @spec add_optional_params(Request.t(), %{optional(atom) => atom}, keyword()) :: map()
def add_optional_params(request, _, []), do: request
def add_optional_params(request, definitions, [{key, value} | tail]) do
@@ -94,49 +71,43 @@ defmodule Pleroma.HTTP.RequestBuilder do
@doc """
Add optional parameters to the request
-
- ## Parameters
-
- - request (Map) - Collected request options
- - location (atom) - Where to put the parameter
- - key (atom) - The name of the parameter
- - value (any) - The value of the parameter
-
- ## Returns
-
- Map
"""
- @spec add_param(map(), atom, atom, any()) :: map()
- def add_param(request, :query, :query, values), do: Map.put(request, :query, values)
+ @spec add_param(Request.t(), atom(), atom(), any()) :: Request.t()
+ def add_param(request, :query, :query, values), do: %{request | query: values}
- def add_param(request, :body, :body, value), do: Map.put(request, :body, value)
+ def add_param(request, :body, :body, value), do: %{request | body: value}
def add_param(request, :body, key, value) do
request
- |> Map.put_new_lazy(:body, &Tesla.Multipart.new/0)
+ |> Map.put(:body, Multipart.new())
|> Map.update!(
:body,
- &Tesla.Multipart.add_field(
+ &Multipart.add_field(
&1,
key,
Jason.encode!(value),
- headers: [{:"Content-Type", "application/json"}]
+ headers: [{"content-type", "application/json"}]
)
)
end
def add_param(request, :file, name, path) do
request
- |> Map.put_new_lazy(:body, &Tesla.Multipart.new/0)
- |> Map.update!(:body, &Tesla.Multipart.add_file(&1, path, name: name))
+ |> Map.put(:body, Multipart.new())
+ |> Map.update!(:body, &Multipart.add_file(&1, path, name: name))
end
def add_param(request, :form, name, value) do
- request
- |> Map.update(:body, %{name => value}, &Map.put(&1, name, value))
+ Map.update(request, :body, %{name => value}, &Map.put(&1, name, value))
end
def add_param(request, location, key, value) do
Map.update(request, location, [{key, value}], &(&1 ++ [{key, value}]))
end
+
+ def convert_to_keyword(request) do
+ request
+ |> Map.from_struct()
+ |> Enum.into([])
+ end
end
diff --git a/lib/pleroma/object/fetcher.ex b/lib/pleroma/object/fetcher.ex
index 037c42339..5e9bf1574 100644
--- a/lib/pleroma/object/fetcher.ex
+++ b/lib/pleroma/object/fetcher.ex
@@ -137,7 +137,7 @@ defmodule Pleroma.Object.Fetcher do
date: date
})
- [{:Signature, signature}]
+ [{"signature", signature}]
end
defp sign_fetch(headers, id, date) do
@@ -150,7 +150,7 @@ defmodule Pleroma.Object.Fetcher do
defp maybe_date_fetch(headers, date) do
if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
- headers ++ [{:Date, date}]
+ headers ++ [{"date", date}]
else
headers
end
@@ -162,7 +162,7 @@ defmodule Pleroma.Object.Fetcher do
date = Pleroma.Signature.signed_date()
headers =
- [{:Accept, "application/activity+json"}]
+ [{"accept", "application/activity+json"}]
|> maybe_date_fetch(date)
|> sign_fetch(id, date)
diff --git a/lib/pleroma/otp_version.ex b/lib/pleroma/otp_version.ex
new file mode 100644
index 000000000..0be189304
--- /dev/null
+++ b/lib/pleroma/otp_version.ex
@@ -0,0 +1,63 @@
+defmodule Pleroma.OTPVersion do
+ @type check_status() :: :undefined | {:error, String.t()} | :ok
+
+ require Logger
+
+ @spec check_version() :: check_status()
+ def check_version do
+ # OTP Version https://erlang.org/doc/system_principles/versions.html#otp-version
+ paths = [
+ Path.join(:code.root_dir(), "OTP_VERSION"),
+ Path.join([:code.root_dir(), "releases", :erlang.system_info(:otp_release), "OTP_VERSION"])
+ ]
+
+ :tesla
+ |> Application.get_env(:adapter)
+ |> get_and_check_version(paths)
+ end
+
+ @spec get_and_check_version(module(), [Path.t()]) :: check_status()
+ def get_and_check_version(Tesla.Adapter.Gun, paths) do
+ paths
+ |> check_files()
+ |> check_version()
+ end
+
+ def get_and_check_version(_, _), do: :ok
+
+ defp check_files([]), do: nil
+
+ defp check_files([path | paths]) do
+ if File.exists?(path) do
+ File.read!(path)
+ else
+ check_files(paths)
+ end
+ end
+
+ defp check_version(nil), do: :undefined
+
+ defp check_version(version) do
+ try do
+ version = String.replace(version, ~r/\r|\n|\s/, "")
+
+ formatted =
+ version
+ |> String.split(".")
+ |> Enum.map(&String.to_integer/1)
+ |> Enum.take(2)
+
+ with [major, minor] when length(formatted) == 2 <- formatted,
+ true <- (major == 22 and minor >= 2) or major > 22 do
+ :ok
+ else
+ false -> {:error, version}
+ _ -> :undefined
+ end
+ rescue
+ _ -> :undefined
+ catch
+ _ -> :undefined
+ end
+ end
+end
diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex
new file mode 100644
index 000000000..1ed16d1c1
--- /dev/null
+++ b/lib/pleroma/pool/connections.ex
@@ -0,0 +1,415 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Pool.Connections do
+ use GenServer
+
+ require Logger
+
+ @type domain :: String.t()
+ @type conn :: Pleroma.Gun.Conn.t()
+
+ @type t :: %__MODULE__{
+ conns: %{domain() => conn()},
+ opts: keyword()
+ }
+
+ defstruct conns: %{}, opts: []
+
+ alias Pleroma.Gun.API
+ alias Pleroma.Gun.Conn
+
+ @spec start_link({atom(), keyword()}) :: {:ok, pid()}
+ def start_link({name, opts}) do
+ GenServer.start_link(__MODULE__, opts, name: name)
+ end
+
+ @impl true
+ def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
+
+ @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
+ def checkin(url, name)
+ def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
+
+ def checkin(%URI{} = uri, name) do
+ timeout = Pleroma.Config.get([:connections_pool, :receive_connection_timeout], 250)
+
+ GenServer.call(
+ name,
+ {:checkin, uri},
+ timeout
+ )
+ end
+
+ @spec open_conn(String.t() | URI.t(), atom(), keyword()) :: :ok
+ def open_conn(url, name, opts \\ [])
+ def open_conn(url, name, opts) when is_binary(url), do: open_conn(URI.parse(url), name, opts)
+
+ def open_conn(%URI{} = uri, name, opts) do
+ pool_opts = Pleroma.Config.get([:connections_pool], [])
+
+ opts =
+ opts
+ |> Enum.into(%{})
+ |> Map.put_new(:receive, false)
+ |> Map.put_new(:retry, pool_opts[:retry] || 5)
+ |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 100)
+ |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
+
+ GenServer.cast(name, {:open_conn, %{opts: opts, uri: uri}})
+ end
+
+ @spec alive?(atom()) :: boolean()
+ def alive?(name) do
+ pid = Process.whereis(name)
+ if pid, do: Process.alive?(pid), else: false
+ end
+
+ @spec get_state(atom()) :: t()
+ def get_state(name) do
+ GenServer.call(name, :state)
+ end
+
+ @spec checkout(pid(), pid(), atom()) :: :ok
+ def checkout(conn, pid, name) do
+ GenServer.cast(name, {:checkout, conn, pid})
+ end
+
+ @impl true
+ def handle_cast({:open_conn, %{opts: opts, uri: uri}}, state) do
+ Logger.debug("opening new #{compose_uri(uri)}")
+ max_connections = state.opts[:max_connections]
+
+ key = compose_key(uri)
+
+ if Enum.count(state.conns) < max_connections do
+ open_conn(key, uri, state, opts)
+ else
+ try_to_open_conn(key, uri, state, opts)
+ end
+ end
+
+ @impl true
+ def handle_cast({:checkout, conn_pid, pid}, state) do
+ Logger.debug("checkout #{inspect(conn_pid)}")
+
+ state =
+ with true <- Process.alive?(conn_pid),
+ {key, conn} <- find_conn(state.conns, conn_pid),
+ used_by <- List.keydelete(conn.used_by, pid, 0) do
+ conn_state =
+ if used_by == [] do
+ :idle
+ else
+ conn.conn_state
+ end
+
+ put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
+ else
+ false ->
+ Logger.warn("checkout for closed conn #{inspect(conn_pid)}")
+ state
+
+ nil ->
+ Logger.info("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
+ state
+ end
+
+ {:noreply, state}
+ end
+
+ @impl true
+ def handle_call({:checkin, uri}, from, state) do
+ Logger.debug("checkin #{compose_uri(uri)}")
+ key = compose_key(uri)
+
+ case state.conns[key] do
+ %{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up ->
+ Logger.debug("reusing conn #{compose_uri(uri)}")
+
+ with time <- :os.system_time(:second),
+ last_reference <- time - current_conn.last_reference,
+ current_crf <- crf(last_reference, 100, current_conn.crf),
+ state <-
+ put_in(state.conns[key], %{
+ current_conn
+ | last_reference: time,
+ crf: current_crf,
+ conn_state: :active,
+ used_by: [from | current_conn.used_by]
+ }) do
+ {:reply, conn, state}
+ end
+
+ %{gun_state: gun_state} when gun_state == :down ->
+ {:reply, nil, state}
+
+ nil ->
+ {:reply, nil, state}
+ end
+ end
+
+ @impl true
+ def handle_call(:state, _from, state), do: {:reply, state, state}
+
+ @impl true
+ def handle_info({:gun_up, conn_pid, _protocol}, state) do
+ state =
+ with true <- Process.alive?(conn_pid),
+ conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
+ {key, conn} <- find_conn(state.conns, conn_pid, conn_key),
+ time <- :os.system_time(:second),
+ last_reference <- time - conn.last_reference,
+ current_crf <- crf(last_reference, 100, conn.crf) do
+ put_in(state.conns[key], %{
+ conn
+ | gun_state: :up,
+ last_reference: time,
+ crf: current_crf,
+ conn_state: :active,
+ retries: 0
+ })
+ else
+ :error_gun_info ->
+ Logger.warn(":gun.info caused error")
+ state
+
+ false ->
+ Logger.warn(":gun_up message for closed conn #{inspect(conn_pid)}")
+ state
+
+ nil ->
+ Logger.warn(
+ ":gun_up message for alive conn #{inspect(conn_pid)}, but deleted from state"
+ )
+
+ :ok = API.close(conn_pid)
+
+ state
+ end
+
+ {:noreply, state}
+ end
+
+ @impl true
+ def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
+ # we can't get info on this pid, because pid is dead
+ state =
+ with true <- Process.alive?(conn_pid),
+ {key, conn} <- find_conn(state.conns, conn_pid) do
+ if conn.retries == 5 do
+ Logger.debug("closing conn if retries is eq 5 #{inspect(conn_pid)}")
+ :ok = API.close(conn.conn)
+
+ put_in(
+ state.conns,
+ Map.delete(state.conns, key)
+ )
+ else
+ put_in(state.conns[key], %{
+ conn
+ | gun_state: :down,
+ retries: conn.retries + 1
+ })
+ end
+ else
+ false ->
+ # gun can send gun_down for closed conn, maybe connection is not closed yet
+ Logger.warn(":gun_down message for closed conn #{inspect(conn_pid)}")
+ state
+
+ nil ->
+ Logger.warn(
+ ":gun_down message for alive conn #{inspect(conn_pid)}, but deleted from state"
+ )
+
+ :ok = API.close(conn_pid)
+
+ state
+ end
+
+ {:noreply, state}
+ end
+
+ defp compose_key(%URI{scheme: scheme, host: host, port: port}), do: "#{scheme}:#{host}:#{port}"
+
+ defp compose_key_gun_info(pid) do
+ try do
+ # sometimes :gun.info can raise MatchError, which lead to pool terminate
+ %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = API.info(pid)
+
+ host =
+ case :inet.ntoa(origin_host) do
+ {:error, :einval} -> origin_host
+ ip -> ip
+ end
+
+ "#{scheme}:#{host}:#{port}"
+ rescue
+ _ -> :error_gun_info
+ end
+ end
+
+ defp find_conn(conns, conn_pid) do
+ Enum.find(conns, fn {_key, conn} ->
+ conn.conn == conn_pid
+ end)
+ end
+
+ defp find_conn(conns, conn_pid, conn_key) do
+ Enum.find(conns, fn {key, conn} ->
+ key == conn_key and conn.conn == conn_pid
+ end)
+ end
+
+ defp open_conn(key, uri, state, %{proxy: {proxy_host, proxy_port}} = opts) do
+ connect_opts =
+ uri
+ |> destination_opts()
+ |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
+
+ with open_opts <- Map.delete(opts, :tls_opts),
+ {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
+ {:ok, _} <- API.await_up(conn),
+ stream <- API.connect(conn, connect_opts),
+ {:response, :fin, 200, _} <- API.await(conn, stream),
+ state <-
+ put_in(state.conns[key], %Conn{
+ conn: conn,
+ gun_state: :up,
+ conn_state: :active,
+ last_reference: :os.system_time(:second)
+ }) do
+ {:noreply, state}
+ else
+ error ->
+ Logger.warn(
+ "Received error on opening connection with http proxy #{uri.scheme}://#{
+ compose_uri(uri)
+ }: #{inspect(error)}"
+ )
+
+ {:noreply, state}
+ end
+ end
+
+ defp open_conn(key, uri, state, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
+ version =
+ proxy_type
+ |> to_string()
+ |> String.last()
+ |> case do
+ "4" -> 4
+ _ -> 5
+ end
+
+ socks_opts =
+ uri
+ |> destination_opts()
+ |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
+ |> Map.put(:version, version)
+
+ opts =
+ opts
+ |> Map.put(:protocols, [:socks])
+ |> Map.put(:socks_opts, socks_opts)
+
+ with {:ok, conn} <- API.open(proxy_host, proxy_port, opts),
+ {:ok, _} <- API.await_up(conn),
+ state <-
+ put_in(state.conns[key], %Conn{
+ conn: conn,
+ gun_state: :up,
+ conn_state: :active,
+ last_reference: :os.system_time(:second)
+ }) do
+ {:noreply, state}
+ else
+ error ->
+ Logger.warn(
+ "Received error on opening connection with socks proxy #{uri.scheme}://#{
+ compose_uri(uri)
+ }: #{inspect(error)}"
+ )
+
+ {:noreply, state}
+ end
+ end
+
+ defp open_conn(key, %URI{host: host, port: port} = uri, state, opts) do
+ Logger.debug("opening conn #{compose_uri(uri)}")
+ {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
+
+ with {:ok, conn} <- API.open(host, port, opts),
+ {:ok, _} <- API.await_up(conn),
+ state <-
+ put_in(state.conns[key], %Conn{
+ conn: conn,
+ gun_state: :up,
+ conn_state: :active,
+ last_reference: :os.system_time(:second)
+ }) do
+ Logger.debug("new conn opened #{compose_uri(uri)}")
+ Logger.debug("replying to the call #{compose_uri(uri)}")
+ {:noreply, state}
+ else
+ error ->
+ Logger.warn(
+ "Received error on opening connection #{uri.scheme}://#{compose_uri(uri)}: #{
+ inspect(error)
+ }"
+ )
+
+ {:noreply, state}
+ end
+ end
+
+ defp destination_opts(%URI{host: host, port: port}) do
+ {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
+ %{host: host, port: port}
+ end
+
+ defp add_http2_opts(opts, "https", tls_opts) do
+ Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
+ end
+
+ defp add_http2_opts(opts, _, _), do: opts
+
+ @spec get_unused_conns(map()) :: [{domain(), conn()}]
+ def get_unused_conns(conns) do
+ conns
+ |> Enum.filter(fn {_k, v} ->
+ v.conn_state == :idle and v.used_by == []
+ end)
+ |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
+ x.crf <= y.crf and x.last_reference <= y.last_reference
+ end)
+ end
+
+ defp try_to_open_conn(key, uri, state, opts) do
+ Logger.debug("try to open conn #{compose_uri(uri)}")
+
+ with [{close_key, least_used} | _conns] <- get_unused_conns(state.conns),
+ :ok <- API.close(least_used.conn),
+ state <-
+ put_in(
+ state.conns,
+ Map.delete(state.conns, close_key)
+ ) do
+ Logger.debug(
+ "least used conn found and closed #{inspect(least_used.conn)} #{compose_uri(uri)}"
+ )
+
+ open_conn(key, uri, state, opts)
+ else
+ [] -> {:noreply, state}
+ end
+ end
+
+ def crf(current, steps, crf) do
+ 1 + :math.pow(0.5, current / steps) * crf
+ end
+
+ def compose_uri(%URI{} = uri), do: "#{uri.host}#{uri.path}"
+end
diff --git a/lib/pleroma/pool/pool.ex b/lib/pleroma/pool/pool.ex
new file mode 100644
index 000000000..a7ae64ce4
--- /dev/null
+++ b/lib/pleroma/pool/pool.ex
@@ -0,0 +1,22 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Pool do
+ def child_spec(opts) do
+ poolboy_opts =
+ opts
+ |> Keyword.put(:worker_module, Pleroma.Pool.Request)
+ |> Keyword.put(:name, {:local, opts[:name]})
+ |> Keyword.put(:size, opts[:size])
+ |> Keyword.put(:max_overflow, opts[:max_overflow])
+
+ %{
+ id: opts[:id] || {__MODULE__, make_ref()},
+ start: {:poolboy, :start_link, [poolboy_opts, [name: opts[:name]]]},
+ restart: :permanent,
+ shutdown: 5000,
+ type: :worker
+ }
+ end
+end
diff --git a/lib/pleroma/pool/request.ex b/lib/pleroma/pool/request.ex
new file mode 100644
index 000000000..2c3574561
--- /dev/null
+++ b/lib/pleroma/pool/request.ex
@@ -0,0 +1,72 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Pool.Request do
+ use GenServer
+
+ require Logger
+
+ def start_link(args) do
+ GenServer.start_link(__MODULE__, args)
+ end
+
+ @impl true
+ def init(_), do: {:ok, []}
+
+ @spec execute(pid() | atom(), Tesla.Client.t(), keyword(), pos_integer()) ::
+ {:ok, Tesla.Env.t()} | {:error, any()}
+ def execute(pid, client, request, timeout) do
+ GenServer.call(pid, {:execute, client, request}, timeout)
+ end
+
+ @impl true
+ def handle_call({:execute, client, request}, _from, state) do
+ response = Pleroma.HTTP.request_try(client, request)
+
+ {:reply, response, state}
+ end
+
+ @impl true
+ def handle_info({:gun_data, _conn, stream, _, _}, state) do
+ # in some cases if we reuse conn and got {:error, :body_too_large}
+ # gun continues to send messages to this process,
+ # so we flush messages for this request
+ :ok = :gun.flush(stream)
+
+ {:noreply, state}
+ end
+
+ @impl true
+ def handle_info({:gun_up, _conn, _protocol}, state) do
+ {:noreply, state}
+ end
+
+ @impl true
+ def handle_info({:gun_down, _conn, _protocol, _reason, _killed}, state) do
+ # don't flush messages here, because gun can reconnect
+ {:noreply, state}
+ end
+
+ @impl true
+ def handle_info({:gun_error, _conn, stream, _error}, state) do
+ :ok = :gun.flush(stream)
+ {:noreply, state}
+ end
+
+ @impl true
+ def handle_info({:gun_push, _conn, _stream, _new_stream, _method, _uri, _headers}, state) do
+ {:noreply, state}
+ end
+
+ @impl true
+ def handle_info({:gun_response, _conn, _stream, _, _status, _headers}, state) do
+ {:noreply, state}
+ end
+
+ @impl true
+ def handle_info(msg, state) do
+ Logger.warn("Received unexpected message #{inspect(__MODULE__)} #{inspect(msg)}")
+ {:noreply, state}
+ end
+end
diff --git a/lib/pleroma/pool/supervisor.ex b/lib/pleroma/pool/supervisor.ex
new file mode 100644
index 000000000..32be2264d
--- /dev/null
+++ b/lib/pleroma/pool/supervisor.ex
@@ -0,0 +1,36 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Pool.Supervisor do
+ use Supervisor
+
+ alias Pleroma.Pool
+
+ def start_link(args) do
+ Supervisor.start_link(__MODULE__, args, name: __MODULE__)
+ end
+
+ def init(_) do
+ children =
+ [
+ %{
+ id: Pool.Connections,
+ start:
+ {Pool.Connections, :start_link,
+ [{:gun_connections, Pleroma.Config.get([:connections_pool])}]}
+ }
+ ] ++ pools()
+
+ Supervisor.init(children, strategy: :one_for_one)
+ end
+
+ defp pools do
+ for {pool_name, pool_opts} <- Pleroma.Config.get([:pools]) do
+ pool_opts
+ |> Keyword.put(:id, {Pool, pool_name})
+ |> Keyword.put(:name, pool_name)
+ |> Pool.child_spec()
+ end
+ end
+end
diff --git a/lib/pleroma/reverse_proxy/client.ex b/lib/pleroma/reverse_proxy/client.ex
index 776c4794c..63261b94c 100644
--- a/lib/pleroma/reverse_proxy/client.ex
+++ b/lib/pleroma/reverse_proxy/client.ex
@@ -3,19 +3,23 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.ReverseProxy.Client do
- @callback request(atom(), String.t(), [tuple()], String.t(), list()) ::
- {:ok, pos_integer(), [tuple()], reference() | map()}
- | {:ok, pos_integer(), [tuple()]}
+ @type status :: pos_integer()
+ @type header_name :: String.t()
+ @type header_value :: String.t()
+ @type headers :: [{header_name(), header_value()}]
+
+ @callback request(atom(), String.t(), headers(), String.t(), list()) ::
+ {:ok, status(), headers(), reference() | map()}
+ | {:ok, status(), headers()}
| {:ok, reference()}
| {:error, term()}
- @callback stream_body(reference() | pid() | map()) ::
- {:ok, binary()} | :done | {:error, String.t()}
+ @callback stream_body(map()) :: {:ok, binary(), map()} | :done | {:error, atom() | String.t()}
@callback close(reference() | pid() | map()) :: :ok
- def request(method, url, headers, "", opts \\ []) do
- client().request(method, url, headers, "", opts)
+ def request(method, url, headers, body \\ "", opts \\ []) do
+ client().request(method, url, headers, body, opts)
end
def stream_body(ref), do: client().stream_body(ref)
@@ -23,6 +27,12 @@ defmodule Pleroma.ReverseProxy.Client do
def close(ref), do: client().close(ref)
defp client do
- Pleroma.Config.get([Pleroma.ReverseProxy.Client], :hackney)
+ :tesla
+ |> Application.get_env(:adapter)
+ |> client()
end
+
+ defp client(Tesla.Adapter.Hackney), do: Pleroma.ReverseProxy.Client.Hackney
+ defp client(Tesla.Adapter.Gun), do: Pleroma.ReverseProxy.Client.Tesla
+ defp client(_), do: Pleroma.Config.get!(Pleroma.ReverseProxy.Client)
end
diff --git a/lib/pleroma/reverse_proxy/client/hackney.ex b/lib/pleroma/reverse_proxy/client/hackney.ex
new file mode 100644
index 000000000..e41560ab0
--- /dev/null
+++ b/lib/pleroma/reverse_proxy/client/hackney.ex
@@ -0,0 +1,24 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.ReverseProxy.Client.Hackney do
+ @behaviour Pleroma.ReverseProxy.Client
+
+ @impl true
+ def request(method, url, headers, body, opts \\ []) do
+ :hackney.request(method, url, headers, body, opts)
+ end
+
+ @impl true
+ def stream_body(ref) do
+ case :hackney.stream_body(ref) do
+ :done -> :done
+ {:ok, data} -> {:ok, data, ref}
+ {:error, error} -> {:error, error}
+ end
+ end
+
+ @impl true
+ def close(ref), do: :hackney.close(ref)
+end
diff --git a/lib/pleroma/reverse_proxy/client/tesla.ex b/lib/pleroma/reverse_proxy/client/tesla.ex
new file mode 100644
index 000000000..55a11b4a8
--- /dev/null
+++ b/lib/pleroma/reverse_proxy/client/tesla.ex
@@ -0,0 +1,87 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.ReverseProxy.Client.Tesla do
+ @type headers() :: [{String.t(), String.t()}]
+ @type status() :: pos_integer()
+
+ @behaviour Pleroma.ReverseProxy.Client
+
+ @spec request(atom(), String.t(), headers(), String.t(), keyword()) ::
+ {:ok, status(), headers}
+ | {:ok, status(), headers, map()}
+ | {:error, atom() | String.t()}
+ | no_return()
+
+ @impl true
+ def request(method, url, headers, body, opts \\ []) do
+ _adapter = check_adapter()
+
+ with opts <- Keyword.merge(opts, body_as: :chunks, mode: :passive),
+ {:ok, response} <-
+ Pleroma.HTTP.request(
+ method,
+ url,
+ body,
+ headers,
+ Keyword.put(opts, :adapter, opts)
+ ) do
+ if is_map(response.body) and method != :head do
+ {:ok, response.status, response.headers, response.body}
+ else
+ {:ok, response.status, response.headers}
+ end
+ else
+ {:error, error} -> {:error, error}
+ end
+ end
+
+ @impl true
+ @spec stream_body(map()) :: {:ok, binary(), map()} | {:error, atom() | String.t()} | :done
+ def stream_body(%{pid: pid, opts: opts, fin: true}) do
+ # if connection was sended and there were redirects, we need to close new conn - pid manually
+ if opts[:old_conn], do: Tesla.Adapter.Gun.close(pid)
+ # if there were redirects we need to checkout old conn
+ conn = opts[:old_conn] || opts[:conn]
+
+ if conn, do: :ok = Pleroma.Pool.Connections.checkout(conn, self(), :gun_connections)
+
+ :done
+ end
+
+ def stream_body(client) do
+ case read_chunk!(client) do
+ {:fin, body} ->
+ {:ok, body, Map.put(client, :fin, true)}
+
+ {:nofin, part} ->
+ {:ok, part, client}
+
+ {:error, error} ->
+ {:error, error}
+ end
+ end
+
+ defp read_chunk!(%{pid: pid, stream: stream, opts: opts}) do
+ adapter = check_adapter()
+ adapter.read_chunk(pid, stream, opts)
+ end
+
+ @impl true
+ @spec close(map) :: :ok | no_return()
+ def close(%{pid: pid}) do
+ adapter = check_adapter()
+ adapter.close(pid)
+ end
+
+ defp check_adapter do
+ adapter = Application.get_env(:tesla, :adapter)
+
+ unless adapter == Tesla.Adapter.Gun do
+ raise "#{adapter} doesn't support reading body in chunks"
+ end
+
+ adapter
+ end
+end
diff --git a/lib/pleroma/reverse_proxy/reverse_proxy.ex b/lib/pleroma/reverse_proxy/reverse_proxy.ex
index 2ed719315..9f5710c92 100644
--- a/lib/pleroma/reverse_proxy/reverse_proxy.ex
+++ b/lib/pleroma/reverse_proxy/reverse_proxy.ex
@@ -3,8 +3,6 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.ReverseProxy do
- alias Pleroma.HTTP
-
@keep_req_headers ~w(accept user-agent accept-encoding cache-control if-modified-since) ++
~w(if-unmodified-since if-none-match if-range range)
@resp_cache_headers ~w(etag date last-modified cache-control)
@@ -61,10 +59,10 @@ defmodule Pleroma.ReverseProxy do
* `req_headers`, `resp_headers` additional headers.
- * `http`: options for [hackney](https://github.com/benoitc/hackney).
+ * `http`: options for [gun](https://github.com/ninenines/gun).
"""
- @default_hackney_options [pool: :media]
+ @default_options [pool: :media]
@inline_content_types [
"image/gif",
@@ -97,11 +95,7 @@ defmodule Pleroma.ReverseProxy do
def call(_conn, _url, _opts \\ [])
def call(conn = %{method: method}, url, opts) when method in @methods do
- hackney_opts =
- Pleroma.HTTP.Connection.hackney_options([])
- |> Keyword.merge(@default_hackney_options)
- |> Keyword.merge(Keyword.get(opts, :http, []))
- |> HTTP.process_request_options()
+ client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))
req_headers = build_req_headers(conn.req_headers, opts)
@@ -113,7 +107,7 @@ defmodule Pleroma.ReverseProxy do
end
with {:ok, nil} <- Cachex.get(:failed_proxy_url_cache, url),
- {:ok, code, headers, client} <- request(method, url, req_headers, hackney_opts),
+ {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
:ok <-
header_length_constraint(
headers,
@@ -159,11 +153,11 @@ defmodule Pleroma.ReverseProxy do
|> halt()
end
- defp request(method, url, headers, hackney_opts) do
+ defp request(method, url, headers, opts) do
Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
method = method |> String.downcase() |> String.to_existing_atom()
- case client().request(method, url, headers, "", hackney_opts) do
+ case client().request(method, url, headers, "", opts) do
{:ok, code, headers, client} when code in @valid_resp_codes ->
{:ok, code, downcase_headers(headers), client}
@@ -213,7 +207,7 @@ defmodule Pleroma.ReverseProxy do
duration,
Keyword.get(opts, :max_read_duration, @max_read_duration)
),
- {:ok, data} <- client().stream_body(client),
+ {:ok, data, client} <- client().stream_body(client),
{:ok, duration} <- increase_read_duration(duration),
sent_so_far = sent_so_far + byte_size(data),
:ok <-
diff --git a/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex
index df774b0f7..ade87daf2 100644
--- a/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex
+++ b/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex
@@ -12,17 +12,23 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
require Logger
- @hackney_options [
- pool: :media,
- recv_timeout: 10_000
+ @options [
+ pool: :media
]
def perform(:prefetch, url) do
Logger.debug("Prefetching #{inspect(url)}")
+ opts =
+ if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Hackney do
+ Keyword.put(@options, :recv_timeout, 10_000)
+ else
+ @options
+ end
+
url
|> MediaProxy.url()
- |> HTTP.get([], adapter: @hackney_options)
+ |> HTTP.get([], adapter: opts)
end
def perform(:preload, %{"object" => %{"attachment" => attachments}} = _message) do
diff --git a/lib/pleroma/web/rel_me.ex b/lib/pleroma/web/rel_me.ex
index 16b1a53d2..0ae926375 100644
--- a/lib/pleroma/web/rel_me.ex
+++ b/lib/pleroma/web/rel_me.ex
@@ -3,11 +3,9 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.RelMe do
- @hackney_options [
+ @options [
pool: :media,
- recv_timeout: 2_000,
- max_body: 2_000_000,
- with_body: true
+ max_body: 2_000_000
]
if Pleroma.Config.get(:env) == :test do
@@ -25,8 +23,18 @@ defmodule Pleroma.Web.RelMe do
def parse(_), do: {:error, "No URL provided"}
defp parse_url(url) do
+ opts =
+ if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Hackney do
+ Keyword.merge(@options,
+ recv_timeout: 2_000,
+ with_body: true
+ )
+ else
+ @options
+ end
+
with {:ok, %Tesla.Env{body: html, status: status}} when status in 200..299 <-
- Pleroma.HTTP.get(url, [], adapter: @hackney_options),
+ Pleroma.HTTP.get(url, [], adapter: opts),
data <-
Floki.attribute(html, "link[rel~=me]", "href") ++
Floki.attribute(html, "a[rel~=me]", "href") do
diff --git a/lib/pleroma/web/rich_media/parser.ex b/lib/pleroma/web/rich_media/parser.ex
index c06b0a0f2..9deb03845 100644
--- a/lib/pleroma/web/rich_media/parser.ex
+++ b/lib/pleroma/web/rich_media/parser.ex
@@ -3,11 +3,9 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.RichMedia.Parser do
- @hackney_options [
+ @options [
pool: :media,
- recv_timeout: 2_000,
- max_body: 2_000_000,
- with_body: true
+ max_body: 2_000_000
]
defp parsers do
@@ -77,8 +75,18 @@ defmodule Pleroma.Web.RichMedia.Parser do
end
defp parse_url(url) do
+ opts =
+ if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Hackney do
+ Keyword.merge(@options,
+ recv_timeout: 2_000,
+ with_body: true
+ )
+ else
+ @options
+ end
+
try do
- {:ok, %Tesla.Env{body: html}} = Pleroma.HTTP.get(url, [], adapter: @hackney_options)
+ {:ok, %Tesla.Env{body: html}} = Pleroma.HTTP.get(url, [], adapter: opts)
html
|> parse_html
diff --git a/lib/pleroma/web/web_finger/web_finger.ex b/lib/pleroma/web/web_finger/web_finger.ex
index b4cc80179..91e9e2271 100644
--- a/lib/pleroma/web/web_finger/web_finger.ex
+++ b/lib/pleroma/web/web_finger/web_finger.ex
@@ -205,7 +205,7 @@ defmodule Pleroma.Web.WebFinger do
with response <-
HTTP.get(
address,
- Accept: "application/xrd+xml,application/jrd+json"
+ [{"accept", "application/xrd+xml,application/jrd+json"}]
),
{:ok, %{status: status, body: body}} when status in 200..299 <- response do
doc = XML.parse_document(body)