aboutsummaryrefslogtreecommitdiff
path: root/lib/pleroma
diff options
context:
space:
mode:
authorlain <lain@soykaf.club>2020-06-05 16:47:02 +0200
committerlain <lain@soykaf.club>2020-06-05 16:47:02 +0200
commit115d08a7542b92c5e1d889da41c0ee6837a1235e (patch)
tree9f1d3aa8a65f53d5a42b9ab64013c2d9a3296a19 /lib/pleroma
parent65689ba9bd44e291fc626cce2bd5136b93a5da90 (diff)
downloadpleroma-115d08a7542b92c5e1d889da41c0ee6837a1235e.tar.gz
Pipeline: Add a side effects step after the transaction finishes
This is to run things like streaming notifications out, which will sometimes need data that is created by the transaction, but is streamed out asynchronously.
Diffstat (limited to 'lib/pleroma')
-rw-r--r--lib/pleroma/notification.ex26
-rw-r--r--lib/pleroma/web/activity_pub/pipeline.ex4
-rw-r--r--lib/pleroma/web/activity_pub/side_effects.ex30
3 files changed, 52 insertions, 8 deletions
diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex
index e5b880b10..49e27c05a 100644
--- a/lib/pleroma/notification.ex
+++ b/lib/pleroma/notification.ex
@@ -334,30 +334,34 @@ defmodule Pleroma.Notification do
end
end
- def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity) do
+ def create_notifications(activity, options \\ [])
+
+ def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity, options) do
object = Object.normalize(activity, false)
if object && object.data["type"] == "Answer" do
{:ok, []}
else
- do_create_notifications(activity)
+ do_create_notifications(activity, options)
end
end
- def create_notifications(%Activity{data: %{"type" => type}} = activity)
+ def create_notifications(%Activity{data: %{"type" => type}} = activity, options)
when type in ["Follow", "Like", "Announce", "Move", "EmojiReact"] do
- do_create_notifications(activity)
+ do_create_notifications(activity, options)
end
- def create_notifications(_), do: {:ok, []}
+ def create_notifications(_, _), do: {:ok, []}
+
+ defp do_create_notifications(%Activity{} = activity, options) do
+ do_send = Keyword.get(options, :do_send, true)
- defp do_create_notifications(%Activity{} = activity) do
{enabled_receivers, disabled_receivers} = get_notified_from_activity(activity)
potential_receivers = enabled_receivers ++ disabled_receivers
notifications =
Enum.map(potential_receivers, fn user ->
- do_send = user in enabled_receivers
+ do_send = do_send && user in enabled_receivers
create_notification(activity, user, do_send)
end)
@@ -623,4 +627,12 @@ defmodule Pleroma.Notification do
end
def skip?(_, _, _), do: false
+
+ def for_user_and_activity(user, activity) do
+ from(n in __MODULE__,
+ where: n.user_id == ^user.id,
+ where: n.activity_id == ^activity.id
+ )
+ |> Repo.one()
+ end
end
diff --git a/lib/pleroma/web/activity_pub/pipeline.ex b/lib/pleroma/web/activity_pub/pipeline.ex
index 0c54c4b23..6875c47f6 100644
--- a/lib/pleroma/web/activity_pub/pipeline.ex
+++ b/lib/pleroma/web/activity_pub/pipeline.ex
@@ -17,6 +17,10 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do
{:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def common_pipeline(object, meta) do
case Repo.transaction(fn -> do_common_pipeline(object, meta) end) do
+ {:ok, {:ok, activity, meta}} ->
+ SideEffects.handle_after_transaction(meta)
+ {:ok, activity, meta}
+
{:ok, value} ->
value
diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex
index b3aacff40..10136789a 100644
--- a/lib/pleroma/web/activity_pub/side_effects.ex
+++ b/lib/pleroma/web/activity_pub/side_effects.ex
@@ -16,6 +16,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
alias Pleroma.Web.ActivityPub.Pipeline
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Streamer
+ alias Pleroma.Web.Push
def handle(object, meta \\ [])
@@ -37,7 +38,12 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
# - Set up notifications
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do
- Notification.create_notifications(activity)
+ {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
+
+ meta =
+ meta
+ |> add_notifications(notifications)
+
{:ok, activity, meta}
else
e -> Repo.rollback(e)
@@ -200,4 +206,26 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
end
def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
+
+ defp send_notifications(meta) do
+ Keyword.get(meta, :created_notifications, [])
+ |> Enum.each(fn notification ->
+ Streamer.stream(["user", "user:notification"], notification)
+ Push.send(notification)
+ end)
+
+ meta
+ end
+
+ defp add_notifications(meta, notifications) do
+ existing = Keyword.get(meta, :created_notifications, [])
+
+ meta
+ |> Keyword.put(:created_notifications, notifications ++ existing)
+ end
+
+ def handle_after_transaction(meta) do
+ meta
+ |> send_notifications()
+ end
end