aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorRoger Braun <rbraun@Bobble.local>2017-11-16 16:49:51 +0100
committerRoger Braun <rbraun@Bobble.local>2017-11-16 16:49:51 +0100
commita743940463a7d0a7346f77792310dff6a98e7f31 (patch)
treebc0daebfcfe7231331d955dab6f7e102fef412ef /lib
parent5719f69ae338bce2419a6ea572f34a68fda5d23c (diff)
downloadpleroma-a743940463a7d0a7346f77792310dff6a98e7f31.tar.gz
MastoAPI: Implement all streaming functions.
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/notification.ex3
-rw-r--r--lib/pleroma/user.ex11
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub.ex1
-rw-r--r--lib/pleroma/web/mastodon_api/mastodon_api_controller.ex2
-rw-r--r--lib/pleroma/web/mastodon_api/mastodon_socket.ex2
-rw-r--r--lib/pleroma/web/streamer.ex42
6 files changed, 55 insertions, 6 deletions
diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex
index 039cc7312..65e3265d4 100644
--- a/lib/pleroma/notification.ex
+++ b/lib/pleroma/notification.ex
@@ -78,8 +78,9 @@ defmodule Pleroma.Notification do
# TODO move to sql, too.
def create_notification(%Activity{} = activity, %User{} = user) do
unless User.blocks?(user, %{ap_id: activity.data["actor"]}) do
- notification = %Notification{user_id: user.id, activity_id: activity.id}
+ notification = %Notification{user_id: user.id, activity: activity}
{:ok, notification} = Repo.insert(notification)
+ Pleroma.Web.Streamer.stream("user", notification)
notification
end
end
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index 771c54e81..56502e897 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -284,6 +284,17 @@ defmodule Pleroma.User do
Repo.all(query)
end
+ def get_recipients_from_activity(%Activity{data: %{"to" => to}} = activity) do
+ query = from u in User,
+ where: u.local == true
+
+ query = from u in query,
+ where: u.ap_id in ^to,
+ or_where: fragment("? \\\?| ?", u.following, ^to)
+
+ Repo.all(query)
+ end
+
def search(query, resolve) do
if resolve do
User.get_or_fetch_by_nickname(query)
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 5cbf14868..b4e59050b 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -24,6 +24,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
:ok <- maybe_federate(activity) do
if activity.data["type"] == "Create" and Enum.member?(activity.data["to"], "https://www.w3.org/ns/activitystreams#Public") do
Pleroma.Web.Streamer.stream("public", activity)
+ Pleroma.Web.Streamer.stream("user", activity)
if local do
Pleroma.Web.Streamer.stream("public:local", activity)
end
diff --git a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex
index 8b5714555..bbd003b06 100644
--- a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex
+++ b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex
@@ -595,7 +595,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
json(conn, [])
end
- defp render_notification(user, %{id: id, activity: activity, inserted_at: created_at} = _params) do
+ def render_notification(user, %{id: id, activity: activity, inserted_at: created_at} = _params) do
actor = User.get_cached_by_ap_id(activity.data["actor"])
created_at = NaiveDateTime.to_iso8601(created_at)
|> String.replace(~r/(\.\d+)?$/, ".000Z", global: false)
diff --git a/lib/pleroma/web/mastodon_api/mastodon_socket.ex b/lib/pleroma/web/mastodon_api/mastodon_socket.ex
index af76c8701..1d276e64a 100644
--- a/lib/pleroma/web/mastodon_api/mastodon_socket.ex
+++ b/lib/pleroma/web/mastodon_api/mastodon_socket.ex
@@ -11,7 +11,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
with token when not is_nil(token) <- params["access_token"],
%Token{user_id: user_id} <- Repo.get_by(Token, token: token),
%User{} = user <- Repo.get(User, user_id),
- stream when stream in ["public", "public:local"] <- params["stream"] do
+ stream when stream in ["public", "public:local", "user"] <- params["stream"] do
socket = socket
|> assign(:topic, params["stream"])
|> assign(:user, user)
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index 3b2938676..9f1080015 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -2,6 +2,7 @@ defmodule Pleroma.Web.Streamer do
use GenServer
require Logger
import Plug.Conn
+ alias Pleroma.{User, Notification}
def start_link do
spawn(fn ->
@@ -37,9 +38,7 @@ defmodule Pleroma.Web.Streamer do
{:noreply, topics}
end
- def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
- Logger.debug("Trying to push to #{topic}")
- Logger.debug("Pushing item to #{topic}")
+ def push_to_socket(topics, topic, item) do
Enum.each(topics[topic] || [], fn (socket) ->
json = %{
event: "update",
@@ -48,10 +47,46 @@ defmodule Pleroma.Web.Streamer do
send socket.transport_pid, {:text, json}
end)
+ end
+
+ def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
+ topic = "user:#{item.user_id}"
+ Enum.each(topics[topic] || [], fn (socket) ->
+ json = %{
+ event: "notification",
+ payload: Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(socket.assigns["user"], item) |> Poison.encode!
+ } |> Poison.encode!
+
+ send socket.transport_pid, {:text, json}
+ end)
+ {:noreply, topics}
+ end
+
+ def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
+ Logger.debug("Trying to push to users")
+ recipient_topics = User.get_recipients_from_activity(item)
+ |> Enum.map(fn (%{id: id}) -> "user:#{id}" end)
+
+ Enum.each(recipient_topics, fn (topic) ->
+ push_to_socket(topics, topic, item)
+ end)
+ {:noreply, topics}
+ end
+
+ def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
+ Logger.debug("Trying to push to #{topic}")
+ Logger.debug("Pushing item to #{topic}")
+ push_to_socket(topics, topic, item)
{:noreply, topics}
end
+ defp internal_topic("user", socket) do
+ "user:#{socket.assigns[:user].id}"
+ end
+ defp internal_topic(topic, socket), do: topic
+
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
+ topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
sockets = Map.put(sockets, topic, sockets_for_topic)
@@ -61,6 +96,7 @@ defmodule Pleroma.Web.Streamer do
end
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
+ topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = List.delete(sockets_for_topic, socket)
sockets = Map.put(sockets, topic, sockets_for_topic)