diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/mix/tasks/pleroma/relay.ex | 12 | ||||
-rw-r--r-- | lib/pleroma/application.ex | 14 | ||||
-rw-r--r-- | lib/pleroma/digest_email_worker.ex | 4 | ||||
-rw-r--r-- | lib/pleroma/gun/api/api.ex | 10 | ||||
-rw-r--r-- | lib/pleroma/gun/api/gun.ex | 14 | ||||
-rw-r--r-- | lib/pleroma/gun/api/mock.ex | 55 | ||||
-rw-r--r-- | lib/pleroma/gun/conn.ex | 4 | ||||
-rw-r--r-- | lib/pleroma/gun/connections.ex | 133 | ||||
-rw-r--r-- | lib/pleroma/http/connection.ex | 36 | ||||
-rw-r--r-- | lib/pleroma/http/http.ex | 15 | ||||
-rw-r--r-- | lib/pleroma/reverse_proxy/client/hackney.ex | 4 | ||||
-rw-r--r-- | lib/pleroma/reverse_proxy/reverse_proxy.ex | 12 | ||||
-rw-r--r-- | lib/pleroma/user.ex | 4 |
13 files changed, 232 insertions, 85 deletions
diff --git a/lib/mix/tasks/pleroma/relay.ex b/lib/mix/tasks/pleroma/relay.ex index c7324fff6..a738fae75 100644 --- a/lib/mix/tasks/pleroma/relay.ex +++ b/lib/mix/tasks/pleroma/relay.ex @@ -53,13 +53,11 @@ defmodule Mix.Tasks.Pleroma.Relay do def run(["list"]) do start_pleroma() - with %User{} = user <- Relay.get_actor() do - user.following - |> Enum.each(fn entry -> - URI.parse(entry) - |> Map.get(:host) - |> shell_info() - end) + with %User{following: following} = _user <- Relay.get_actor() do + following + |> Enum.map(fn entry -> URI.parse(entry).host end) + |> Enum.uniq() + |> Enum.each(&shell_info(&1)) else e -> shell_error("Error while fetching relay subscription list: #{inspect(e)}") end diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index f755a1355..cf0b99fe6 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -40,6 +40,7 @@ defmodule Pleroma.Application do ] ++ cachex_children() ++ hackney_pool_children() ++ + gun_pools() ++ [ Pleroma.Web.Federator.RetryQueue, Pleroma.Stats, @@ -164,6 +165,19 @@ defmodule Pleroma.Application do end end + defp gun_pools do + if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun || Mix.env() == :test do + for {pool_name, opts} <- Pleroma.Config.get([:gun_pools]) do + %{ + id: :"gun_pool_#{pool_name}", + start: {Pleroma.Gun.Connections, :start_link, [{pool_name, opts}]} + } + end + else + [] + end + end + defp after_supervisor_start do with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest], true <- digest_config[:active] do diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/digest_email_worker.ex index 18e67d39b..5644d6a67 100644 --- a/lib/pleroma/digest_email_worker.ex +++ b/lib/pleroma/digest_email_worker.ex @@ -1,3 +1,7 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + defmodule Pleroma.DigestEmailWorker do import Ecto.Query diff --git a/lib/pleroma/gun/api/api.ex b/lib/pleroma/gun/api/api.ex index 19adc1bf0..7e6d2f929 100644 --- a/lib/pleroma/gun/api/api.ex +++ b/lib/pleroma/gun/api/api.ex @@ -4,11 +4,21 @@ defmodule Pleroma.Gun.API do @callback open(charlist(), pos_integer(), map()) :: {:ok, pid()} + @callback info(pid()) :: map() + @callback close(pid()) :: :ok def open(host, port, opts) do api().open(host, port, opts) end + def info(pid) do + api().info(pid) + end + + def close(pid) do + api().close(pid) + end + defp api do Pleroma.Config.get([Pleroma.Gun.API], Pleroma.Gun.API.Gun) end diff --git a/lib/pleroma/gun/api/gun.ex b/lib/pleroma/gun/api/gun.ex index 14a4b7275..d97f5a7c9 100644 --- a/lib/pleroma/gun/api/gun.ex +++ b/lib/pleroma/gun/api/gun.ex @@ -1,6 +1,12 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + defmodule Pleroma.Gun.API.Gun do @behaviour Pleroma.Gun.API + alias Pleroma.Gun.API + @gun_keys [ :connect_timeout, :http_opts, @@ -15,8 +21,14 @@ defmodule Pleroma.Gun.API.Gun do :ws_opts ] - @impl Pleroma.Gun.API + @impl API def open(host, port, opts) do :gun.open(host, port, Map.take(opts, @gun_keys)) end + + @impl API + def info(pid), do: :gun.info(pid) + + @impl API + def close(pid), do: :gun.close(pid) end diff --git a/lib/pleroma/gun/api/mock.ex b/lib/pleroma/gun/api/mock.ex index ff9e13a74..b1a30a73c 100644 --- a/lib/pleroma/gun/api/mock.ex +++ b/lib/pleroma/gun/api/mock.ex @@ -4,37 +4,80 @@ defmodule Pleroma.Gun.API.Mock do @behaviour Pleroma.Gun.API - @impl Pleroma.Gun.API - def open('some-domain.com', 80, %{genserver_pid: genserver_pid}) do + + alias Pleroma.Gun.API + + @impl API + def open(domain, 80, %{genserver_pid: genserver_pid}) + when domain in ['another-domain.com', 'some-domain.com'] do {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + + Registry.register(API.Mock, conn_pid, %{ + origin_scheme: "http", + origin_host: domain, + origin_port: 80 + }) + send(genserver_pid, {:gun_up, conn_pid, :http}) {:ok, conn_pid} end + @impl API def open('some-domain.com', 443, %{genserver_pid: genserver_pid}) do {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + + Registry.register(API.Mock, conn_pid, %{ + origin_scheme: "https", + origin_host: 'some-domain.com', + origin_port: 443 + }) + send(genserver_pid, {:gun_up, conn_pid, :http2}) {:ok, conn_pid} end - @impl Pleroma.Gun.API - def open('gun_down.com', _port, %{genserver_pid: genserver_pid}) do + @impl API + def open('gun_down.com', 80, %{genserver_pid: genserver_pid}) do {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + + Registry.register(API.Mock, conn_pid, %{ + origin_scheme: "http", + origin_host: 'gun_down.com', + origin_port: 80 + }) + send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil}) {:ok, conn_pid} end - @impl Pleroma.Gun.API - def open('gun_down_and_up.com', _port, %{genserver_pid: genserver_pid}) do + @impl API + def open('gun_down_and_up.com', 80, %{genserver_pid: genserver_pid}) do {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + + Registry.register(API.Mock, conn_pid, %{ + origin_scheme: "http", + origin_host: 'gun_down_and_up.com', + origin_port: 80 + }) + send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil}) {:ok, _} = Task.start_link(fn -> Process.sleep(500) + send(genserver_pid, {:gun_up, conn_pid, :http}) end) {:ok, conn_pid} end + + @impl API + def info(pid) do + [{_, info}] = Registry.lookup(API.Mock, pid) + info + end + + @impl API + def close(_pid), do: :ok end diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex index 62ef146a1..20ddec64c 100644 --- a/lib/pleroma/gun/conn.ex +++ b/lib/pleroma/gun/conn.ex @@ -10,8 +10,8 @@ defmodule Pleroma.Gun.Conn do conn: pid(), state: atom(), waiting_pids: [pid()], - protocol: atom() + used: pos_integer() } - defstruct conn: nil, state: :open, waiting_pids: [], protocol: :http + defstruct conn: nil, state: :open, waiting_pids: [], used: 0 end diff --git a/lib/pleroma/gun/connections.ex b/lib/pleroma/gun/connections.ex index 695c3c93e..3716d9f74 100644 --- a/lib/pleroma/gun/connections.ex +++ b/lib/pleroma/gun/connections.ex @@ -6,35 +6,29 @@ defmodule Pleroma.Gun.Connections do use GenServer @type domain :: String.t() - @type conn :: Gun.Conn.t() + @type conn :: Pleroma.Gun.Conn.t() + @type t :: %__MODULE__{ - conns: %{domain() => conn()} + conns: %{domain() => conn()}, + opts: keyword() } - defstruct conns: %{} - - def start_link(name \\ __MODULE__) + defstruct conns: %{}, opts: [] - def start_link(name) when is_atom(name) do - GenServer.start_link(__MODULE__, [], name: name) - end + alias Pleroma.Gun.API - def start_link(_) do - if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do - GenServer.start_link(__MODULE__, []) - else - :ignore - end + @spec start_link({atom(), keyword()}) :: {:ok, pid()} | :ignore + def start_link({name, opts}) do + GenServer.start_link(__MODULE__, opts, name: name) end @impl true - def init(_) do - {:ok, %__MODULE__{conns: %{}}} - end + def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}} @spec get_conn(String.t(), keyword(), atom()) :: pid() - def get_conn(url, opts \\ [], name \\ __MODULE__) do + def get_conn(url, opts \\ [], name \\ :default) do opts = Enum.into(opts, %{}) + uri = URI.parse(url) opts = @@ -42,35 +36,33 @@ defmodule Pleroma.Gun.Connections do do: Map.put(opts, :transport, :tls), else: opts + opts = + if uri.scheme == "https" do + host = uri.host |> to_charlist() + + tls_opts = + Map.get(opts, :tls_opts, []) + |> Keyword.put(:server_name_indication, host) + + Map.put(opts, :tls_opts, tls_opts) + else + opts + end + GenServer.call( name, {:conn, %{opts: opts, uri: uri}} ) end - # TODO: only for testing, add this parameter to the config - @spec try_to_get_gun_conn(String.t(), keyword(), atom()) :: nil | pid() - def try_to_get_gun_conn(url, opts \\ [], name \\ __MODULE__), - do: try_to_get_gun_conn(url, opts, name, 0) - - @spec try_to_get_gun_conn(String.t(), keyword(), atom(), pos_integer()) :: nil | pid() - def try_to_get_gun_conn(_url, _, _, 3), do: nil - - def try_to_get_gun_conn(url, opts, name, acc) do - case Pleroma.Gun.Connections.get_conn(url, opts, name) do - nil -> try_to_get_gun_conn(url, acc + 1) - conn -> conn - end - end - @spec alive?(atom()) :: boolean() - def alive?(name \\ __MODULE__) do + def alive?(name \\ :default) do pid = Process.whereis(name) if pid, do: Process.alive?(pid), else: false end @spec get_state(atom()) :: t() - def get_state(name \\ __MODULE__) do + def get_state(name \\ :default) do GenServer.call(name, {:state}) end @@ -79,7 +71,8 @@ defmodule Pleroma.Gun.Connections do key = compose_key(uri) case state.conns[key] do - %{conn: conn, state: conn_state} when conn_state == :up -> + %{conn: conn, state: conn_state, used: used} when conn_state == :up -> + state = put_in(state.conns[key].used, used + 1) {:reply, conn, state} %{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] -> @@ -87,16 +80,23 @@ defmodule Pleroma.Gun.Connections do {:noreply, state} nil -> - {:ok, conn} = Pleroma.Gun.API.open(to_charlist(uri.host), uri.port, opts) + max_connections = state.opts[:max_connections] - state = - put_in(state.conns[key], %Pleroma.Gun.Conn{ - conn: conn, - waiting_pids: [from], - protocol: String.to_atom(uri.scheme) - }) + if Enum.count(state.conns) < max_connections do + open_conn(key, uri, from, state, opts) + else + [{close_key, least_used} | _conns] = Enum.sort_by(state.conns, fn {_k, v} -> v.used end) - {:noreply, state} + :ok = API.close(least_used.conn) + + state = + put_in( + state.conns, + Map.delete(state.conns, close_key) + ) + + open_conn(key, uri, from, state, opts) + end end end @@ -105,36 +105,63 @@ defmodule Pleroma.Gun.Connections do @impl true def handle_info({:gun_up, conn_pid, _protocol}, state) do - {key, conn} = find_conn(state.conns, conn_pid) + conn_key = compose_key_gun_info(conn_pid) + {key, conn} = find_conn(state.conns, conn_pid, conn_key) # Send to all waiting processes connection pid Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end) # Update state of the current connection and set waiting_pids to empty list - state = put_in(state.conns[key], %{conn | state: :up, waiting_pids: []}) + state = + put_in(state.conns[key], %{ + conn + | state: :up, + waiting_pids: [], + used: conn.used + length(conn.waiting_pids) + }) + {:noreply, state} end @impl true - # Do we need to do something with killed & unprocessed references? def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed, _unprocessed}, state) do + # we can't get info on this pid, because pid is dead {key, conn} = find_conn(state.conns, conn_pid) - # We don't want to block requests to GenServer. - # If gun sends a down message, return nil, so we can make some - # retries, while the connection is not up. Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end) state = put_in(state.conns[key].state, :down) {:noreply, state} end - defp compose_key(uri), do: uri.host <> ":" <> to_string(uri.port) + defp compose_key(uri), do: "#{uri.scheme}:#{uri.host}:#{uri.port}" + + defp compose_key_gun_info(pid) do + info = API.info(pid) + "#{info.origin_scheme}:#{info.origin_host}:#{info.origin_port}" + end defp find_conn(conns, conn_pid) do + Enum.find(conns, fn {_key, conn} -> + conn.conn == conn_pid + end) + end + + defp find_conn(conns, conn_pid, conn_key) do Enum.find(conns, fn {key, conn} -> - protocol = if String.ends_with?(key, ":443"), do: :https, else: :http - conn.conn == conn_pid and conn.protocol == protocol + key == conn_key and conn.conn == conn_pid end) end + + defp open_conn(key, uri, from, state, opts) do + {:ok, conn} = API.open(to_charlist(uri.host), uri.port, opts) + + state = + put_in(state.conns[key], %Pleroma.Gun.Conn{ + conn: conn, + waiting_pids: [from] + }) + + {:noreply, state} + end end diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex index 6cb26c0fe..ef2ee918d 100644 --- a/lib/pleroma/http/connection.ex +++ b/lib/pleroma/http/connection.ex @@ -9,7 +9,9 @@ defmodule Pleroma.HTTP.Connection do @options [ connect_timeout: 10_000, - timeout: 20_000 + timeout: 20_000, + pool: :federation, + version: :master ] @doc """ @@ -33,9 +35,33 @@ defmodule Pleroma.HTTP.Connection do adapter_options = Pleroma.Config.get([:http, :adapter], []) proxy_url = Pleroma.Config.get([:http, :proxy_url], nil) - @options - |> Keyword.merge(adapter_options) - |> Keyword.merge(options) - |> Keyword.merge(proxy: proxy_url) + options = + @options + |> Keyword.merge(adapter_options) + |> Keyword.merge(options) + |> Keyword.merge(proxy: proxy_url) + + pool = options[:pool] + url = options[:url] + + if not is_nil(url) and not is_nil(pool) and Pleroma.Gun.Connections.alive?(pool) do + get_conn_for_gun(url, options, pool) + else + options + end + end + + defp get_conn_for_gun(url, options, pool) do + case Pleroma.Gun.Connections.get_conn(url, options, pool) do + nil -> + options + + conn -> + %{host: host, port: port} = URI.parse(url) + + Keyword.put(options, :conn, conn) + |> Keyword.put(:close_conn, false) + |> Keyword.put(:original, "#{host}:#{port}") + end end end diff --git a/lib/pleroma/http/http.ex b/lib/pleroma/http/http.ex index 3e29b7b6c..00558d70a 100644 --- a/lib/pleroma/http/http.ex +++ b/lib/pleroma/http/http.ex @@ -35,8 +35,12 @@ defmodule Pleroma.HTTP do adapter_gun? = Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun options = - if adapter_gun? and Pleroma.Gun.Connections.alive?() do - get_conn_for_gun(url, options) + if adapter_gun? do + adapter_opts = + Keyword.get(options, :adapter, []) + |> Keyword.put(:url, url) + + Keyword.put(options, :adapter, adapter_opts) else options end @@ -87,12 +91,7 @@ defmodule Pleroma.HTTP do case uri.scheme do "https" -> - tls_opts = - Keyword.get(options, :tls_opts, []) - |> Keyword.put(:server_name_indication, host) - |> Keyword.put(:versions, [:"tlsv1.2", :"tlsv1.1", :tlsv1]) - - Keyword.put(options, :tls_opts, tls_opts) ++ [ssl: [server_name_indication: host]] + options ++ [ssl: [server_name_indication: host]] _ -> options diff --git a/lib/pleroma/reverse_proxy/client/hackney.ex b/lib/pleroma/reverse_proxy/client/hackney.ex index e6293646a..402c183af 100644 --- a/lib/pleroma/reverse_proxy/client/hackney.ex +++ b/lib/pleroma/reverse_proxy/client/hackney.ex @@ -1,3 +1,7 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + defmodule Pleroma.ReverseProxy.Client.Hackney do @behaviour Pleroma.ReverseProxy.Client diff --git a/lib/pleroma/reverse_proxy/reverse_proxy.ex b/lib/pleroma/reverse_proxy/reverse_proxy.ex index c3ef412a4..a2cdcf393 100644 --- a/lib/pleroma/reverse_proxy/reverse_proxy.ex +++ b/lib/pleroma/reverse_proxy/reverse_proxy.ex @@ -109,7 +109,11 @@ defmodule Pleroma.ReverseProxy do end with {:ok, code, headers, client} <- request(method, url, req_headers, client_opts), - :ok <- header_length_constraint(headers, Keyword.get(opts, :max_body_length)) do + :ok <- + header_length_constraint( + headers, + Keyword.get(opts, :max_body_length, @max_body_length) + ) do response(conn, client, url, code, headers, opts) else {:ok, code, headers} -> @@ -200,7 +204,11 @@ defmodule Pleroma.ReverseProxy do {:ok, data, client} <- client().stream_body(client), {:ok, duration} <- increase_read_duration(duration), sent_so_far = sent_so_far + byte_size(data), - :ok <- body_size_constraint(sent_so_far, Keyword.get(opts, :max_body_size)), + :ok <- + body_size_constraint( + sent_so_far, + Keyword.get(opts, :max_body_length, @max_body_length) + ), {:ok, conn} <- chunk(conn, data) do chunk_reply(conn, client, opts, sent_so_far, duration) else diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 829de6e31..02011f4e6 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -750,6 +750,7 @@ defmodule Pleroma.User do |> update_and_set_cache() end + @spec maybe_fetch_follow_information(User.t()) :: User.t() def maybe_fetch_follow_information(user) do with {:ok, user} <- fetch_follow_information(user) do user @@ -807,9 +808,10 @@ defmodule Pleroma.User do end end + @spec maybe_update_following_count(User.t()) :: User.t() def maybe_update_following_count(%User{local: false} = user) do if Pleroma.Config.get([:instance, :external_user_synchronization]) do - {:ok, maybe_fetch_follow_information(user)} + maybe_fetch_follow_information(user) else user end |