diff options
author | Alex S <alex.strizhakov@gmail.com> | 2019-08-13 14:37:19 +0300 |
---|---|---|
committer | Ariadne Conill <ariadne@dereferenced.org> | 2019-08-18 22:34:13 +0000 |
commit | 246906165776874ca7e4064b197ddf1237af2d1c (patch) | |
tree | 4d9de16d6e434770af3b2283e714e70704b029d7 /lib | |
parent | 26691b1b35da6d192263fbd0938eefe2657cf25d (diff) | |
download | pleroma-246906165776874ca7e4064b197ddf1237af2d1c.tar.gz |
added gun connections genserver
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pleroma/gun/api/api.ex | 15 | ||||
-rw-r--r-- | lib/pleroma/gun/api/mock.ex | 40 | ||||
-rw-r--r-- | lib/pleroma/gun/conn.ex | 17 | ||||
-rw-r--r-- | lib/pleroma/gun/connections.ex | 104 | ||||
-rw-r--r-- | lib/pleroma/http/connection.ex | 1 | ||||
-rw-r--r-- | lib/pleroma/reverse_proxy/client.ex | 2 | ||||
-rw-r--r-- | lib/pleroma/reverse_proxy/client/tesla.ex | 10 |
7 files changed, 184 insertions, 5 deletions
diff --git a/lib/pleroma/gun/api/api.ex b/lib/pleroma/gun/api/api.ex new file mode 100644 index 000000000..c69ab890e --- /dev/null +++ b/lib/pleroma/gun/api/api.ex @@ -0,0 +1,15 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.API do + @callback open(charlist(), pos_integer(), map()) :: {:ok, pid()} + + def open(host, port, opts) do + api().open(host, port, opts) + end + + defp api do + Pleroma.Config.get([Pleroma.Gun.API], :gun) + end +end diff --git a/lib/pleroma/gun/api/mock.ex b/lib/pleroma/gun/api/mock.ex new file mode 100644 index 000000000..3348715c4 --- /dev/null +++ b/lib/pleroma/gun/api/mock.ex @@ -0,0 +1,40 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.API.Mock do + @behaviour Pleroma.Gun.API + @impl Pleroma.Gun.API + def open('some-domain.com', 80, %{genserver_pid: genserver_pid}) do + {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + send(genserver_pid, {:gun_up, conn_pid, :http}) + {:ok, conn_pid} + end + + def open('some-domain.com', 443, %{genserver_pid: genserver_pid}) do + {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + send(genserver_pid, {:gun_up, conn_pid, :https}) + {:ok, conn_pid} + end + + @impl Pleroma.Gun.API + def open('gun_down.com', _port, %{genserver_pid: genserver_pid}) do + {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil}) + {:ok, conn_pid} + end + + @impl Pleroma.Gun.API + def open('gun_down_and_up.com', _port, %{genserver_pid: genserver_pid}) do + {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil}) + + {:ok, _} = + Task.start_link(fn -> + Process.sleep(500) + send(genserver_pid, {:gun_up, conn_pid, :http}) + end) + + {:ok, conn_pid} + end +end diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex new file mode 100644 index 000000000..62ef146a1 --- /dev/null +++ b/lib/pleroma/gun/conn.ex @@ -0,0 +1,17 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.Conn do + @moduledoc """ + Struct for gun connection data + """ + @type t :: %__MODULE__{ + conn: pid(), + state: atom(), + waiting_pids: [pid()], + protocol: atom() + } + + defstruct conn: nil, state: :open, waiting_pids: [], protocol: :http +end diff --git a/lib/pleroma/gun/connections.ex b/lib/pleroma/gun/connections.ex new file mode 100644 index 000000000..60ec68d89 --- /dev/null +++ b/lib/pleroma/gun/connections.ex @@ -0,0 +1,104 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.Connections do + use GenServer + + @type domain :: String.t() + @type conn :: Gun.Conn.t() + @type t :: %__MODULE__{ + conns: %{domain() => conn()} + } + + defstruct conns: %{} + + def start_link(name \\ __MODULE__) do + if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do + GenServer.start_link(__MODULE__, [], name: name) + else + :ignore + end + end + + @impl true + def init(_) do + {:ok, %__MODULE__{conns: %{}}} + end + + @spec get_conn(atom(), String.t(), keyword()) :: pid() + def get_conn(name \\ __MODULE__, url, opts \\ []) do + opts = Enum.into(opts, %{}) + uri = URI.parse(url) + + opts = if uri.scheme == "https", do: Map.put(opts, :transport, :tls), else: opts + + GenServer.call( + name, + {:conn, %{opts: opts, uri: uri}} + ) + end + + @spec get_state(atom()) :: t() + def get_state(name \\ __MODULE__) do + GenServer.call(name, {:state}) + end + + @impl true + def handle_call({:conn, %{opts: opts, uri: uri}}, from, state) do + key = compose_key(uri) + + case state.conns[key] do + %{conn: conn, state: conn_state} when conn_state == :up -> + {:reply, conn, state} + + %{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] -> + state = put_in(state.conns[key].waiting_pids, [from | pids]) + {:noreply, state} + + nil -> + {:ok, conn} = Pleroma.Gun.API.open(to_charlist(uri.host), uri.port, opts) + + state = + put_in(state.conns[key], %Pleroma.Gun.Conn{ + conn: conn, + waiting_pids: [from], + protocol: String.to_atom(uri.scheme) + }) + + {:noreply, state} + end + end + + @impl true + def handle_call({:state}, _from, state), do: {:reply, state, state} + + @impl true + def handle_info({:gun_up, conn_pid, protocol}, state) do + {key, conn} = find_conn(state.conns, conn_pid, protocol) + + # Send to all waiting processes connection pid + Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end) + + # Update state of the current connection and set waiting_pids to empty list + state = put_in(state.conns[key], %{conn | state: :up, waiting_pids: []}) + {:noreply, state} + end + + @impl true + # Do we need to do something with killed & unprocessed references? + def handle_info({:gun_down, conn_pid, protocol, _reason, _killed, _unprocessed}, state) do + {key, conn} = find_conn(state.conns, conn_pid, protocol) + + # We don't want to block requests to GenServer if gun send down message, return nil, so we can make some retries, while connection is not up + Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end) + + state = put_in(state.conns[key].state, :down) + {:noreply, state} + end + + defp compose_key(uri), do: uri.host <> ":" <> to_string(uri.port) + + defp find_conn(conns, conn_pid, protocol), + do: Enum.find(conns, fn {_, conn} -> conn.conn == conn_pid and conn.protocol == protocol end) +end diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex index ff03a3ee3..6cb26c0fe 100644 --- a/lib/pleroma/http/connection.ex +++ b/lib/pleroma/http/connection.ex @@ -9,7 +9,6 @@ defmodule Pleroma.HTTP.Connection do @options [ connect_timeout: 10_000, - protocols: [:http], timeout: 20_000 ] diff --git a/lib/pleroma/reverse_proxy/client.ex b/lib/pleroma/reverse_proxy/client.ex index 42f2ff13b..71c2b2911 100644 --- a/lib/pleroma/reverse_proxy/client.ex +++ b/lib/pleroma/reverse_proxy/client.ex @@ -28,6 +28,6 @@ defmodule Pleroma.ReverseProxy.Client do def close(ref), do: client().close(ref) defp client do - Pleroma.Config.get([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Hackney) + Pleroma.Config.get([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Tesla) end end diff --git a/lib/pleroma/reverse_proxy/client/tesla.ex b/lib/pleroma/reverse_proxy/client/tesla.ex index b1498a5a4..fad577ec1 100644 --- a/lib/pleroma/reverse_proxy/client/tesla.ex +++ b/lib/pleroma/reverse_proxy/client/tesla.ex @@ -1,3 +1,7 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-onl + defmodule Pleroma.ReverseProxy.Client.Tesla do @behaviour Pleroma.ReverseProxy.Client @@ -6,7 +10,7 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do def request(method, url, headers, body, opts \\ []) do adapter_opts = Keyword.get(opts, :adapter, []) - |> Keyword.put(:chunks_response, true) + |> Keyword.put(:body_as, :chunks) with {:ok, response} <- Pleroma.HTTP.request( @@ -44,13 +48,13 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do adapter.read_chunk(pid, stream, opts) end - def close(client) do + def close(pid) do adapter = Application.get_env(:tesla, :adapter) unless adapter in @adapters do raise "#{adapter} doesn't support closing connection" end - adapter.close(client) + adapter.close(pid) end end |