aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/gun/conn.ex11
-rw-r--r--lib/pleroma/gun/connections.ex132
-rw-r--r--lib/pleroma/http/connection.ex5
-rw-r--r--lib/pleroma/http/http.ex30
4 files changed, 129 insertions, 49 deletions
diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex
index 9e5c2b184..906607b28 100644
--- a/lib/pleroma/gun/conn.ex
+++ b/lib/pleroma/gun/conn.ex
@@ -6,17 +6,24 @@ defmodule Pleroma.Gun.Conn do
@moduledoc """
Struct for gun connection data
"""
+ @type gun_state :: :open | :up | :down
+ @type conn_state :: :init | :active | :idle
+
@type t :: %__MODULE__{
conn: pid(),
- state: atom(),
+ gun_state: gun_state(),
waiting_pids: [pid()],
+ conn_state: conn_state(),
+ used_by: [pid()],
last_reference: pos_integer(),
crf: float()
}
defstruct conn: nil,
- state: :open,
+ gun_state: :open,
waiting_pids: [],
+ conn_state: :init,
+ used_by: [],
last_reference: :os.system_time(:second),
crf: 1
end
diff --git a/lib/pleroma/gun/connections.ex b/lib/pleroma/gun/connections.ex
index 73d54e94d..e3d392de7 100644
--- a/lib/pleroma/gun/connections.ex
+++ b/lib/pleroma/gun/connections.ex
@@ -14,7 +14,7 @@ defmodule Pleroma.Gun.Connections do
opts: keyword()
}
- defstruct conns: %{}, opts: []
+ defstruct conns: %{}, opts: [], queue: []
alias Pleroma.Gun.API
alias Pleroma.Gun.Conn
@@ -27,8 +27,8 @@ defmodule Pleroma.Gun.Connections do
@impl true
def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
- @spec get_conn(String.t(), keyword(), atom()) :: pid()
- def get_conn(url, opts \\ [], name \\ :default) do
+ @spec checkin(String.t(), keyword(), atom()) :: pid()
+ def checkin(url, opts \\ [], name \\ :default) do
opts = Enum.into(opts, %{})
uri = URI.parse(url)
@@ -53,7 +53,7 @@ defmodule Pleroma.Gun.Connections do
GenServer.call(
name,
- {:conn, %{opts: opts, uri: uri}}
+ {:checkin, %{opts: opts, uri: uri}}
)
end
@@ -68,28 +68,57 @@ defmodule Pleroma.Gun.Connections do
GenServer.call(name, {:state})
end
+ def checkout(conn, pid, name \\ :default) do
+ GenServer.cast(name, {:checkout, conn, pid})
+ end
+
+ def process_queue(name \\ :default) do
+ GenServer.cast(name, {:process_queue})
+ end
+
@impl true
- def handle_call({:conn, %{opts: opts, uri: uri}}, from, state) do
+ def handle_cast({:checkout, conn_pid, pid}, state) do
+ {key, conn} = find_conn(state.conns, conn_pid)
+ used_by = List.keydelete(conn.used_by, pid, 0)
+ conn_state = if used_by == [], do: :idle, else: conn.conn_state
+ state = put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
+ {:noreply, state}
+ end
+
+ @impl true
+ def handle_cast({:process_queue}, state) do
+ case state.queue do
+ [{from, key, uri, opts} | _queue] ->
+ try_to_checkin(key, uri, from, state, Map.put(opts, :from_cast, true))
+
+ [] ->
+ {:noreply, state}
+ end
+ end
+
+ @impl true
+ def handle_call({:checkin, %{opts: opts, uri: uri}}, from, state) do
key = compose_key(uri)
case state.conns[key] do
- %{conn: conn, state: conn_state, last_reference: reference, crf: last_crf} = current_conn
- when conn_state == :up ->
+ %{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up ->
time = current_time()
- last_reference = time - reference
+ last_reference = time - current_conn.last_reference
- current_crf = crf(last_reference, 100, last_crf)
+ current_crf = crf(last_reference, 100, current_conn.crf)
state =
put_in(state.conns[key], %{
current_conn
| last_reference: time,
- crf: current_crf
+ crf: current_crf,
+ conn_state: :active,
+ used_by: [from | current_conn.used_by]
})
{:reply, conn, state}
- %{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] ->
+ %{gun_state: gun_state, waiting_pids: pids} when gun_state in [:open, :down] ->
state = put_in(state.conns[key].waiting_pids, [from | pids])
{:noreply, state}
@@ -99,22 +128,7 @@ defmodule Pleroma.Gun.Connections do
if Enum.count(state.conns) < max_connections do
open_conn(key, uri, from, state, opts)
else
- [{close_key, least_used} | _conns] =
- state.conns
- |> Enum.filter(fn {_k, v} -> v.waiting_pids == [] end)
- |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
- x.crf < y.crf and x.last_reference < y.last_reference
- end)
-
- :ok = API.close(least_used.conn)
-
- state =
- put_in(
- state.conns,
- Map.delete(state.conns, close_key)
- )
-
- open_conn(key, uri, from, state, opts)
+ try_to_checkin(key, uri, from, state, opts)
end
end
end
@@ -122,14 +136,44 @@ defmodule Pleroma.Gun.Connections do
@impl true
def handle_call({:state}, _from, state), do: {:reply, state, state}
+ defp try_to_checkin(key, uri, from, state, opts) do
+ unused_conns =
+ state.conns
+ |> Enum.filter(fn {_k, v} ->
+ v.conn_state == :idle and v.waiting_pids == [] and v.used_by == []
+ end)
+ |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
+ x.crf < y.crf and x.last_reference < y.last_reference
+ end)
+
+ case unused_conns do
+ [{close_key, least_used} | _conns] ->
+ :ok = API.close(least_used.conn)
+
+ state =
+ put_in(
+ state.conns,
+ Map.delete(state.conns, close_key)
+ )
+
+ open_conn(key, uri, from, state, opts)
+
+ [] ->
+ queue =
+ if List.keymember?(state.queue, from, 0),
+ do: state.queue,
+ else: state.queue ++ [{from, key, uri, opts}]
+
+ state = put_in(state.queue, queue)
+ {:noreply, state}
+ end
+ end
+
@impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do
conn_key = compose_key_gun_info(conn_pid)
{key, conn} = find_conn(state.conns, conn_pid, conn_key)
- # Send to all waiting processes connection pid
- Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end)
-
# Update state of the current connection and set waiting_pids to empty list
time = current_time()
last_reference = time - conn.last_reference
@@ -138,12 +182,17 @@ defmodule Pleroma.Gun.Connections do
state =
put_in(state.conns[key], %{
conn
- | state: :up,
+ | gun_state: :up,
waiting_pids: [],
last_reference: time,
- crf: current_crf
+ crf: current_crf,
+ conn_state: :active,
+ used_by: conn.waiting_pids ++ conn.used_by
})
+ # Send to all waiting processes connection pid
+ Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end)
+
{:noreply, state}
end
@@ -154,7 +203,7 @@ defmodule Pleroma.Gun.Connections do
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end)
- state = put_in(state.conns[key].state, :down)
+ state = put_in(state.conns[key].gun_state, :down)
{:noreply, state}
end
@@ -177,7 +226,7 @@ defmodule Pleroma.Gun.Connections do
end)
end
- defp open_conn(key, uri, _from, state, %{proxy: {proxy_host, proxy_port}} = opts) do
+ defp open_conn(key, uri, from, state, %{proxy: {proxy_host, proxy_port}} = opts) do
host = to_charlist(uri.host)
port = uri.port
@@ -202,9 +251,15 @@ defmodule Pleroma.Gun.Connections do
put_in(state.conns[key], %Conn{
conn: conn,
waiting_pids: [],
- state: :up
+ gun_state: :up,
+ conn_state: :active,
+ used_by: [from]
})
+ if opts[:from_cast] do
+ GenServer.reply(from, conn)
+ end
+
{:reply, conn, state}
else
error ->
@@ -219,6 +274,13 @@ defmodule Pleroma.Gun.Connections do
with {:ok, conn} <- API.open(host, port, opts) do
state =
+ if opts[:from_cast] do
+ put_in(state.queue, List.keydelete(state.queue, from, 0))
+ else
+ state
+ end
+
+ state =
put_in(state.conns[key], %Conn{
conn: conn,
waiting_pids: [from]
diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex
index 39c0fff43..d4e6d0f99 100644
--- a/lib/pleroma/http/connection.ex
+++ b/lib/pleroma/http/connection.ex
@@ -10,8 +10,7 @@ defmodule Pleroma.HTTP.Connection do
@options [
connect_timeout: 10_000,
timeout: 20_000,
- pool: :federation,
- version: :master
+ pool: :federation
]
require Logger
@@ -61,7 +60,7 @@ defmodule Pleroma.HTTP.Connection do
end
defp get_conn_for_gun(url, options, pool) do
- case Pleroma.Gun.Connections.get_conn(url, options, pool) do
+ case Pleroma.Gun.Connections.checkin(url, options, pool) do
nil ->
options
diff --git a/lib/pleroma/http/http.ex b/lib/pleroma/http/http.ex
index 5c0d66955..0a7db737f 100644
--- a/lib/pleroma/http/http.ex
+++ b/lib/pleroma/http/http.ex
@@ -45,15 +45,27 @@ defmodule Pleroma.HTTP do
params = Keyword.get(options, :params, [])
- %{}
- |> Builder.method(method)
- |> Builder.url(url)
- |> Builder.headers(headers)
- |> Builder.opts(options)
- |> Builder.add_param(:body, :body, body)
- |> Builder.add_param(:query, :query, params)
- |> Enum.into([])
- |> (&Tesla.request(Connection.new(options), &1)).()
+ request =
+ %{}
+ |> Builder.method(method)
+ |> Builder.url(url)
+ |> Builder.headers(headers)
+ |> Builder.opts(options)
+ |> Builder.add_param(:body, :body, body)
+ |> Builder.add_param(:query, :query, params)
+ |> Enum.into([])
+
+ client = Connection.new(options)
+ response = Tesla.request(client, request)
+
+ if adapter_gun? do
+ %{adapter: {_, _, [adapter_options]}} = client
+ pool = adapter_options[:pool]
+ Pleroma.Gun.Connections.checkout(adapter_options[:conn], self(), pool)
+ Pleroma.Gun.Connections.process_queue(pool)
+ end
+
+ response
rescue
e ->
{:error, e}