aboutsummaryrefslogtreecommitdiff
path: root/lib/pleroma/web/streamer/state.ex
diff options
context:
space:
mode:
authorkaniini <ariadne@dereferenced.org>2019-10-04 17:39:28 +0000
committerkaniini <ariadne@dereferenced.org>2019-10-04 17:39:28 +0000
commitca6f1644aa72e9f3b56e57b8b6dfc0830d44648a (patch)
tree3ee414167996edf5bc39851482c7972673020147 /lib/pleroma/web/streamer/state.ex
parent222c238e7b807853dc02a79e00273b5f6b70eb4b (diff)
parentd9e0108baacbe26a2f594d6f2badec58010ff1f9 (diff)
downloadpleroma-ca6f1644aa72e9f3b56e57b8b6dfc0830d44648a.tar.gz
Merge branch 'bugfix/widen-streamer-blocks-for-1.1' into 'maint/1.1'
widen streaming API blocks (for 1.1) See merge request pleroma/pleroma!1785
Diffstat (limited to 'lib/pleroma/web/streamer/state.ex')
-rw-r--r--lib/pleroma/web/streamer/state.ex82
1 files changed, 82 insertions, 0 deletions
diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex
new file mode 100644
index 000000000..5ce3ebb8a
--- /dev/null
+++ b/lib/pleroma/web/streamer/state.ex
@@ -0,0 +1,82 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.Streamer.State do
+ use GenServer
+ require Logger
+
+ alias Pleroma.Web.Streamer.StreamerSocket
+
+ @env Mix.env()
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
+ end
+
+ def add_socket(topic, socket) do
+ GenServer.call(__MODULE__, {:add, topic, socket})
+ end
+
+ def remove_socket(topic, socket) do
+ do_remove_socket(@env, topic, socket)
+ end
+
+ def get_sockets do
+ %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
+ stream_sockets
+ end
+
+ def init(init_arg) do
+ {:ok, init_arg}
+ end
+
+ def handle_call(:get_state, _from, state) do
+ {:reply, state, state}
+ end
+
+ def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do
+ internal_topic = internal_topic(topic, socket)
+ stream_socket = StreamerSocket.from_socket(socket)
+
+ sockets_for_topic =
+ sockets
+ |> Map.get(internal_topic, [])
+ |> List.insert_at(0, stream_socket)
+ |> Enum.uniq()
+
+ state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
+ Logger.debug("Got new conn for #{topic}")
+ {:reply, state, state}
+ end
+
+ def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do
+ internal_topic = internal_topic(topic, socket)
+ stream_socket = StreamerSocket.from_socket(socket)
+
+ sockets_for_topic =
+ sockets
+ |> Map.get(internal_topic, [])
+ |> List.delete(stream_socket)
+
+ state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic)
+ {:reply, state, state}
+ end
+
+ defp do_remove_socket(:test, _, _) do
+ :ok
+ end
+
+ defp do_remove_socket(_env, topic, socket) do
+ GenServer.call(__MODULE__, {:remove, topic, socket})
+ end
+
+ defp internal_topic(topic, socket)
+ when topic in ~w[user user:notification direct] do
+ "#{topic}:#{socket.assigns[:user].id}"
+ end
+
+ defp internal_topic(topic, _) do
+ topic
+ end
+end