aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/mix/tasks/pleroma/relay.ex12
-rw-r--r--lib/pleroma/application.ex14
-rw-r--r--lib/pleroma/digest_email_worker.ex4
-rw-r--r--lib/pleroma/gun/api/api.ex10
-rw-r--r--lib/pleroma/gun/api/gun.ex14
-rw-r--r--lib/pleroma/gun/api/mock.ex55
-rw-r--r--lib/pleroma/gun/conn.ex4
-rw-r--r--lib/pleroma/gun/connections.ex133
-rw-r--r--lib/pleroma/http/connection.ex36
-rw-r--r--lib/pleroma/http/http.ex15
-rw-r--r--lib/pleroma/reverse_proxy/client/hackney.ex4
-rw-r--r--lib/pleroma/reverse_proxy/reverse_proxy.ex12
-rw-r--r--lib/pleroma/user.ex4
13 files changed, 232 insertions, 85 deletions
diff --git a/lib/mix/tasks/pleroma/relay.ex b/lib/mix/tasks/pleroma/relay.ex
index c7324fff6..a738fae75 100644
--- a/lib/mix/tasks/pleroma/relay.ex
+++ b/lib/mix/tasks/pleroma/relay.ex
@@ -53,13 +53,11 @@ defmodule Mix.Tasks.Pleroma.Relay do
def run(["list"]) do
start_pleroma()
- with %User{} = user <- Relay.get_actor() do
- user.following
- |> Enum.each(fn entry ->
- URI.parse(entry)
- |> Map.get(:host)
- |> shell_info()
- end)
+ with %User{following: following} = _user <- Relay.get_actor() do
+ following
+ |> Enum.map(fn entry -> URI.parse(entry).host end)
+ |> Enum.uniq()
+ |> Enum.each(&shell_info(&1))
else
e -> shell_error("Error while fetching relay subscription list: #{inspect(e)}")
end
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index f755a1355..cf0b99fe6 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -40,6 +40,7 @@ defmodule Pleroma.Application do
] ++
cachex_children() ++
hackney_pool_children() ++
+ gun_pools() ++
[
Pleroma.Web.Federator.RetryQueue,
Pleroma.Stats,
@@ -164,6 +165,19 @@ defmodule Pleroma.Application do
end
end
+ defp gun_pools do
+ if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun || Mix.env() == :test do
+ for {pool_name, opts} <- Pleroma.Config.get([:gun_pools]) do
+ %{
+ id: :"gun_pool_#{pool_name}",
+ start: {Pleroma.Gun.Connections, :start_link, [{pool_name, opts}]}
+ }
+ end
+ else
+ []
+ end
+ end
+
defp after_supervisor_start do
with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest],
true <- digest_config[:active] do
diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/digest_email_worker.ex
index 18e67d39b..5644d6a67 100644
--- a/lib/pleroma/digest_email_worker.ex
+++ b/lib/pleroma/digest_email_worker.ex
@@ -1,3 +1,7 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
defmodule Pleroma.DigestEmailWorker do
import Ecto.Query
diff --git a/lib/pleroma/gun/api/api.ex b/lib/pleroma/gun/api/api.ex
index 19adc1bf0..7e6d2f929 100644
--- a/lib/pleroma/gun/api/api.ex
+++ b/lib/pleroma/gun/api/api.ex
@@ -4,11 +4,21 @@
defmodule Pleroma.Gun.API do
@callback open(charlist(), pos_integer(), map()) :: {:ok, pid()}
+ @callback info(pid()) :: map()
+ @callback close(pid()) :: :ok
def open(host, port, opts) do
api().open(host, port, opts)
end
+ def info(pid) do
+ api().info(pid)
+ end
+
+ def close(pid) do
+ api().close(pid)
+ end
+
defp api do
Pleroma.Config.get([Pleroma.Gun.API], Pleroma.Gun.API.Gun)
end
diff --git a/lib/pleroma/gun/api/gun.ex b/lib/pleroma/gun/api/gun.ex
index 14a4b7275..d97f5a7c9 100644
--- a/lib/pleroma/gun/api/gun.ex
+++ b/lib/pleroma/gun/api/gun.ex
@@ -1,6 +1,12 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
defmodule Pleroma.Gun.API.Gun do
@behaviour Pleroma.Gun.API
+ alias Pleroma.Gun.API
+
@gun_keys [
:connect_timeout,
:http_opts,
@@ -15,8 +21,14 @@ defmodule Pleroma.Gun.API.Gun do
:ws_opts
]
- @impl Pleroma.Gun.API
+ @impl API
def open(host, port, opts) do
:gun.open(host, port, Map.take(opts, @gun_keys))
end
+
+ @impl API
+ def info(pid), do: :gun.info(pid)
+
+ @impl API
+ def close(pid), do: :gun.close(pid)
end
diff --git a/lib/pleroma/gun/api/mock.ex b/lib/pleroma/gun/api/mock.ex
index ff9e13a74..b1a30a73c 100644
--- a/lib/pleroma/gun/api/mock.ex
+++ b/lib/pleroma/gun/api/mock.ex
@@ -4,37 +4,80 @@
defmodule Pleroma.Gun.API.Mock do
@behaviour Pleroma.Gun.API
- @impl Pleroma.Gun.API
- def open('some-domain.com', 80, %{genserver_pid: genserver_pid}) do
+
+ alias Pleroma.Gun.API
+
+ @impl API
+ def open(domain, 80, %{genserver_pid: genserver_pid})
+ when domain in ['another-domain.com', 'some-domain.com'] do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ Registry.register(API.Mock, conn_pid, %{
+ origin_scheme: "http",
+ origin_host: domain,
+ origin_port: 80
+ })
+
send(genserver_pid, {:gun_up, conn_pid, :http})
{:ok, conn_pid}
end
+ @impl API
def open('some-domain.com', 443, %{genserver_pid: genserver_pid}) do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ Registry.register(API.Mock, conn_pid, %{
+ origin_scheme: "https",
+ origin_host: 'some-domain.com',
+ origin_port: 443
+ })
+
send(genserver_pid, {:gun_up, conn_pid, :http2})
{:ok, conn_pid}
end
- @impl Pleroma.Gun.API
- def open('gun_down.com', _port, %{genserver_pid: genserver_pid}) do
+ @impl API
+ def open('gun_down.com', 80, %{genserver_pid: genserver_pid}) do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ Registry.register(API.Mock, conn_pid, %{
+ origin_scheme: "http",
+ origin_host: 'gun_down.com',
+ origin_port: 80
+ })
+
send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil})
{:ok, conn_pid}
end
- @impl Pleroma.Gun.API
- def open('gun_down_and_up.com', _port, %{genserver_pid: genserver_pid}) do
+ @impl API
+ def open('gun_down_and_up.com', 80, %{genserver_pid: genserver_pid}) do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ Registry.register(API.Mock, conn_pid, %{
+ origin_scheme: "http",
+ origin_host: 'gun_down_and_up.com',
+ origin_port: 80
+ })
+
send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil})
{:ok, _} =
Task.start_link(fn ->
Process.sleep(500)
+
send(genserver_pid, {:gun_up, conn_pid, :http})
end)
{:ok, conn_pid}
end
+
+ @impl API
+ def info(pid) do
+ [{_, info}] = Registry.lookup(API.Mock, pid)
+ info
+ end
+
+ @impl API
+ def close(_pid), do: :ok
end
diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex
index 62ef146a1..20ddec64c 100644
--- a/lib/pleroma/gun/conn.ex
+++ b/lib/pleroma/gun/conn.ex
@@ -10,8 +10,8 @@ defmodule Pleroma.Gun.Conn do
conn: pid(),
state: atom(),
waiting_pids: [pid()],
- protocol: atom()
+ used: pos_integer()
}
- defstruct conn: nil, state: :open, waiting_pids: [], protocol: :http
+ defstruct conn: nil, state: :open, waiting_pids: [], used: 0
end
diff --git a/lib/pleroma/gun/connections.ex b/lib/pleroma/gun/connections.ex
index 695c3c93e..3716d9f74 100644
--- a/lib/pleroma/gun/connections.ex
+++ b/lib/pleroma/gun/connections.ex
@@ -6,35 +6,29 @@ defmodule Pleroma.Gun.Connections do
use GenServer
@type domain :: String.t()
- @type conn :: Gun.Conn.t()
+ @type conn :: Pleroma.Gun.Conn.t()
+
@type t :: %__MODULE__{
- conns: %{domain() => conn()}
+ conns: %{domain() => conn()},
+ opts: keyword()
}
- defstruct conns: %{}
-
- def start_link(name \\ __MODULE__)
+ defstruct conns: %{}, opts: []
- def start_link(name) when is_atom(name) do
- GenServer.start_link(__MODULE__, [], name: name)
- end
+ alias Pleroma.Gun.API
- def start_link(_) do
- if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do
- GenServer.start_link(__MODULE__, [])
- else
- :ignore
- end
+ @spec start_link({atom(), keyword()}) :: {:ok, pid()} | :ignore
+ def start_link({name, opts}) do
+ GenServer.start_link(__MODULE__, opts, name: name)
end
@impl true
- def init(_) do
- {:ok, %__MODULE__{conns: %{}}}
- end
+ def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
@spec get_conn(String.t(), keyword(), atom()) :: pid()
- def get_conn(url, opts \\ [], name \\ __MODULE__) do
+ def get_conn(url, opts \\ [], name \\ :default) do
opts = Enum.into(opts, %{})
+
uri = URI.parse(url)
opts =
@@ -42,35 +36,33 @@ defmodule Pleroma.Gun.Connections do
do: Map.put(opts, :transport, :tls),
else: opts
+ opts =
+ if uri.scheme == "https" do
+ host = uri.host |> to_charlist()
+
+ tls_opts =
+ Map.get(opts, :tls_opts, [])
+ |> Keyword.put(:server_name_indication, host)
+
+ Map.put(opts, :tls_opts, tls_opts)
+ else
+ opts
+ end
+
GenServer.call(
name,
{:conn, %{opts: opts, uri: uri}}
)
end
- # TODO: only for testing, add this parameter to the config
- @spec try_to_get_gun_conn(String.t(), keyword(), atom()) :: nil | pid()
- def try_to_get_gun_conn(url, opts \\ [], name \\ __MODULE__),
- do: try_to_get_gun_conn(url, opts, name, 0)
-
- @spec try_to_get_gun_conn(String.t(), keyword(), atom(), pos_integer()) :: nil | pid()
- def try_to_get_gun_conn(_url, _, _, 3), do: nil
-
- def try_to_get_gun_conn(url, opts, name, acc) do
- case Pleroma.Gun.Connections.get_conn(url, opts, name) do
- nil -> try_to_get_gun_conn(url, acc + 1)
- conn -> conn
- end
- end
-
@spec alive?(atom()) :: boolean()
- def alive?(name \\ __MODULE__) do
+ def alive?(name \\ :default) do
pid = Process.whereis(name)
if pid, do: Process.alive?(pid), else: false
end
@spec get_state(atom()) :: t()
- def get_state(name \\ __MODULE__) do
+ def get_state(name \\ :default) do
GenServer.call(name, {:state})
end
@@ -79,7 +71,8 @@ defmodule Pleroma.Gun.Connections do
key = compose_key(uri)
case state.conns[key] do
- %{conn: conn, state: conn_state} when conn_state == :up ->
+ %{conn: conn, state: conn_state, used: used} when conn_state == :up ->
+ state = put_in(state.conns[key].used, used + 1)
{:reply, conn, state}
%{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] ->
@@ -87,16 +80,23 @@ defmodule Pleroma.Gun.Connections do
{:noreply, state}
nil ->
- {:ok, conn} = Pleroma.Gun.API.open(to_charlist(uri.host), uri.port, opts)
+ max_connections = state.opts[:max_connections]
- state =
- put_in(state.conns[key], %Pleroma.Gun.Conn{
- conn: conn,
- waiting_pids: [from],
- protocol: String.to_atom(uri.scheme)
- })
+ if Enum.count(state.conns) < max_connections do
+ open_conn(key, uri, from, state, opts)
+ else
+ [{close_key, least_used} | _conns] = Enum.sort_by(state.conns, fn {_k, v} -> v.used end)
- {:noreply, state}
+ :ok = API.close(least_used.conn)
+
+ state =
+ put_in(
+ state.conns,
+ Map.delete(state.conns, close_key)
+ )
+
+ open_conn(key, uri, from, state, opts)
+ end
end
end
@@ -105,36 +105,63 @@ defmodule Pleroma.Gun.Connections do
@impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do
- {key, conn} = find_conn(state.conns, conn_pid)
+ conn_key = compose_key_gun_info(conn_pid)
+ {key, conn} = find_conn(state.conns, conn_pid, conn_key)
# Send to all waiting processes connection pid
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end)
# Update state of the current connection and set waiting_pids to empty list
- state = put_in(state.conns[key], %{conn | state: :up, waiting_pids: []})
+ state =
+ put_in(state.conns[key], %{
+ conn
+ | state: :up,
+ waiting_pids: [],
+ used: conn.used + length(conn.waiting_pids)
+ })
+
{:noreply, state}
end
@impl true
- # Do we need to do something with killed & unprocessed references?
def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed, _unprocessed}, state) do
+ # we can't get info on this pid, because pid is dead
{key, conn} = find_conn(state.conns, conn_pid)
- # We don't want to block requests to GenServer.
- # If gun sends a down message, return nil, so we can make some
- # retries, while the connection is not up.
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end)
state = put_in(state.conns[key].state, :down)
{:noreply, state}
end
- defp compose_key(uri), do: uri.host <> ":" <> to_string(uri.port)
+ defp compose_key(uri), do: "#{uri.scheme}:#{uri.host}:#{uri.port}"
+
+ defp compose_key_gun_info(pid) do
+ info = API.info(pid)
+ "#{info.origin_scheme}:#{info.origin_host}:#{info.origin_port}"
+ end
defp find_conn(conns, conn_pid) do
+ Enum.find(conns, fn {_key, conn} ->
+ conn.conn == conn_pid
+ end)
+ end
+
+ defp find_conn(conns, conn_pid, conn_key) do
Enum.find(conns, fn {key, conn} ->
- protocol = if String.ends_with?(key, ":443"), do: :https, else: :http
- conn.conn == conn_pid and conn.protocol == protocol
+ key == conn_key and conn.conn == conn_pid
end)
end
+
+ defp open_conn(key, uri, from, state, opts) do
+ {:ok, conn} = API.open(to_charlist(uri.host), uri.port, opts)
+
+ state =
+ put_in(state.conns[key], %Pleroma.Gun.Conn{
+ conn: conn,
+ waiting_pids: [from]
+ })
+
+ {:noreply, state}
+ end
end
diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex
index 6cb26c0fe..ef2ee918d 100644
--- a/lib/pleroma/http/connection.ex
+++ b/lib/pleroma/http/connection.ex
@@ -9,7 +9,9 @@ defmodule Pleroma.HTTP.Connection do
@options [
connect_timeout: 10_000,
- timeout: 20_000
+ timeout: 20_000,
+ pool: :federation,
+ version: :master
]
@doc """
@@ -33,9 +35,33 @@ defmodule Pleroma.HTTP.Connection do
adapter_options = Pleroma.Config.get([:http, :adapter], [])
proxy_url = Pleroma.Config.get([:http, :proxy_url], nil)
- @options
- |> Keyword.merge(adapter_options)
- |> Keyword.merge(options)
- |> Keyword.merge(proxy: proxy_url)
+ options =
+ @options
+ |> Keyword.merge(adapter_options)
+ |> Keyword.merge(options)
+ |> Keyword.merge(proxy: proxy_url)
+
+ pool = options[:pool]
+ url = options[:url]
+
+ if not is_nil(url) and not is_nil(pool) and Pleroma.Gun.Connections.alive?(pool) do
+ get_conn_for_gun(url, options, pool)
+ else
+ options
+ end
+ end
+
+ defp get_conn_for_gun(url, options, pool) do
+ case Pleroma.Gun.Connections.get_conn(url, options, pool) do
+ nil ->
+ options
+
+ conn ->
+ %{host: host, port: port} = URI.parse(url)
+
+ Keyword.put(options, :conn, conn)
+ |> Keyword.put(:close_conn, false)
+ |> Keyword.put(:original, "#{host}:#{port}")
+ end
end
end
diff --git a/lib/pleroma/http/http.ex b/lib/pleroma/http/http.ex
index 3e29b7b6c..00558d70a 100644
--- a/lib/pleroma/http/http.ex
+++ b/lib/pleroma/http/http.ex
@@ -35,8 +35,12 @@ defmodule Pleroma.HTTP do
adapter_gun? = Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun
options =
- if adapter_gun? and Pleroma.Gun.Connections.alive?() do
- get_conn_for_gun(url, options)
+ if adapter_gun? do
+ adapter_opts =
+ Keyword.get(options, :adapter, [])
+ |> Keyword.put(:url, url)
+
+ Keyword.put(options, :adapter, adapter_opts)
else
options
end
@@ -87,12 +91,7 @@ defmodule Pleroma.HTTP do
case uri.scheme do
"https" ->
- tls_opts =
- Keyword.get(options, :tls_opts, [])
- |> Keyword.put(:server_name_indication, host)
- |> Keyword.put(:versions, [:"tlsv1.2", :"tlsv1.1", :tlsv1])
-
- Keyword.put(options, :tls_opts, tls_opts) ++ [ssl: [server_name_indication: host]]
+ options ++ [ssl: [server_name_indication: host]]
_ ->
options
diff --git a/lib/pleroma/reverse_proxy/client/hackney.ex b/lib/pleroma/reverse_proxy/client/hackney.ex
index e6293646a..402c183af 100644
--- a/lib/pleroma/reverse_proxy/client/hackney.ex
+++ b/lib/pleroma/reverse_proxy/client/hackney.ex
@@ -1,3 +1,7 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
defmodule Pleroma.ReverseProxy.Client.Hackney do
@behaviour Pleroma.ReverseProxy.Client
diff --git a/lib/pleroma/reverse_proxy/reverse_proxy.ex b/lib/pleroma/reverse_proxy/reverse_proxy.ex
index c3ef412a4..a2cdcf393 100644
--- a/lib/pleroma/reverse_proxy/reverse_proxy.ex
+++ b/lib/pleroma/reverse_proxy/reverse_proxy.ex
@@ -109,7 +109,11 @@ defmodule Pleroma.ReverseProxy do
end
with {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
- :ok <- header_length_constraint(headers, Keyword.get(opts, :max_body_length)) do
+ :ok <-
+ header_length_constraint(
+ headers,
+ Keyword.get(opts, :max_body_length, @max_body_length)
+ ) do
response(conn, client, url, code, headers, opts)
else
{:ok, code, headers} ->
@@ -200,7 +204,11 @@ defmodule Pleroma.ReverseProxy do
{:ok, data, client} <- client().stream_body(client),
{:ok, duration} <- increase_read_duration(duration),
sent_so_far = sent_so_far + byte_size(data),
- :ok <- body_size_constraint(sent_so_far, Keyword.get(opts, :max_body_size)),
+ :ok <-
+ body_size_constraint(
+ sent_so_far,
+ Keyword.get(opts, :max_body_length, @max_body_length)
+ ),
{:ok, conn} <- chunk(conn, data) do
chunk_reply(conn, client, opts, sent_so_far, duration)
else
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index 829de6e31..02011f4e6 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -750,6 +750,7 @@ defmodule Pleroma.User do
|> update_and_set_cache()
end
+ @spec maybe_fetch_follow_information(User.t()) :: User.t()
def maybe_fetch_follow_information(user) do
with {:ok, user} <- fetch_follow_information(user) do
user
@@ -807,9 +808,10 @@ defmodule Pleroma.User do
end
end
+ @spec maybe_update_following_count(User.t()) :: User.t()
def maybe_update_following_count(%User{local: false} = user) do
if Pleroma.Config.get([:instance, :external_user_synchronization]) do
- {:ok, maybe_fetch_follow_information(user)}
+ maybe_fetch_follow_information(user)
else
user
end