aboutsummaryrefslogtreecommitdiff
path: root/lib/pleroma/web
diff options
context:
space:
mode:
authorRoger Braun <roger@rogerbraun.net>2017-11-11 14:59:25 +0100
committerRoger Braun <roger@rogerbraun.net>2017-11-11 14:59:25 +0100
commitbd5bdc4c247e2ebb239215540a51b69c356da65c (patch)
treecf9376c31e2a26286dc38da78fab4d9f82089313 /lib/pleroma/web
parenta1923d20e850c6b4f187928dd739314df84047b6 (diff)
downloadpleroma-bd5bdc4c247e2ebb239215540a51b69c356da65c.tar.gz
MastoAPI: Basic streaming.
Diffstat (limited to 'lib/pleroma/web')
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub.ex3
-rw-r--r--lib/pleroma/web/endpoint.ex1
-rw-r--r--lib/pleroma/web/mastodon_api/mastodon_socket.ex27
-rw-r--r--lib/pleroma/web/streamer.ex45
4 files changed, 76 insertions, 0 deletions
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 1624c6545..35536a1e4 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -22,6 +22,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
with create_data <- make_create_data(%{to: to, actor: actor, published: published, context: context, object: object}, additional),
{:ok, activity} <- insert(create_data, local),
:ok <- maybe_federate(activity) do
+ if activity.data["type"] == "Create" and Enum.member?(activity.data["to"], "https://www.w3.org/ns/activitystreams#Public") do
+ Pleroma.Web.Streamer.stream("public", activity)
+ end
{:ok, activity}
end
end
diff --git a/lib/pleroma/web/endpoint.ex b/lib/pleroma/web/endpoint.ex
index a1b4108cd..dc1ba2a05 100644
--- a/lib/pleroma/web/endpoint.ex
+++ b/lib/pleroma/web/endpoint.ex
@@ -2,6 +2,7 @@ defmodule Pleroma.Web.Endpoint do
use Phoenix.Endpoint, otp_app: :pleroma
socket "/socket", Pleroma.Web.UserSocket
+ socket "/api/v1", Pleroma.Web.MastodonAPI.MastodonSocket
# Serve at "/" the static files from "priv/static" directory.
#
diff --git a/lib/pleroma/web/mastodon_api/mastodon_socket.ex b/lib/pleroma/web/mastodon_api/mastodon_socket.ex
new file mode 100644
index 000000000..c27d025c4
--- /dev/null
+++ b/lib/pleroma/web/mastodon_api/mastodon_socket.ex
@@ -0,0 +1,27 @@
+defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
+ use Phoenix.Socket
+
+ transport :streaming, Phoenix.Transports.WebSocket.Raw
+
+ def connect(params, socket) do
+ IO.inspect(params)
+ Pleroma.Web.Streamer.add_socket(params["stream"], socket)
+ {:ok, socket}
+ end
+
+ def id(socket), do: nil
+
+ def handle(:text, message, state) do
+ IO.inspect message
+ #| :ok
+ #| state
+ #| {:text, message}
+ #| {:text, message, state}
+ #| {:close, "Goodbye!"}
+ {:text, message}
+ end
+
+ def handle(:closed, reason, _state) do
+ IO.inspect reason
+ end
+end
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
new file mode 100644
index 000000000..cc3805894
--- /dev/null
+++ b/lib/pleroma/web/streamer.ex
@@ -0,0 +1,45 @@
+defmodule Pleroma.Web.Streamer do
+ use GenServer
+ require Logger
+ import Plug.Conn
+
+ def start_link do
+ GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
+ end
+
+ def add_socket(topic, socket) do
+ GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
+ end
+
+ def stream(topic, item) do
+ GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
+ end
+
+ def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
+ Logger.debug("Trying to push to #{topic}")
+ Logger.debug("Pushing item to #{topic}")
+ Enum.each(topics[topic] || [], fn (socket) ->
+ json = %{
+ event: "update",
+ payload: Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: item) |> Poison.encode!
+ } |> Poison.encode!
+
+ send socket.transport_pid, {:text, json}
+ end)
+ {:noreply, topics}
+ end
+
+ def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
+ sockets_for_topic = sockets[topic] || []
+ sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
+ sockets = Map.put(sockets, topic, sockets_for_topic)
+ Logger.debug("Got new conn for #{topic}")
+ IO.inspect(sockets)
+ {:noreply, sockets}
+ end
+
+ def handle_cast(m, state) do
+ IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
+ {:noreply, state}
+ end
+end