aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Strizhakov <alex.strizhakov@gmail.com>2020-02-24 19:56:27 +0300
committerAlexander Strizhakov <alex.strizhakov@gmail.com>2020-02-24 19:56:27 +0300
commit8efae966b1e87fe448a13d04eae0898c4a102c29 (patch)
tree9c2cb158991e7ac582a9e6b4e2979016ec3a4428
parentd44f9e3b6cfd5a0dae07f6194bfd05360afd6560 (diff)
downloadpleroma-8efae966b1e87fe448a13d04eae0898c4a102c29.tar.gz
open conn in separate task
-rw-r--r--lib/mix/tasks/pleroma/benchmark.ex2
-rw-r--r--lib/pleroma/gun/api.ex7
-rw-r--r--lib/pleroma/gun/api/mock.ex5
-rw-r--r--lib/pleroma/gun/conn.ex146
-rw-r--r--lib/pleroma/gun/gun.ex5
-rw-r--r--lib/pleroma/http/adapter/gun.ex21
-rw-r--r--lib/pleroma/pool/connections.ex287
-rw-r--r--restarter/lib/pleroma.ex4
-rw-r--r--test/gun/gun_test.exs6
-rw-r--r--test/http/adapter/gun_test.exs17
-rw-r--r--test/http/connection_test.exs2
-rw-r--r--test/pool/connections_test.exs188
12 files changed, 385 insertions, 305 deletions
diff --git a/lib/mix/tasks/pleroma/benchmark.ex b/lib/mix/tasks/pleroma/benchmark.ex
index 01e079136..7a7430289 100644
--- a/lib/mix/tasks/pleroma/benchmark.ex
+++ b/lib/mix/tasks/pleroma/benchmark.ex
@@ -79,7 +79,7 @@ defmodule Mix.Tasks.Pleroma.Benchmark do
start_pleroma()
:ok =
- Pleroma.Pool.Connections.open_conn(
+ Pleroma.Gun.Conn.open(
"https://httpbin.org/stream-bytes/1500",
:gun_connections
)
diff --git a/lib/pleroma/gun/api.ex b/lib/pleroma/gun/api.ex
index a0c3c5415..f79c9f443 100644
--- a/lib/pleroma/gun/api.ex
+++ b/lib/pleroma/gun/api.ex
@@ -6,9 +6,10 @@ defmodule Pleroma.Gun.API do
@callback open(charlist(), pos_integer(), map()) :: {:ok, pid()}
@callback info(pid()) :: map()
@callback close(pid()) :: :ok
- @callback await_up(pid) :: {:ok, atom()} | {:error, atom()}
+ @callback await_up(pid, pos_integer()) :: {:ok, atom()} | {:error, atom()}
@callback connect(pid(), map()) :: reference()
@callback await(pid(), reference()) :: {:response, :fin, 200, []}
+ @callback set_owner(pid(), pid()) :: :ok
def open(host, port, opts), do: api().open(host, port, opts)
@@ -16,11 +17,13 @@ defmodule Pleroma.Gun.API do
def close(pid), do: api().close(pid)
- def await_up(pid), do: api().await_up(pid)
+ def await_up(pid, timeout \\ 5_000), do: api().await_up(pid, timeout)
def connect(pid, opts), do: api().connect(pid, opts)
def await(pid, ref), do: api().await(pid, ref)
+ def set_owner(pid, owner), do: api().set_owner(pid, owner)
+
defp api, do: Pleroma.Config.get([Pleroma.Gun.API], Pleroma.Gun)
end
diff --git a/lib/pleroma/gun/api/mock.ex b/lib/pleroma/gun/api/mock.ex
index 0134b016e..6d24b0e69 100644
--- a/lib/pleroma/gun/api/mock.ex
+++ b/lib/pleroma/gun/api/mock.ex
@@ -118,7 +118,10 @@ defmodule Pleroma.Gun.API.Mock do
end
@impl API
- def await_up(_pid), do: {:ok, :http}
+ def await_up(_pid, _timeout), do: {:ok, :http}
+
+ @impl API
+ def set_owner(_pid, _owner), do: :ok
@impl API
def connect(pid, %{host: _, port: 80}) do
diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex
index 2474829d6..ddb9f30b0 100644
--- a/lib/pleroma/gun/conn.ex
+++ b/lib/pleroma/gun/conn.ex
@@ -6,6 +6,11 @@ defmodule Pleroma.Gun.Conn do
@moduledoc """
Struct for gun connection data
"""
+ alias Pleroma.Gun.API
+ alias Pleroma.Pool.Connections
+
+ require Logger
+
@type gun_state :: :up | :down
@type conn_state :: :active | :idle
@@ -26,4 +31,145 @@ defmodule Pleroma.Gun.Conn do
last_reference: 0,
crf: 1,
retries: 0
+
+ @spec open(String.t() | URI.t(), atom(), keyword()) :: :ok | nil
+ def open(url, name, opts \\ [])
+ def open(url, name, opts) when is_binary(url), do: open(URI.parse(url), name, opts)
+
+ def open(%URI{} = uri, name, opts) do
+ pool_opts = Pleroma.Config.get([:connections_pool], [])
+
+ opts =
+ opts
+ |> Enum.into(%{})
+ |> Map.put_new(:retry, pool_opts[:retry] || 0)
+ |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 100)
+ |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
+
+ key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
+
+ Logger.debug("opening new connection #{Connections.compose_uri_log(uri)}")
+
+ conn_pid =
+ if Connections.count(name) < opts[:max_connection] do
+ do_open(uri, opts)
+ else
+ try_do_open(name, uri, opts)
+ end
+
+ if is_pid(conn_pid) do
+ conn = %Pleroma.Gun.Conn{
+ conn: conn_pid,
+ gun_state: :up,
+ conn_state: :active,
+ last_reference: :os.system_time(:second)
+ }
+
+ :ok = API.set_owner(conn_pid, Process.whereis(name))
+ Connections.add_conn(name, key, conn)
+ end
+ end
+
+ defp do_open(uri, %{proxy: {proxy_host, proxy_port}} = opts) do
+ connect_opts =
+ uri
+ |> destination_opts()
+ |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
+
+ with open_opts <- Map.delete(opts, :tls_opts),
+ {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
+ {:ok, _} <- API.await_up(conn, opts[:await_up_timeout]),
+ stream <- API.connect(conn, connect_opts),
+ {:response, :fin, 200, _} <- API.await(conn, stream) do
+ conn
+ else
+ error ->
+ Logger.warn(
+ "Received error on opening connection with http proxy #{
+ Connections.compose_uri_log(uri)
+ } #{inspect(error)}"
+ )
+
+ nil
+ end
+ end
+
+ defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
+ version =
+ proxy_type
+ |> to_string()
+ |> String.last()
+ |> case do
+ "4" -> 4
+ _ -> 5
+ end
+
+ socks_opts =
+ uri
+ |> destination_opts()
+ |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
+ |> Map.put(:version, version)
+
+ opts =
+ opts
+ |> Map.put(:protocols, [:socks])
+ |> Map.put(:socks_opts, socks_opts)
+
+ with {:ok, conn} <- API.open(proxy_host, proxy_port, opts),
+ {:ok, _} <- API.await_up(conn, opts[:await_up_timeout]) do
+ conn
+ else
+ error ->
+ Logger.warn(
+ "Received error on opening connection with socks proxy #{
+ Connections.compose_uri_log(uri)
+ } #{inspect(error)}"
+ )
+
+ nil
+ end
+ end
+
+ defp do_open(%URI{host: host, port: port} = uri, opts) do
+ {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
+
+ with {:ok, conn} <- API.open(host, port, opts),
+ {:ok, _} <- API.await_up(conn, opts[:await_up_timeout]) do
+ conn
+ else
+ error ->
+ Logger.warn(
+ "Received error on opening connection #{Connections.compose_uri_log(uri)} #{
+ inspect(error)
+ }"
+ )
+
+ nil
+ end
+ end
+
+ defp destination_opts(%URI{host: host, port: port}) do
+ {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
+ %{host: host, port: port}
+ end
+
+ defp add_http2_opts(opts, "https", tls_opts) do
+ Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
+ end
+
+ defp add_http2_opts(opts, _, _), do: opts
+
+ defp try_do_open(name, uri, opts) do
+ Logger.debug("try to open conn #{Connections.compose_uri_log(uri)}")
+
+ with [{close_key, least_used} | _conns] <-
+ Connections.get_unused_conns(name),
+ :ok <- Pleroma.Gun.API.close(least_used.conn) do
+ Connections.remove_conn(name, close_key)
+
+ do_open(uri, opts)
+ else
+ [] -> nil
+ end
+ end
end
diff --git a/lib/pleroma/gun/gun.ex b/lib/pleroma/gun/gun.ex
index 4a1bbc95f..da82983b1 100644
--- a/lib/pleroma/gun/gun.ex
+++ b/lib/pleroma/gun/gun.ex
@@ -32,7 +32,7 @@ defmodule Pleroma.Gun do
defdelegate close(pid), to: :gun
@impl API
- defdelegate await_up(pid), to: :gun
+ defdelegate await_up(pid, timeout \\ 5_000), to: :gun
@impl API
defdelegate connect(pid, opts), to: :gun
@@ -42,4 +42,7 @@ defmodule Pleroma.Gun do
@spec flush(pid() | reference()) :: :ok
defdelegate flush(pid), to: :gun
+
+ @impl API
+ defdelegate set_owner(pid, owner), to: :gun
end
diff --git a/lib/pleroma/http/adapter/gun.ex b/lib/pleroma/http/adapter/gun.ex
index 7b7e38d8c..908d71898 100644
--- a/lib/pleroma/http/adapter/gun.ex
+++ b/lib/pleroma/http/adapter/gun.ex
@@ -12,7 +12,7 @@ defmodule Pleroma.HTTP.Adapter.Gun do
alias Pleroma.Pool.Connections
@defaults [
- connect_timeout: 20_000,
+ connect_timeout: 5_000,
domain_lookup_timeout: 5_000,
tls_handshake_timeout: 5_000,
retry: 0,
@@ -94,13 +94,11 @@ defmodule Pleroma.HTTP.Adapter.Gun do
"Gun connections pool checkin was not successful. Trying to open conn for next request."
)
- :ok = Connections.open_conn(uri, :gun_connections, opts)
+ Task.start(fn -> Pleroma.Gun.Conn.open(uri, :gun_connections, opts) end)
opts
conn when is_pid(conn) ->
- Logger.debug(
- "received conn #{inspect(conn)} #{uri.scheme}://#{Connections.compose_uri(uri)}"
- )
+ Logger.debug("received conn #{inspect(conn)} #{Connections.compose_uri_log(uri)}")
opts
|> Keyword.put(:conn, conn)
@@ -109,13 +107,14 @@ defmodule Pleroma.HTTP.Adapter.Gun do
rescue
error ->
Logger.warn(
- "Gun connections pool checkin caused error #{uri.scheme}://#{
- Connections.compose_uri(uri)
- } #{inspect(error)}"
+ "Gun connections pool checkin caused error #{Connections.compose_uri_log(uri)} #{
+ inspect(error)
+ }"
)
opts
catch
+ # TODO: here must be no timeouts
:exit, {:timeout, {_, operation, [_, {method, _}, _]}} ->
{:message_queue_len, messages_len} =
:gun_connections
@@ -124,15 +123,15 @@ defmodule Pleroma.HTTP.Adapter.Gun do
Logger.warn(
"Gun connections pool checkin with timeout error for #{operation} #{method} #{
- uri.scheme
- }://#{Connections.compose_uri(uri)}. Messages length: #{messages_len}"
+ Connections.compose_uri_log(uri)
+ }. Messages length: #{messages_len}"
)
opts
:exit, error ->
Logger.warn(
- "Gun pool checkin exited with error #{uri.scheme}://#{Connections.compose_uri(uri)} #{
+ "Gun pool checkin exited with error #{Connections.compose_uri_log(uri)} #{
inspect(error)
}"
)
diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex
index d20927580..a444f822f 100644
--- a/lib/pleroma/pool/connections.ex
+++ b/lib/pleroma/pool/connections.ex
@@ -20,7 +20,6 @@ defmodule Pleroma.Pool.Connections do
defstruct conns: %{}, opts: []
alias Pleroma.Gun.API
- alias Pleroma.Gun.Conn
@spec start_link({atom(), keyword()}) :: {:ok, pid()}
def start_link({name, opts}) do
@@ -44,23 +43,6 @@ defmodule Pleroma.Pool.Connections do
)
end
- @spec open_conn(String.t() | URI.t(), atom(), keyword()) :: :ok
- def open_conn(url, name, opts \\ [])
- def open_conn(url, name, opts) when is_binary(url), do: open_conn(URI.parse(url), name, opts)
-
- def open_conn(%URI{} = uri, name, opts) do
- pool_opts = Config.get([:connections_pool], [])
-
- opts =
- opts
- |> Enum.into(%{})
- |> Map.put_new(:retry, pool_opts[:retry] || 0)
- |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 100)
- |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
-
- GenServer.cast(name, {:open_conn, %{opts: opts, uri: uri}})
- end
-
@spec alive?(atom()) :: boolean()
def alive?(name) do
pid = Process.whereis(name)
@@ -72,23 +54,37 @@ defmodule Pleroma.Pool.Connections do
GenServer.call(name, :state)
end
+ @spec count(atom()) :: pos_integer()
+ def count(name) do
+ GenServer.call(name, :count)
+ end
+
+ @spec get_unused_conns(atom()) :: [{domain(), conn()}]
+ def get_unused_conns(name) do
+ GenServer.call(name, :unused_conns)
+ end
+
@spec checkout(pid(), pid(), atom()) :: :ok
def checkout(conn, pid, name) do
GenServer.cast(name, {:checkout, conn, pid})
end
- @impl true
- def handle_cast({:open_conn, %{opts: opts, uri: uri}}, state) do
- Logger.debug("opening new #{compose_uri(uri)}")
- max_connections = state.opts[:max_connections]
+ @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
+ def add_conn(name, key, conn) do
+ GenServer.cast(name, {:add_conn, key, conn})
+ end
- key = compose_key(uri)
+ @spec remove_conn(atom(), String.t()) :: :ok
+ def remove_conn(name, key) do
+ GenServer.cast(name, {:remove_conn, key})
+ end
- if Enum.count(state.conns) < max_connections do
- open_conn(key, uri, state, opts)
- else
- try_to_open_conn(key, uri, state, opts)
- end
+ @impl true
+ def handle_cast({:add_conn, key, conn}, state) do
+ state = put_in(state.conns[key], conn)
+
+ Process.monitor(conn.conn)
+ {:noreply, state}
end
@impl true
@@ -121,13 +117,19 @@ defmodule Pleroma.Pool.Connections do
end
@impl true
+ def handle_cast({:remove_conn, key}, state) do
+ state = put_in(state.conns, Map.delete(state.conns, key))
+ {:noreply, state}
+ end
+
+ @impl true
def handle_call({:checkin, uri}, from, state) do
- Logger.debug("checkin #{compose_uri(uri)}")
- key = compose_key(uri)
+ key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
+ Logger.debug("checkin #{key}")
case state.conns[key] do
%{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up ->
- Logger.debug("reusing conn #{compose_uri(uri)}")
+ Logger.debug("reusing conn #{key}")
with time <- :os.system_time(:second),
last_reference <- time - current_conn.last_reference,
@@ -155,11 +157,30 @@ defmodule Pleroma.Pool.Connections do
def handle_call(:state, _from, state), do: {:reply, state, state}
@impl true
+ def handle_call(:count, _from, state) do
+ {:reply, Enum.count(state.conns), state}
+ end
+
+ @impl true
+ def handle_call(:unused_conns, _from, state) do
+ unused_conns =
+ state.conns
+ |> Enum.filter(fn {_k, v} ->
+ v.conn_state == :idle and v.used_by == []
+ end)
+ |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
+ x.crf <= y.crf and x.last_reference <= y.last_reference
+ end)
+
+ {:reply, unused_conns, state}
+ end
+
+ @impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do
state =
- with true <- Process.alive?(conn_pid),
- conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
+ with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid),
{key, conn} <- find_conn(state.conns, conn_pid, conn_key),
+ {true, key} <- {Process.alive?(conn_pid), key},
time <- :os.system_time(:second),
last_reference <- time - conn.last_reference,
current_crf <- crf(last_reference, 100, conn.crf) do
@@ -176,15 +197,17 @@ defmodule Pleroma.Pool.Connections do
Logger.debug(":gun.info caused error")
state
- false ->
+ {false, key} ->
Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}")
- state
- nil ->
- Logger.debug(
- ":gun_up message for alive conn #{inspect(conn_pid)}, but deleted from state"
+ put_in(
+ state.conns,
+ Map.delete(state.conns, key)
)
+ nil ->
+ Logger.debug(":gun_up message for conn which is not found in state")
+
:ok = API.close(conn_pid)
state
@@ -198,8 +221,8 @@ defmodule Pleroma.Pool.Connections do
retries = Config.get([:connections_pool, :retry], 0)
# we can't get info on this pid, because pid is dead
state =
- with true <- Process.alive?(conn_pid),
- {key, conn} <- find_conn(state.conns, conn_pid) do
+ with {key, conn} <- find_conn(state.conns, conn_pid),
+ {true, key} <- {Process.alive?(conn_pid), key} do
if conn.retries == retries do
Logger.debug("closing conn if retries is eq #{inspect(conn_pid)}")
:ok = API.close(conn.conn)
@@ -216,16 +239,18 @@ defmodule Pleroma.Pool.Connections do
})
end
else
- false ->
+ {false, key} ->
# gun can send gun_down for closed conn, maybe connection is not closed yet
Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}")
- state
- nil ->
- Logger.debug(
- ":gun_down message for alive conn #{inspect(conn_pid)}, but deleted from state"
+ put_in(
+ state.conns,
+ Map.delete(state.conns, key)
)
+ nil ->
+ Logger.debug(":gun_down message for conn which is not found in state")
+
:ok = API.close(conn_pid)
state
@@ -234,7 +259,29 @@ defmodule Pleroma.Pool.Connections do
{:noreply, state}
end
- defp compose_key(%URI{scheme: scheme, host: host, port: port}), do: "#{scheme}:#{host}:#{port}"
+ @impl true
+ def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
+ Logger.debug("received DOWM message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")
+
+ state =
+ with {key, conn} <- find_conn(state.conns, conn_pid) do
+ Enum.each(conn.used_by, fn {pid, _ref} ->
+ Process.exit(pid, reason)
+ end)
+
+ put_in(
+ state.conns,
+ Map.delete(state.conns, key)
+ )
+ else
+ nil ->
+ Logger.debug(":DOWN message for conn which is not found in state")
+
+ state
+ end
+
+ {:noreply, state}
+ end
defp compose_key_gun_info(pid) do
try do
@@ -265,153 +312,11 @@ defmodule Pleroma.Pool.Connections do
end)
end
- defp open_conn(key, uri, state, %{proxy: {proxy_host, proxy_port}} = opts) do
- connect_opts =
- uri
- |> destination_opts()
- |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
-
- with open_opts <- Map.delete(opts, :tls_opts),
- {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
- {:ok, _} <- API.await_up(conn),
- stream <- API.connect(conn, connect_opts),
- {:response, :fin, 200, _} <- API.await(conn, stream),
- state <-
- put_in(state.conns[key], %Conn{
- conn: conn,
- gun_state: :up,
- conn_state: :active,
- last_reference: :os.system_time(:second)
- }) do
- {:noreply, state}
- else
- error ->
- Logger.warn(
- "Received error on opening connection with http proxy #{uri.scheme}://#{
- compose_uri(uri)
- }: #{inspect(error)}"
- )
-
- {:noreply, state}
- end
- end
-
- defp open_conn(key, uri, state, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
- version =
- proxy_type
- |> to_string()
- |> String.last()
- |> case do
- "4" -> 4
- _ -> 5
- end
-
- socks_opts =
- uri
- |> destination_opts()
- |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
- |> Map.put(:version, version)
-
- opts =
- opts
- |> Map.put(:protocols, [:socks])
- |> Map.put(:socks_opts, socks_opts)
-
- with {:ok, conn} <- API.open(proxy_host, proxy_port, opts),
- {:ok, _} <- API.await_up(conn),
- state <-
- put_in(state.conns[key], %Conn{
- conn: conn,
- gun_state: :up,
- conn_state: :active,
- last_reference: :os.system_time(:second)
- }) do
- {:noreply, state}
- else
- error ->
- Logger.warn(
- "Received error on opening connection with socks proxy #{uri.scheme}://#{
- compose_uri(uri)
- }: #{inspect(error)}"
- )
-
- {:noreply, state}
- end
- end
-
- defp open_conn(key, %URI{host: host, port: port} = uri, state, opts) do
- Logger.debug("opening conn #{compose_uri(uri)}")
- {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
-
- with {:ok, conn} <- API.open(host, port, opts),
- {:ok, _} <- API.await_up(conn),
- state <-
- put_in(state.conns[key], %Conn{
- conn: conn,
- gun_state: :up,
- conn_state: :active,
- last_reference: :os.system_time(:second)
- }) do
- Logger.debug("new conn opened #{compose_uri(uri)}")
- Logger.debug("replying to the call #{compose_uri(uri)}")
- {:noreply, state}
- else
- error ->
- Logger.warn(
- "Received error on opening connection #{uri.scheme}://#{compose_uri(uri)}: #{
- inspect(error)
- }"
- )
-
- {:noreply, state}
- end
- end
-
- defp destination_opts(%URI{host: host, port: port}) do
- {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host)
- %{host: host, port: port}
- end
-
- defp add_http2_opts(opts, "https", tls_opts) do
- Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
- end
-
- defp add_http2_opts(opts, _, _), do: opts
-
- @spec get_unused_conns(map()) :: [{domain(), conn()}]
- def get_unused_conns(conns) do
- conns
- |> Enum.filter(fn {_k, v} ->
- v.conn_state == :idle and v.used_by == []
- end)
- |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
- x.crf <= y.crf and x.last_reference <= y.last_reference
- end)
- end
-
- defp try_to_open_conn(key, uri, state, opts) do
- Logger.debug("try to open conn #{compose_uri(uri)}")
-
- with [{close_key, least_used} | _conns] <- get_unused_conns(state.conns),
- :ok <- API.close(least_used.conn),
- state <-
- put_in(
- state.conns,
- Map.delete(state.conns, close_key)
- ) do
- Logger.debug(
- "least used conn found and closed #{inspect(least_used.conn)} #{compose_uri(uri)}"
- )
-
- open_conn(key, uri, state, opts)
- else
- [] -> {:noreply, state}
- end
- end
-
def crf(current, steps, crf) do
1 + :math.pow(0.5, current / steps) * crf
end
- def compose_uri(%URI{} = uri), do: "#{uri.host}#{uri.path}"
+ def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
+ "#{scheme}://#{host}#{path}"
+ end
end
diff --git a/restarter/lib/pleroma.ex b/restarter/lib/pleroma.ex
index d7817909d..4ade890f9 100644
--- a/restarter/lib/pleroma.ex
+++ b/restarter/lib/pleroma.ex
@@ -44,7 +44,7 @@ defmodule Restarter.Pleroma do
end
def handle_cast({:restart, :test, _}, state) do
- Logger.warn("pleroma restarted")
+ Logger.warn("pleroma manually restarted")
{:noreply, Map.put(state, :need_reboot?, false)}
end
@@ -57,7 +57,7 @@ defmodule Restarter.Pleroma do
def handle_cast({:after_boot, _}, %{after_boot: true} = state), do: {:noreply, state}
def handle_cast({:after_boot, :test}, state) do
- Logger.warn("pleroma restarted")
+ Logger.warn("pleroma restarted after boot")
{:noreply, Map.put(state, :after_boot, true)}
end
diff --git a/test/gun/gun_test.exs b/test/gun/gun_test.exs
index 7f185617c..9f3e0f938 100644
--- a/test/gun/gun_test.exs
+++ b/test/gun/gun_test.exs
@@ -19,6 +19,12 @@ defmodule Pleroma.GunTest do
assert json = receive_response(conn, ref)
assert %{"args" => %{"a" => "b", "c" => "d"}} = Jason.decode!(json)
+
+ {:ok, pid} = Task.start(fn -> Process.sleep(50) end)
+
+ :ok = :gun.set_owner(conn, pid)
+
+ assert :gun.info(conn).owner == pid
end
defp receive_response(conn, ref, acc \\ "") do
diff --git a/test/http/adapter/gun_test.exs b/test/http/adapter/gun_test.exs
index ef1b4a882..a8dcbae04 100644
--- a/test/http/adapter/gun_test.exs
+++ b/test/http/adapter/gun_test.exs
@@ -7,6 +7,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
use Pleroma.Tests.Helpers
import ExUnit.CaptureLog
alias Pleroma.Config
+ alias Pleroma.Gun.Conn
alias Pleroma.HTTP.Adapter.Gun
alias Pleroma.Pool.Connections
@@ -72,7 +73,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
test "receive conn by default" do
uri = URI.parse("http://another-domain.com")
- :ok = Connections.open_conn(uri, :gun_connections)
+ :ok = Conn.open(uri, :gun_connections)
received_opts = Gun.options(uri)
assert received_opts[:close_conn] == false
@@ -81,7 +82,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
test "don't receive conn if receive_conn is false" do
uri = URI.parse("http://another-domain2.com")
- :ok = Connections.open_conn(uri, :gun_connections)
+ :ok = Conn.open(uri, :gun_connections)
opts = [receive_conn: false]
received_opts = Gun.options(opts, uri)
@@ -118,7 +119,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
test "default ssl adapter opts with connection" do
uri = URI.parse("https://some-domain.com")
- :ok = Connections.open_conn(uri, :gun_connections)
+ :ok = Conn.open(uri, :gun_connections)
opts = Gun.options(uri)
@@ -167,7 +168,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
describe "after_request/1" do
test "body_as not chunks" do
uri = URI.parse("http://some-domain.com")
- :ok = Connections.open_conn(uri, :gun_connections)
+ :ok = Conn.open(uri, :gun_connections)
opts = Gun.options(uri)
:ok = Gun.after_request(opts)
conn = opts[:conn]
@@ -185,7 +186,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
test "body_as chunks" do
uri = URI.parse("http://some-domain.com")
- :ok = Connections.open_conn(uri, :gun_connections)
+ :ok = Conn.open(uri, :gun_connections)
opts = Gun.options([body_as: :chunks], uri)
:ok = Gun.after_request(opts)
conn = opts[:conn]
@@ -205,7 +206,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
test "with no connection" do
uri = URI.parse("http://uniq-domain.com")
- :ok = Connections.open_conn(uri, :gun_connections)
+ :ok = Conn.open(uri, :gun_connections)
opts = Gun.options([body_as: :chunks], uri)
conn = opts[:conn]
@@ -227,7 +228,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
test "with ipv4" do
uri = URI.parse("http://127.0.0.1")
- :ok = Connections.open_conn(uri, :gun_connections)
+ :ok = Conn.open(uri, :gun_connections)
opts = Gun.options(uri)
send(:gun_connections, {:gun_up, opts[:conn], :http})
:ok = Gun.after_request(opts)
@@ -246,7 +247,7 @@ defmodule Pleroma.HTTP.Adapter.GunTest do
test "with ipv6" do
uri = URI.parse("http://[2a03:2880:f10c:83:face:b00c:0:25de]")
- :ok = Connections.open_conn(uri, :gun_connections)
+ :ok = Conn.open(uri, :gun_connections)
opts = Gun.options(uri)
send(:gun_connections, {:gun_up, opts[:conn], :http})
:ok = Gun.after_request(opts)
diff --git a/test/http/connection_test.exs b/test/http/connection_test.exs
index c1ff0cc21..53ccbc9cd 100644
--- a/test/http/connection_test.exs
+++ b/test/http/connection_test.exs
@@ -124,7 +124,7 @@ defmodule Pleroma.HTTP.ConnectionTest do
uri = URI.parse("https://some-domain.com")
pid = Process.whereis(:federation)
- :ok = Pleroma.Pool.Connections.open_conn(uri, :gun_connections, genserver_pid: pid)
+ :ok = Pleroma.Gun.Conn.open(uri, :gun_connections, genserver_pid: pid)
opts = Connection.options(uri)
diff --git a/test/pool/connections_test.exs b/test/pool/connections_test.exs
index d0d711c55..f766e3b5f 100644
--- a/test/pool/connections_test.exs
+++ b/test/pool/connections_test.exs
@@ -45,7 +45,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
url = "http://some-domain.com"
key = "http:some-domain.com:80"
refute Connections.checkin(url, name)
- :ok = Connections.open_conn(url, name)
+ :ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
assert is_pid(conn)
@@ -110,7 +110,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
url = "http://ですsome-domain.com"
refute Connections.checkin(url, name)
- :ok = Connections.open_conn(url, name)
+ :ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
assert is_pid(conn)
@@ -139,7 +139,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
refute Connections.checkin(url, name)
- :ok = Connections.open_conn(url, name)
+ :ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
assert is_pid(conn)
@@ -182,7 +182,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
refute Connections.checkin(url, name)
- :ok = Connections.open_conn(url, name)
+ :ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
assert is_pid(conn)
@@ -209,7 +209,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
test "up and down ipv4", %{name: name} do
self = self()
url = "http://127.0.0.1"
- :ok = Connections.open_conn(url, name)
+ :ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
send(name, {:gun_down, conn, nil, nil, nil})
send(name, {:gun_up, conn, nil})
@@ -229,7 +229,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
test "up and down ipv6", %{name: name} do
self = self()
url = "http://[2a03:2880:f10c:83:face:b00c:0:25de]"
- :ok = Connections.open_conn(url, name)
+ :ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
send(name, {:gun_down, conn, nil, nil, nil})
send(name, {:gun_up, conn, nil})
@@ -253,13 +253,13 @@ defmodule Pleroma.Pool.ConnectionsTest do
https_key = "https:some-domain.com:443"
refute Connections.checkin(http_url, name)
- :ok = Connections.open_conn(http_url, name)
+ :ok = Conn.open(http_url, name)
conn = Connections.checkin(http_url, name)
assert is_pid(conn)
assert Process.alive?(conn)
refute Connections.checkin(https_url, name)
- :ok = Connections.open_conn(https_url, name)
+ :ok = Conn.open(https_url, name)
https_conn = Connections.checkin(https_url, name)
refute conn == https_conn
@@ -288,17 +288,17 @@ defmodule Pleroma.Pool.ConnectionsTest do
url = "http://gun-not-up.com"
assert capture_log(fn ->
- :ok = Connections.open_conn(url, name)
+ refute Conn.open(url, name)
refute Connections.checkin(url, name)
end) =~
- "Received error on opening connection http://gun-not-up.com: {:error, :timeout}"
+ "Received error on opening connection http://gun-not-up.com {:error, :timeout}"
end
test "process gun_down message and then gun_up", %{name: name} do
self = self()
url = "http://gun-down-and-up.com"
key = "http:gun-down-and-up.com:80"
- :ok = Connections.open_conn(url, name)
+ :ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
assert is_pid(conn)
@@ -347,7 +347,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
test "async processes get same conn for same domain", %{name: name} do
url = "http://some-domain.com"
- :ok = Connections.open_conn(url, name)
+ :ok = Conn.open(url, name)
tasks =
for _ <- 1..5 do
@@ -381,8 +381,8 @@ defmodule Pleroma.Pool.ConnectionsTest do
self = self()
http_url = "http://some-domain.com"
https_url = "https://some-domain.com"
- :ok = Connections.open_conn(https_url, name)
- :ok = Connections.open_conn(http_url, name)
+ :ok = Conn.open(https_url, name)
+ :ok = Conn.open(http_url, name)
conn1 = Connections.checkin(https_url, name)
@@ -413,7 +413,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
:ok = Connections.checkout(conn1, self, name)
another_url = "http://another-domain.com"
- :ok = Connections.open_conn(another_url, name)
+ :ok = Conn.open(another_url, name)
conn = Connections.checkin(another_url, name)
%Connections{
@@ -437,9 +437,19 @@ defmodule Pleroma.Pool.ConnectionsTest do
Pleroma.Config.put(API, Pleroma.Gun)
end
+ test "opens connection and change owner", %{name: name} do
+ url = "https://httpbin.org"
+ :ok = Conn.open(url, name)
+ conn = Connections.checkin(url, name)
+
+ pid = Process.whereis(name)
+
+ assert :gun.info(conn).owner == pid
+ end
+
test "opens connection and reuse it on next request", %{name: name} do
url = "http://httpbin.org"
- :ok = Connections.open_conn(url, name)
+ :ok = Conn.open(url, name)
Process.sleep(250)
conn = Connections.checkin(url, name)
@@ -462,7 +472,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
test "opens ssl connection and reuse it on next request", %{name: name} do
url = "https://httpbin.org"
- :ok = Connections.open_conn(url, name)
+ :ok = Conn.open(url, name)
Process.sleep(1_000)
conn = Connections.checkin(url, name)
@@ -488,8 +498,8 @@ defmodule Pleroma.Pool.ConnectionsTest do
https1 = "https://www.google.com"
https2 = "https://httpbin.org"
- :ok = Connections.open_conn(https1, name)
- :ok = Connections.open_conn(https2, name)
+ :ok = Conn.open(https1, name)
+ :ok = Conn.open(https2, name)
Process.sleep(1_500)
conn = Connections.checkin(https1, name)
@@ -513,7 +523,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
:ok = Connections.checkout(conn, self, name)
http = "http://httpbin.org"
Process.sleep(1_000)
- :ok = Connections.open_conn(http, name)
+ :ok = Conn.open(http, name)
conn = Connections.checkin(http, name)
%Connections{
@@ -535,8 +545,8 @@ defmodule Pleroma.Pool.ConnectionsTest do
https1 = "https://www.google.com"
https2 = "https://httpbin.org"
- :ok = Connections.open_conn(https1, name)
- :ok = Connections.open_conn(https2, name)
+ :ok = Conn.open(https1, name)
+ :ok = Conn.open(https2, name)
Process.sleep(1_500)
Connections.checkin(https1, name)
@@ -563,7 +573,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
:ok = Connections.checkout(conn, self, name)
http = "http://httpbin.org"
- :ok = Connections.open_conn(http, name)
+ :ok = Conn.open(http, name)
Process.sleep(1_000)
conn = Connections.checkin(http, name)
@@ -587,8 +597,8 @@ defmodule Pleroma.Pool.ConnectionsTest do
https1 = "https://www.google.com"
https2 = "https://httpbin.org"
- :ok = Connections.open_conn(https1, name)
- :ok = Connections.open_conn(https2, name)
+ :ok = Conn.open(https1, name)
+ :ok = Conn.open(https2, name)
Process.sleep(1_000)
Connections.checkin(https1, name)
conn1 = Connections.checkin(https1, name)
@@ -639,8 +649,8 @@ defmodule Pleroma.Pool.ConnectionsTest do
https1 = "https://www.google.com"
https2 = "https://httpbin.org"
- :ok = Connections.open_conn(https1, name)
- :ok = Connections.open_conn(https2, name)
+ :ok = Conn.open(https1, name)
+ :ok = Conn.open(https2, name)
Process.sleep(1_500)
Connections.checkin(https1, name)
Connections.checkin(https2, name)
@@ -694,7 +704,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
} = Connections.get_state(name)
http = "http://httpbin.org"
- :ok = Connections.open_conn(http, name)
+ :ok = Conn.open(http, name)
Process.sleep(1_000)
conn = Connections.checkin(http, name)
@@ -725,7 +735,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
test "as ip", %{name: name} do
url = "http://proxy-string.com"
key = "http:proxy-string.com:80"
- :ok = Connections.open_conn(url, name, proxy: {{127, 0, 0, 1}, 8123})
+ :ok = Conn.open(url, name, proxy: {{127, 0, 0, 1}, 8123})
conn = Connections.checkin(url, name)
@@ -745,7 +755,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
test "as host", %{name: name} do
url = "http://proxy-tuple-atom.com"
- :ok = Connections.open_conn(url, name, proxy: {'localhost', 9050})
+ :ok = Conn.open(url, name, proxy: {'localhost', 9050})
conn = Connections.checkin(url, name)
%Connections{
@@ -765,7 +775,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
test "as ip and ssl", %{name: name} do
url = "https://proxy-string.com"
- :ok = Connections.open_conn(url, name, proxy: {{127, 0, 0, 1}, 8123})
+ :ok = Conn.open(url, name, proxy: {{127, 0, 0, 1}, 8123})
conn = Connections.checkin(url, name)
%Connections{
@@ -784,7 +794,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
test "as host and ssl", %{name: name} do
url = "https://proxy-tuple-atom.com"
- :ok = Connections.open_conn(url, name, proxy: {'localhost', 9050})
+ :ok = Conn.open(url, name, proxy: {'localhost', 9050})
conn = Connections.checkin(url, name)
%Connections{
@@ -804,7 +814,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
test "with socks type", %{name: name} do
url = "http://proxy-socks.com"
- :ok = Connections.open_conn(url, name, proxy: {:socks5, 'localhost', 1234})
+ :ok = Conn.open(url, name, proxy: {:socks5, 'localhost', 1234})
conn = Connections.checkin(url, name)
@@ -825,7 +835,7 @@ defmodule Pleroma.Pool.ConnectionsTest do
test "with socks4 type and ssl", %{name: name} do
url = "https://proxy-socks.com"
- :ok = Connections.open_conn(url, name, proxy: {:socks4, 'localhost', 1234})
+ :ok = Conn.open(url, name, proxy: {:socks4, 'localhost', 1234})
conn = Connections.checkin(url, name)
@@ -892,71 +902,75 @@ defmodule Pleroma.Pool.ConnectionsTest do
end
describe "get_unused_conns/1" do
- test "crf is equalent, sorting by reference" do
- conns = %{
- "1" => %Conn{
- conn_state: :idle,
- last_reference: now() - 1
- },
- "2" => %Conn{
- conn_state: :idle,
- last_reference: now()
- }
- }
-
- assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(conns)
+ test "crf is equalent, sorting by reference", %{name: name} do
+ Connections.add_conn(name, "1", %Conn{
+ conn_state: :idle,
+ last_reference: now() - 1
+ })
+
+ Connections.add_conn(name, "2", %Conn{
+ conn_state: :idle,
+ last_reference: now()
+ })
+
+ assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(name)
end
- test "reference is equalent, sorting by crf" do
- conns = %{
- "1" => %Conn{
- conn_state: :idle,
- crf: 1.999
- },
- "2" => %Conn{
- conn_state: :idle,
- crf: 2
- }
- }
+ test "reference is equalent, sorting by crf", %{name: name} do
+ Connections.add_conn(name, "1", %Conn{
+ conn_state: :idle,
+ crf: 1.999
+ })
+
+ Connections.add_conn(name, "2", %Conn{
+ conn_state: :idle,
+ crf: 2
+ })
- assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(conns)
+ assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(name)
end
- test "higher crf and lower reference" do
- conns = %{
- "1" => %Conn{
- conn_state: :idle,
- crf: 3,
- last_reference: now() - 1
- },
- "2" => %Conn{
- conn_state: :idle,
- crf: 2,
- last_reference: now()
- }
- }
+ test "higher crf and lower reference", %{name: name} do
+ Connections.add_conn(name, "1", %Conn{
+ conn_state: :idle,
+ crf: 3,
+ last_reference: now() - 1
+ })
+
+ Connections.add_conn(name, "2", %Conn{
+ conn_state: :idle,
+ crf: 2,
+ last_reference: now()
+ })
- assert [{"2", _unused_conn} | _others] = Connections.get_unused_conns(conns)
+ assert [{"2", _unused_conn} | _others] = Connections.get_unused_conns(name)
end
- test "lower crf and lower reference" do
- conns = %{
- "1" => %Conn{
- conn_state: :idle,
- crf: 1.99,
- last_reference: now() - 1
- },
- "2" => %Conn{
- conn_state: :idle,
- crf: 2,
- last_reference: now()
- }
- }
+ test "lower crf and lower reference", %{name: name} do
+ Connections.add_conn(name, "1", %Conn{
+ conn_state: :idle,
+ crf: 1.99,
+ last_reference: now() - 1
+ })
- assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(conns)
+ Connections.add_conn(name, "2", %Conn{
+ conn_state: :idle,
+ crf: 2,
+ last_reference: now()
+ })
+
+ assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(name)
end
end
+ test "count/1", %{name: name} do
+ assert Connections.count(name) == 0
+ Connections.add_conn(name, "1", %Conn{conn: self()})
+ assert Connections.count(name) == 1
+ Connections.remove_conn(name, "1")
+ assert Connections.count(name) == 0
+ end
+
defp now do
:os.system_time(:second)
end