1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Federator do
alias Pleroma.Activity
alias Pleroma.Object.Containment
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Federator.Publisher
alias Pleroma.Web.OStatus
alias Pleroma.Web.Websub
alias Pleroma.Workers.PublisherWorker
alias Pleroma.Workers.ReceiverWorker
alias Pleroma.Workers.SubscriberWorker
require Logger
def init do
# To do: consider removing this call in favor of scheduled execution (`quantum`-based)
refresh_subscriptions(schedule_in: 60)
end
@doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
# credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength
def allowed_incoming_reply_depth?(depth) do
max_replies_depth = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth])
if max_replies_depth do
(depth || 1) <= max_replies_depth
else
true
end
end
# Client API
def incoming_doc(doc) do
ReceiverWorker.enqueue("incoming_doc", %{"body" => doc})
end
def incoming_ap_doc(params) do
ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
end
def publish(%{id: "pleroma:fakeid"} = activity) do
perform(:publish, activity)
end
def publish(activity) do
PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
end
def verify_websub(websub) do
SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id})
end
def request_subscription(websub) do
SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id})
end
def refresh_subscriptions(worker_args \\ []) do
SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1])
end
# Job Worker Callbacks
@spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
def perform(:publish_one, module, params) do
apply(module, :publish_one, [params])
end
def perform(:publish, activity) do
Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
{:ok, actor} <- User.ensure_keys_present(actor) do
Publisher.publish(actor, activity)
end
end
def perform(:incoming_doc, doc) do
Logger.info("Got document, trying to parse")
OStatus.handle_incoming(doc)
end
def perform(:incoming_ap_doc, params) do
Logger.info("Handling incoming AP activity")
params = Utils.normalize_params(params)
# NOTE: we use the actor ID to do the containment, this is fine because an
# actor shouldn't be acting on objects outside their own AP server.
with {:ok, _user} <- ap_enabled_actor(params["actor"]),
nil <- Activity.normalize(params["id"]),
:ok <- Containment.contain_origin_from_id(params["actor"], params),
{:ok, activity} <- Transmogrifier.handle_incoming(params) do
{:ok, activity}
else
%Activity{} ->
Logger.info("Already had #{params["id"]}")
:error
_e ->
# Just drop those for now
Logger.info("Unhandled activity")
Logger.info(Jason.encode!(params, pretty: true))
:error
end
end
def perform(:request_subscription, websub) do
Logger.debug("Refreshing #{websub.topic}")
with {:ok, websub} <- Websub.request_subscription(websub) do
Logger.debug("Successfully refreshed #{websub.topic}")
else
_e -> Logger.debug("Couldn't refresh #{websub.topic}")
end
end
def perform(:verify_websub, websub) do
Logger.debug(fn ->
"Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
end)
Websub.verify(websub)
end
def perform(:refresh_subscriptions) do
Logger.debug("Federator running refresh subscriptions")
Websub.refresh_subscriptions()
end
def ap_enabled_actor(id) do
user = User.get_cached_by_ap_id(id)
if User.ap_enabled?(user) do
{:ok, user}
else
ActivityPub.make_user_from_ap_id(id)
end
end
end
|