aboutsummaryrefslogtreecommitdiff
path: root/lib/pleroma/web/activity_pub/side_effects.ex
diff options
context:
space:
mode:
authorAlex Gleason <alex@alexgleason.me>2022-01-03 13:40:19 -0600
committerAlex Gleason <alex@alexgleason.me>2022-01-03 13:40:19 -0600
commit4081be0001332bac402faec7565807df088b0117 (patch)
treea5305404e9bb31b3613dbc9631d36f8827be81c2 /lib/pleroma/web/activity_pub/side_effects.ex
parentd00f74e036735c1c238f661076f2925b39daa6ac (diff)
parenta3094b64df344622f1bcb03091ef2ff4dce6da82 (diff)
downloadpleroma-matrix.tar.gz
Merge remote-tracking branch 'origin/develop' into matrixmatrix
Diffstat (limited to 'lib/pleroma/web/activity_pub/side_effects.ex')
-rw-r--r--lib/pleroma/web/activity_pub/side_effects.ex159
1 files changed, 132 insertions, 27 deletions
diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex
index f2a1dc6e8..774c4abb1 100644
--- a/lib/pleroma/web/activity_pub/side_effects.ex
+++ b/lib/pleroma/web/activity_pub/side_effects.ex
@@ -1,3 +1,7 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
defmodule Pleroma.Web.ActivityPub.SideEffects do
@moduledoc """
This module looks at an inserted object and executes the side effects that it
@@ -6,8 +10,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
collection, and so on.
"""
alias Pleroma.Activity
- alias Pleroma.Activity.Ir.Topics
- alias Pleroma.ActivityExpiration
alias Pleroma.Chat
alias Pleroma.Chat.MessageReference
alias Pleroma.FollowingRelationship
@@ -21,15 +23,24 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Push
alias Pleroma.Web.Streamer
- alias Pleroma.Workers.BackgroundWorker
+ alias Pleroma.Workers.PollWorker
require Logger
+ @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
+ @logger Pleroma.Config.get([:side_effects, :logger], Logger)
+
+ @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
+
+ defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
+
+ @impl true
def handle(object, meta \\ [])
# Task this handles
# - Follows
# - Sends a notification
+ @impl true
def handle(
%{
data: %{
@@ -45,10 +56,9 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
%User{} = followed <- User.get_cached_by_ap_id(actor),
%User{} = follower <- User.get_cached_by_ap_id(follower_id),
{:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
- {:ok, _relationship} <- FollowingRelationship.update(follower, followed, :follow_accept) do
+ {:ok, _follower, followed} <-
+ FollowingRelationship.update(follower, followed, :follow_accept) do
Notification.update_notification_type(followed, follow_activity)
- User.update_follower_count(followed)
- User.update_following_count(follower)
end
{:ok, object, meta}
@@ -58,6 +68,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
# - Rejects all existing follow activities for this person
# - Updates the follow state
# - Dismisses notification
+ @impl true
def handle(
%{
data: %{
@@ -84,6 +95,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
# - Follows if possible
# - Sends a notification
# - Generates accept or reject if appropriate
+ @impl true
def handle(
%{
data: %{
@@ -97,9 +109,9 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
) do
with %User{} = follower <- User.get_cached_by_ap_id(following_user),
%User{} = followed <- User.get_cached_by_ap_id(followed_user),
- {_, {:ok, _}, _, _} <-
+ {_, {:ok, _, _}, _, _} <-
{:following, User.follow(follower, followed, :follow_pending), follower, followed} do
- if followed.local && !followed.locked do
+ if followed.local && !followed.is_locked do
{:ok, accept_data, _} = Builder.accept(followed, object)
{:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
end
@@ -125,6 +137,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
# Tasks this handles:
# - Unfollow and block
+ @impl true
def handle(
%{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
object,
@@ -143,6 +156,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
#
# For a local user, we also get a changeset with the full information, so we
# can update non-federating, non-activitypub settings as well.
+ @impl true
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
if changeset = Keyword.get(meta, :user_update_changeset) do
changeset
@@ -161,6 +175,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
# Tasks this handles:
# - Add like to object
# - Set up notification
+ @impl true
def handle(%{data: %{"type" => "Like"}} = object, meta) do
liked_object = Object.get_by_ap_id(object.data["object"])
Utils.add_like_to_object(object, liked_object)
@@ -178,26 +193,41 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
# - Increase replies count
# - Set up ActivityExpiration
# - Set up notifications
+ @impl true
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
- with {:ok, object, meta} <- handle_object_creation(meta[:object_data], meta),
+ with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
%User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
{:ok, notifications} = Notification.create_notifications(activity, do_send: false)
{:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
+ {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
- if in_reply_to = object.data["inReplyTo"] do
+ if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
Object.increase_replies_count(in_reply_to)
end
- if expires_at = activity.data["expires_at"] do
- ActivityExpiration.create(activity, expires_at)
+ reply_depth = (meta[:depth] || 0) + 1
+
+ # FIXME: Force inReplyTo to replies
+ if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
+ object.data["replies"] != nil do
+ for reply_id <- object.data["replies"] do
+ Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
+ "id" => reply_id,
+ "depth" => reply_depth
+ })
+ end
end
- BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
+ ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
+ Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
+ end)
meta =
meta
|> add_notifications(notifications)
+ ap_streamer().stream_out(activity)
+
{:ok, activity, meta}
else
e -> Repo.rollback(e)
@@ -208,6 +238,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
# - Add announce to object
# - Set up notification
# - Stream out the announce
+ @impl true
def handle(%{data: %{"type" => "Announce"}} = object, meta) do
announced_object = Object.get_by_ap_id(object.data["object"])
user = User.get_cached_by_ap_id(object.data["actor"])
@@ -217,14 +248,13 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
if !User.is_internal_user?(user) do
Notification.create_notifications(object)
- object
- |> Topics.get_activity_topics()
- |> Streamer.stream(object)
+ ap_streamer().stream_out(object)
end
{:ok, object, meta}
end
+ @impl true
def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
with undone_object <- Activity.get_by_ap_id(undone_object),
:ok <- handle_undoing(undone_object) do
@@ -235,6 +265,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
# Tasks this handles:
# - Add reaction to object
# - Set up notification
+ @impl true
def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
reacted_object = Object.get_by_ap_id(object.data["object"])
Utils.add_emoji_reaction_to_object(object, reacted_object)
@@ -251,18 +282,19 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
# - Reduce the user note count
# - Reduce the reply count
# - Stream out the activity
+ @impl true
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
deleted_object =
- Object.normalize(deleted_object, false) ||
+ Object.normalize(deleted_object, fetch: false) ||
User.get_cached_by_ap_id(deleted_object)
result =
case deleted_object do
%Object{} ->
- with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
+ with {:ok, deleted_object, _activity} <- Object.delete(deleted_object),
{_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
%User{} = user <- User.get_cached_by_ap_id(actor) do
- User.remove_pinnned_activity(user, activity)
+ User.remove_pinned_object_id(user, deleted_object.data["id"])
{:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
@@ -272,12 +304,12 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
MessageReference.delete_for_object(deleted_object)
- ActivityPub.stream_out(object)
- ActivityPub.stream_out_participations(deleted_object, user)
+ ap_streamer().stream_out(object)
+ ap_streamer().stream_out_participations(deleted_object, user)
:ok
else
{:actor, _} ->
- Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
+ @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
:no_object_actor
end
@@ -295,23 +327,88 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
end
end
+ # Tasks this handles:
+ # - adds pin to user
+ # - removes expiration job for pinned activity, if was set for expiration
+ @impl true
+ def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
+ with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
+ {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
+ # if pinned activity was scheduled for deletion, we remove job
+ if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
+ Oban.cancel_job(expiration.id)
+ end
+
+ {:ok, object, meta}
+ else
+ nil ->
+ {:error, :user_not_found}
+
+ {:error, changeset} ->
+ if changeset.errors[:pinned_objects] do
+ {:error, :pinned_statuses_limit_reached}
+ else
+ changeset.errors
+ end
+ end
+ end
+
+ # Tasks this handles:
+ # - removes pin from user
+ # - removes corresponding Add activity
+ # - if activity had expiration, recreates activity expiration job
+ @impl true
+ def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
+ with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
+ {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
+ data["object"]
+ |> Activity.add_by_params_query(user.ap_id, user.featured_address)
+ |> Repo.delete_all()
+
+ # if pinned activity was scheduled for deletion, we reschedule it for deletion
+ if meta[:expires_at] do
+ # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
+ {:ok, expires_at} =
+ Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
+
+ Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
+ activity_id: meta[:activity_id],
+ expires_at: expires_at
+ })
+ end
+
+ {:ok, object, meta}
+ else
+ nil -> {:error, :user_not_found}
+ error -> error
+ end
+ end
+
# Nothing to do
+ @impl true
def handle(object, meta) do
{:ok, object, meta}
end
- def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
+ def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
actor = User.get_cached_by_ap_id(object.data["actor"])
recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
streamables =
[[actor, recipient], [recipient, actor]]
+ |> Enum.uniq()
|> Enum.map(fn [user, other_user] ->
if user.local do
{:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
{:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
+ @cachex.put(
+ :chat_message_id_idempotency_key_cache,
+ cm_ref.id,
+ meta[:idempotency_key]
+ )
+
{
["user", "user:pleroma_chat"],
{user, %{cm_ref | chat: chat, object: object}}
@@ -328,7 +425,14 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
end
end
- def handle_object_creation(%{"type" => "Answer"} = object_map, meta) do
+ def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
+ with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
+ PollWorker.schedule_poll_end(activity)
+ {:ok, object, meta}
+ end
+ end
+
+ def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
Object.increase_vote_count(
object.data["inReplyTo"],
@@ -340,15 +444,15 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
end
end
- def handle_object_creation(%{"type" => objtype} = object, meta)
- when objtype in ~w[Audio Question Event] do
+ def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
+ when objtype in ~w[Audio Video Event Article Note Page] do
with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
{:ok, object, meta}
end
end
# Nothing to do
- def handle_object_creation(object, meta) do
+ def handle_object_creation(object, _activity, meta) do
{:ok, object, meta}
end
@@ -439,6 +543,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
|> Keyword.put(:notifications, notifications ++ existing)
end
+ @impl true
def handle_after_transaction(meta) do
meta
|> send_notifications()