aboutsummaryrefslogtreecommitdiff
path: root/lib/pleroma
diff options
context:
space:
mode:
authorMaxim Filippov <colixer@gmail.com>2019-09-16 19:44:06 +0300
committerMaxim Filippov <colixer@gmail.com>2019-09-16 19:44:06 +0300
commitdf15ed13d15db5b5a371345fcb9968b5af4100af (patch)
tree02f8834a25733ed7ee02fa9aa4dd66b606f78030 /lib/pleroma
parentd1abf7a3585e4bc1ebea4f615ae2e149d5a56918 (diff)
parenta58f29b826333c1ecb0907228f0e087a3ecd9778 (diff)
downloadpleroma-df15ed13d15db5b5a371345fcb9968b5af4100af.tar.gz
Merge branch 'develop' into feature/moderation-log-filters
Diffstat (limited to 'lib/pleroma')
-rw-r--r--lib/pleroma/activity.ex196
-rw-r--r--lib/pleroma/activity/ir/topics.ex63
-rw-r--r--lib/pleroma/activity/queries.ex32
-rw-r--r--lib/pleroma/application.ex26
-rw-r--r--lib/pleroma/daemons/activity_expiration_daemon.ex (renamed from lib/pleroma/activity_expiration_worker.ex)8
-rw-r--r--lib/pleroma/daemons/digest_email_daemon.ex (renamed from lib/pleroma/digest_email_worker.ex)13
-rw-r--r--lib/pleroma/daemons/scheduled_activity_daemon.ex (renamed from lib/pleroma/scheduled_activity_worker.ex)8
-rw-r--r--lib/pleroma/delivery.ex51
-rw-r--r--lib/pleroma/docs/generator.ex73
-rw-r--r--lib/pleroma/docs/json.ex20
-rw-r--r--lib/pleroma/docs/markdown.ex78
-rw-r--r--lib/pleroma/emails/mailer.ex8
-rw-r--r--lib/pleroma/healthcheck.ex65
-rw-r--r--lib/pleroma/instances/instance.ex8
-rw-r--r--lib/pleroma/notification.ex6
-rw-r--r--lib/pleroma/plugs/cache.ex18
-rw-r--r--lib/pleroma/plugs/http_signature.ex3
-rw-r--r--lib/pleroma/scheduler.ex7
-rw-r--r--lib/pleroma/user.ex82
-rw-r--r--lib/pleroma/user/info.ex17
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub.ex53
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub_controller.ex27
-rw-r--r--lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex5
-rw-r--r--lib/pleroma/web/activity_pub/publisher.ex31
-rw-r--r--lib/pleroma/web/activity_pub/transmogrifier.ex7
-rw-r--r--lib/pleroma/web/activity_pub/utils.ex176
-rw-r--r--lib/pleroma/web/admin_api/config.ex15
-rw-r--r--lib/pleroma/web/controller_helper.ex95
-rw-r--r--lib/pleroma/web/federator/federator.ex90
-rw-r--r--lib/pleroma/web/federator/publisher.ex24
-rw-r--r--lib/pleroma/web/federator/retry_queue.ex239
-rw-r--r--lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex46
-rw-r--r--lib/pleroma/web/mastodon_api/views/status_view.ex6
-rw-r--r--lib/pleroma/web/mastodon_api/websocket_handler.ex7
-rw-r--r--lib/pleroma/web/oauth/token/clean_worker.ex5
-rw-r--r--lib/pleroma/web/pleroma_api/pleroma_api_controller.ex27
-rw-r--r--lib/pleroma/web/push/push.ex7
-rw-r--r--lib/pleroma/web/rich_media/parser.ex6
-rw-r--r--lib/pleroma/web/router.ex4
-rw-r--r--lib/pleroma/web/salmon/salmon.ex11
-rw-r--r--lib/pleroma/web/streamer.ex318
-rw-r--r--lib/pleroma/web/streamer/ping.ex33
-rw-r--r--lib/pleroma/web/streamer/state.ex68
-rw-r--r--lib/pleroma/web/streamer/streamer.ex55
-rw-r--r--lib/pleroma/web/streamer/streamer_socket.ex31
-rw-r--r--lib/pleroma/web/streamer/supervisor.ex33
-rw-r--r--lib/pleroma/web/streamer/worker.ex220
-rw-r--r--lib/pleroma/web/twitter_api/controllers/util_controller.ex33
-rw-r--r--lib/pleroma/web/views/streamer_view.ex66
-rw-r--r--lib/pleroma/web/web.ex18
-rw-r--r--lib/pleroma/workers/activity_expiration_worker.ex18
-rw-r--r--lib/pleroma/workers/background_worker.ex69
-rw-r--r--lib/pleroma/workers/digest_emails_worker.ex16
-rw-r--r--lib/pleroma/workers/mailer_worker.ex15
-rw-r--r--lib/pleroma/workers/publisher_worker.ex25
-rw-r--r--lib/pleroma/workers/receiver_worker.ex18
-rw-r--r--lib/pleroma/workers/scheduled_activity_worker.ex12
-rw-r--r--lib/pleroma/workers/subscriber_worker.ex26
-rw-r--r--lib/pleroma/workers/transmogrifier_worker.ex15
-rw-r--r--lib/pleroma/workers/web_pusher_worker.ex16
-rw-r--r--lib/pleroma/workers/worker_helper.ex46
61 files changed, 1664 insertions, 1150 deletions
diff --git a/lib/pleroma/activity.ex b/lib/pleroma/activity.ex
index 6a51d4cf3..ec558168a 100644
--- a/lib/pleroma/activity.ex
+++ b/lib/pleroma/activity.ex
@@ -6,6 +6,7 @@ defmodule Pleroma.Activity do
use Ecto.Schema
alias Pleroma.Activity
+ alias Pleroma.Activity.Queries
alias Pleroma.ActivityExpiration
alias Pleroma.Bookmark
alias Pleroma.Notification
@@ -65,8 +66,8 @@ defmodule Pleroma.Activity do
timestamps()
end
- def with_joined_object(query) do
- join(query, :inner, [activity], o in Object,
+ def with_joined_object(query, join_type \\ :inner) do
+ join(query, join_type, [activity], o in Object,
on:
fragment(
"(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
@@ -78,10 +79,10 @@ defmodule Pleroma.Activity do
)
end
- def with_preloaded_object(query) do
+ def with_preloaded_object(query, join_type \\ :inner) do
query
|> has_named_binding?(:object)
- |> if(do: query, else: with_joined_object(query))
+ |> if(do: query, else: with_joined_object(query, join_type))
|> preload([activity, object: object], object: object)
end
@@ -107,12 +108,9 @@ defmodule Pleroma.Activity do
def with_set_thread_muted_field(query, _), do: query
def get_by_ap_id(ap_id) do
- Repo.one(
- from(
- activity in Activity,
- where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id))
- )
- )
+ ap_id
+ |> Queries.by_ap_id()
+ |> Repo.one()
end
def get_bookmark(%Activity{} = activity, %User{} = user) do
@@ -133,21 +131,10 @@ defmodule Pleroma.Activity do
end
def get_by_ap_id_with_object(ap_id) do
- Repo.one(
- from(
- activity in Activity,
- where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id)),
- left_join: o in Object,
- on:
- fragment(
- "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
- o.data,
- activity.data,
- activity.data
- ),
- preload: [object: o]
- )
- )
+ ap_id
+ |> Queries.by_ap_id()
+ |> with_preloaded_object(:left)
+ |> Repo.one()
end
def get_by_id(id) do
@@ -158,66 +145,34 @@ defmodule Pleroma.Activity do
end
def get_by_id_with_object(id) do
- from(activity in Activity,
- where: activity.id == ^id,
- inner_join: o in Object,
- on:
- fragment(
- "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
- o.data,
- activity.data,
- activity.data
- ),
- preload: [object: o]
- )
+ Activity
+ |> where(id: ^id)
+ |> with_preloaded_object()
|> Repo.one()
end
- def by_object_ap_id(ap_id) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^to_string(ap_id)
- )
- )
- end
-
- def create_by_object_ap_id(ap_ids) when is_list(ap_ids) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
- activity.data,
- activity.data,
- ^ap_ids
- ),
- where: fragment("(?)->>'type' = 'Create'", activity.data)
- )
+ def all_by_ids_with_object(ids) do
+ Activity
+ |> where([a], a.id in ^ids)
+ |> with_preloaded_object()
+ |> Repo.all()
end
- def create_by_object_ap_id(ap_id) when is_binary(ap_id) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^to_string(ap_id)
- ),
- where: fragment("(?)->>'type' = 'Create'", activity.data)
- )
+ @doc """
+ Accepts `ap_id` or list of `ap_id`.
+ Returns a query.
+ """
+ @spec create_by_object_ap_id(String.t() | [String.t()]) :: Ecto.Queryable.t()
+ def create_by_object_ap_id(ap_id) do
+ ap_id
+ |> Queries.by_object_id()
+ |> Queries.by_type("Create")
end
- def create_by_object_ap_id(_), do: nil
-
def get_all_create_by_object_ap_id(ap_id) do
- Repo.all(create_by_object_ap_id(ap_id))
+ ap_id
+ |> create_by_object_ap_id()
+ |> Repo.all()
end
def get_create_by_object_ap_id(ap_id) when is_binary(ap_id) do
@@ -228,54 +183,17 @@ defmodule Pleroma.Activity do
def get_create_by_object_ap_id(_), do: nil
- def create_by_object_ap_id_with_object(ap_ids) when is_list(ap_ids) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
- activity.data,
- activity.data,
- ^ap_ids
- ),
- where: fragment("(?)->>'type' = 'Create'", activity.data),
- inner_join: o in Object,
- on:
- fragment(
- "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
- o.data,
- activity.data,
- activity.data
- ),
- preload: [object: o]
- )
- end
-
- def create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^to_string(ap_id)
- ),
- where: fragment("(?)->>'type' = 'Create'", activity.data),
- inner_join: o in Object,
- on:
- fragment(
- "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
- o.data,
- activity.data,
- activity.data
- ),
- preload: [object: o]
- )
+ @doc """
+ Accepts `ap_id` or list of `ap_id`.
+ Returns a query.
+ """
+ @spec create_by_object_ap_id_with_object(String.t() | [String.t()]) :: Ecto.Queryable.t()
+ def create_by_object_ap_id_with_object(ap_id) do
+ ap_id
+ |> create_by_object_ap_id()
+ |> with_preloaded_object()
end
- def create_by_object_ap_id_with_object(_), do: nil
-
def get_create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
ap_id
|> create_by_object_ap_id_with_object()
@@ -299,7 +217,8 @@ defmodule Pleroma.Activity do
def normalize(_), do: nil
def delete_by_ap_id(id) when is_binary(id) do
- by_object_ap_id(id)
+ id
+ |> Queries.by_object_id()
|> select([u], u)
|> Repo.delete_all()
|> elem(1)
@@ -343,31 +262,10 @@ defmodule Pleroma.Activity do
end
def follow_requests_for_actor(%Pleroma.User{ap_id: ap_id}) do
- from(
- a in Activity,
- where:
- fragment(
- "? ->> 'type' = 'Follow'",
- a.data
- ),
- where:
- fragment(
- "? ->> 'state' = 'pending'",
- a.data
- ),
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- a.data,
- a.data,
- ^ap_id
- )
- )
- end
-
- @spec query_by_actor(actor()) :: Ecto.Query.t()
- def query_by_actor(actor) do
- from(a in Activity, where: a.actor == ^actor)
+ ap_id
+ |> Queries.by_object_id()
+ |> Queries.by_type("Follow")
+ |> where([a], fragment("? ->> 'state' = 'pending'", a.data))
end
def restrict_deactivated_users(query) do
diff --git a/lib/pleroma/activity/ir/topics.ex b/lib/pleroma/activity/ir/topics.ex
new file mode 100644
index 000000000..010897abc
--- /dev/null
+++ b/lib/pleroma/activity/ir/topics.ex
@@ -0,0 +1,63 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Activity.Ir.Topics do
+ alias Pleroma.Object
+ alias Pleroma.Web.ActivityPub.Visibility
+
+ def get_activity_topics(activity) do
+ activity
+ |> Object.normalize()
+ |> generate_topics(activity)
+ |> List.flatten()
+ end
+
+ defp generate_topics(%{data: %{"type" => "Answer"}}, _) do
+ []
+ end
+
+ defp generate_topics(object, activity) do
+ ["user", "list"] ++ visibility_tags(object, activity)
+ end
+
+ defp visibility_tags(object, activity) do
+ case Visibility.get_visibility(activity) do
+ "public" ->
+ if activity.local do
+ ["public", "public:local"]
+ else
+ ["public"]
+ end
+ |> item_creation_tags(object, activity)
+
+ "direct" ->
+ ["direct"]
+
+ _ ->
+ []
+ end
+ end
+
+ defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do
+ tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity)
+ end
+
+ defp item_creation_tags(tags, _, _) do
+ tags
+ end
+
+ defp hashtags_to_topics(%{data: %{"tag" => tags}}) do
+ tags
+ |> Enum.filter(&is_bitstring(&1))
+ |> Enum.map(fn tag -> "hashtag:" <> tag end)
+ end
+
+ defp hashtags_to_topics(_), do: []
+
+ defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: []
+
+ defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"]
+
+ defp attachment_topics(_object, _act), do: ["public:media"]
+end
diff --git a/lib/pleroma/activity/queries.ex b/lib/pleroma/activity/queries.ex
index aa5b29566..13fa33831 100644
--- a/lib/pleroma/activity/queries.ex
+++ b/lib/pleroma/activity/queries.ex
@@ -13,6 +13,14 @@ defmodule Pleroma.Activity.Queries do
alias Pleroma.Activity
+ @spec by_ap_id(query, String.t()) :: query
+ def by_ap_id(query \\ Activity, ap_id) do
+ from(
+ activity in query,
+ where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id))
+ )
+ end
+
@spec by_actor(query, String.t()) :: query
def by_actor(query \\ Activity, actor) do
from(
@@ -21,8 +29,23 @@ defmodule Pleroma.Activity.Queries do
)
end
- @spec by_object_id(query, String.t()) :: query
- def by_object_id(query \\ Activity, object_id) do
+ @spec by_object_id(query, String.t() | [String.t()]) :: query
+ def by_object_id(query \\ Activity, object_id)
+
+ def by_object_id(query, object_ids) when is_list(object_ids) do
+ from(
+ activity in query,
+ where:
+ fragment(
+ "coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
+ activity.data,
+ activity.data,
+ ^object_ids
+ )
+ )
+ end
+
+ def by_object_id(query, object_id) when is_binary(object_id) do
from(activity in query,
where:
fragment(
@@ -41,9 +64,4 @@ defmodule Pleroma.Activity.Queries do
where: fragment("(?)->>'type' = ?", activity.data, ^activity_type)
)
end
-
- @spec limit(query, pos_integer()) :: query
- def limit(query \\ Activity, limit) do
- from(activity in query, limit: ^limit)
- end
end
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 1d46925f8..3b37ce630 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -31,18 +31,19 @@ defmodule Pleroma.Application do
children =
[
Pleroma.Repo,
+ Pleroma.Scheduler,
Pleroma.Config.TransferTask,
Pleroma.Emoji,
Pleroma.Captcha,
Pleroma.FlakeId,
- Pleroma.ScheduledActivityWorker,
- Pleroma.ActivityExpirationWorker
+ Pleroma.Daemons.ScheduledActivityDaemon,
+ Pleroma.Daemons.ActivityExpirationDaemon
] ++
cachex_children() ++
hackney_pool_children() ++
[
- Pleroma.Web.Federator.RetryQueue,
Pleroma.Stats,
+ {Oban, Pleroma.Config.get(Oban)},
%{
id: :web_push_init,
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
@@ -70,9 +71,7 @@ defmodule Pleroma.Application do
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: Pleroma.Supervisor]
- result = Supervisor.start_link(children, opts)
- :ok = after_supervisor_start()
- result
+ Supervisor.start_link(children, opts)
end
defp setup_instrumenters do
@@ -142,7 +141,7 @@ defmodule Pleroma.Application do
defp streamer_child(:test), do: []
defp streamer_child(_) do
- [Pleroma.Web.Streamer]
+ [Pleroma.Web.Streamer.supervisor()]
end
defp oauth_cleanup_child(true),
@@ -164,17 +163,4 @@ defmodule Pleroma.Application do
:hackney_pool.child_spec(pool, options)
end
end
-
- defp after_supervisor_start do
- with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest],
- true <- digest_config[:active] do
- PleromaJobQueue.schedule(
- digest_config[:schedule],
- :digest_emails,
- Pleroma.DigestEmailWorker
- )
- end
-
- :ok
- end
end
diff --git a/lib/pleroma/activity_expiration_worker.ex b/lib/pleroma/daemons/activity_expiration_daemon.ex
index 0f9e715f8..cab7628c4 100644
--- a/lib/pleroma/activity_expiration_worker.ex
+++ b/lib/pleroma/daemons/activity_expiration_daemon.ex
@@ -2,13 +2,14 @@
# Copyright © 2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.ActivityExpirationWorker do
+defmodule Pleroma.Daemons.ActivityExpirationDaemon do
alias Pleroma.Activity
alias Pleroma.ActivityExpiration
alias Pleroma.Config
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.CommonAPI
+
require Logger
use GenServer
import Ecto.Query
@@ -49,7 +50,10 @@ defmodule Pleroma.ActivityExpirationWorker do
def handle_info(:perform, state) do
ActivityExpiration.due_expirations(@schedule_interval)
|> Enum.each(fn expiration ->
- PleromaJobQueue.enqueue(:activity_expiration, __MODULE__, [:execute, expiration.id])
+ Pleroma.Workers.ActivityExpirationWorker.enqueue(
+ "activity_expiration",
+ %{"activity_expiration_id" => expiration.id}
+ )
end)
schedule_next()
diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/daemons/digest_email_daemon.ex
index 5644d6a67..462ad2c55 100644
--- a/lib/pleroma/digest_email_worker.ex
+++ b/lib/pleroma/daemons/digest_email_daemon.ex
@@ -2,10 +2,11 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.DigestEmailWorker do
- import Ecto.Query
+defmodule Pleroma.Daemons.DigestEmailDaemon do
+ alias Pleroma.Repo
+ alias Pleroma.Workers.DigestEmailsWorker
- @queue_name :digest_emails
+ import Ecto.Query
def perform do
config = Pleroma.Config.get([:email_notifications, :digest])
@@ -20,8 +21,10 @@ defmodule Pleroma.DigestEmailWorker do
where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
select: u
)
- |> Pleroma.Repo.all()
- |> Enum.each(&PleromaJobQueue.enqueue(@queue_name, __MODULE__, [&1]))
+ |> Repo.all()
+ |> Enum.each(fn user ->
+ DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id})
+ end)
end
@doc """
diff --git a/lib/pleroma/scheduled_activity_worker.ex b/lib/pleroma/daemons/scheduled_activity_daemon.ex
index 8578cab5e..aee5f723a 100644
--- a/lib/pleroma/scheduled_activity_worker.ex
+++ b/lib/pleroma/daemons/scheduled_activity_daemon.ex
@@ -2,7 +2,7 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.ScheduledActivityWorker do
+defmodule Pleroma.Daemons.ScheduledActivityDaemon do
@moduledoc """
Sends scheduled activities to the job queue.
"""
@@ -11,6 +11,7 @@ defmodule Pleroma.ScheduledActivityWorker do
alias Pleroma.ScheduledActivity
alias Pleroma.User
alias Pleroma.Web.CommonAPI
+
use GenServer
require Logger
@@ -45,7 +46,10 @@ defmodule Pleroma.ScheduledActivityWorker do
def handle_info(:perform, state) do
ScheduledActivity.due_activities(@schedule_interval)
|> Enum.each(fn scheduled_activity ->
- PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id])
+ Pleroma.Workers.ScheduledActivityWorker.enqueue(
+ "execute",
+ %{"activity_id" => scheduled_activity.id}
+ )
end)
schedule_next()
diff --git a/lib/pleroma/delivery.ex b/lib/pleroma/delivery.ex
new file mode 100644
index 000000000..29a1e5a77
--- /dev/null
+++ b/lib/pleroma/delivery.ex
@@ -0,0 +1,51 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Delivery do
+ use Ecto.Schema
+
+ alias Pleroma.Delivery
+ alias Pleroma.FlakeId
+ alias Pleroma.Object
+ alias Pleroma.Repo
+ alias Pleroma.User
+ alias Pleroma.User
+
+ import Ecto.Changeset
+ import Ecto.Query
+
+ schema "deliveries" do
+ belongs_to(:user, User, type: FlakeId)
+ belongs_to(:object, Object)
+ end
+
+ def changeset(delivery, params \\ %{}) do
+ delivery
+ |> cast(params, [:user_id, :object_id])
+ |> validate_required([:user_id, :object_id])
+ |> foreign_key_constraint(:object_id)
+ |> foreign_key_constraint(:user_id)
+ |> unique_constraint(:user_id, name: :deliveries_user_id_object_id_index)
+ end
+
+ def create(object_id, user_id) do
+ %Delivery{}
+ |> changeset(%{user_id: user_id, object_id: object_id})
+ |> Repo.insert(on_conflict: :nothing)
+ end
+
+ def get(object_id, user_id) do
+ from(d in Delivery, where: d.user_id == ^user_id and d.object_id == ^object_id)
+ |> Repo.one()
+ end
+
+ # A hack because user delete activities have a fake id for whatever reason
+ # TODO: Get rid of this
+ def delete_all_by_object_id("pleroma:fake_object_id"), do: {0, []}
+
+ def delete_all_by_object_id(object_id) do
+ from(d in Delivery, where: d.object_id == ^object_id)
+ |> Repo.delete_all()
+ end
+end
diff --git a/lib/pleroma/docs/generator.ex b/lib/pleroma/docs/generator.ex
new file mode 100644
index 000000000..aa578eee2
--- /dev/null
+++ b/lib/pleroma/docs/generator.ex
@@ -0,0 +1,73 @@
+defmodule Pleroma.Docs.Generator do
+ @callback process(keyword()) :: {:ok, String.t()}
+
+ @spec process(module(), keyword()) :: {:ok, String.t()}
+ def process(implementation, descriptions) do
+ implementation.process(descriptions)
+ end
+
+ @spec uploaders_list() :: [module()]
+ def uploaders_list do
+ {:ok, modules} = :application.get_key(:pleroma, :modules)
+
+ Enum.filter(modules, fn module ->
+ name_as_list = Module.split(module)
+
+ List.starts_with?(name_as_list, ["Pleroma", "Uploaders"]) and
+ List.last(name_as_list) != "Uploader"
+ end)
+ end
+
+ @spec filters_list() :: [module()]
+ def filters_list do
+ {:ok, modules} = :application.get_key(:pleroma, :modules)
+
+ Enum.filter(modules, fn module ->
+ name_as_list = Module.split(module)
+
+ List.starts_with?(name_as_list, ["Pleroma", "Upload", "Filter"])
+ end)
+ end
+
+ @spec mrf_list() :: [module()]
+ def mrf_list do
+ {:ok, modules} = :application.get_key(:pleroma, :modules)
+
+ Enum.filter(modules, fn module ->
+ name_as_list = Module.split(module)
+
+ List.starts_with?(name_as_list, ["Pleroma", "Web", "ActivityPub", "MRF"]) and
+ length(name_as_list) > 4
+ end)
+ end
+
+ @spec richmedia_parsers() :: [module()]
+ def richmedia_parsers do
+ {:ok, modules} = :application.get_key(:pleroma, :modules)
+
+ Enum.filter(modules, fn module ->
+ name_as_list = Module.split(module)
+
+ List.starts_with?(name_as_list, ["Pleroma", "Web", "RichMedia", "Parsers"]) and
+ length(name_as_list) == 5
+ end)
+ end
+end
+
+defimpl Jason.Encoder, for: Tuple do
+ def encode(tuple, opts) do
+ Jason.Encode.list(Tuple.to_list(tuple), opts)
+ end
+end
+
+defimpl Jason.Encoder, for: [Regex, Function] do
+ def encode(term, opts) do
+ Jason.Encode.string(inspect(term), opts)
+ end
+end
+
+defimpl String.Chars, for: Regex do
+ def to_string(term) do
+ inspect(term)
+ end
+end
diff --git a/lib/pleroma/docs/json.ex b/lib/pleroma/docs/json.ex
new file mode 100644
index 000000000..18ba01d58
--- /dev/null
+++ b/lib/pleroma/docs/json.ex
@@ -0,0 +1,20 @@
+defmodule Pleroma.Docs.JSON do
+ @behaviour Pleroma.Docs.Generator
+
+ @spec process(keyword()) :: {:ok, String.t()}
+ def process(descriptions) do
+ config_path = "docs/generate_config.json"
+
+ with {:ok, file} <- File.open(config_path, [:write]),
+ json <- generate_json(descriptions),
+ :ok <- IO.write(file, json),
+ :ok <- File.close(file) do
+ {:ok, config_path}
+ end
+ end
+
+ @spec generate_json([keyword()]) :: String.t()
+ def generate_json(descriptions) do
+ Jason.encode!(descriptions)
+ end
+end
diff --git a/lib/pleroma/docs/markdown.ex b/lib/pleroma/docs/markdown.ex
new file mode 100644
index 000000000..8386dc2fb
--- /dev/null
+++ b/lib/pleroma/docs/markdown.ex
@@ -0,0 +1,78 @@
+defmodule Pleroma.Docs.Markdown do
+ @behaviour Pleroma.Docs.Generator
+
+ @spec process(keyword()) :: {:ok, String.t()}
+ def process(descriptions) do
+ config_path = "docs/generated_config.md"
+ {:ok, file} = File.open(config_path, [:utf8, :write])
+ IO.write(file, "# Generated configuration\n")
+ IO.write(file, "Date of generation: #{Date.utc_today()}\n\n")
+
+ IO.write(
+ file,
+ "This file describe the configuration, it is recommended to edit the relevant `*.secret.exs` file instead of the others founds in the ``config`` directory.\n\n" <>
+ "If you run Pleroma with ``MIX_ENV=prod`` the file is ``prod.secret.exs``, otherwise it is ``dev.secret.exs``.\n\n"
+ )
+
+ for group <- descriptions do
+ if is_nil(group[:key]) do
+ IO.write(file, "## #{inspect(group[:group])}\n")
+ else
+ IO.write(file, "## #{inspect(group[:key])}\n")
+ end
+
+ IO.write(file, "#{group[:description]}\n")
+
+ for child <- group[:children] do
+ print_child_header(file, child)
+
+ print_suggestions(file, child[:suggestions])
+
+ if child[:children] do
+ for subchild <- child[:children] do
+ print_child_header(file, subchild)
+
+ print_suggestions(file, subchild[:suggestions])
+ end
+ end
+ end
+
+ IO.write(file, "\n")
+ end
+
+ :ok = File.close(file)
+ {:ok, config_path}
+ end
+
+ defp print_suggestion(file, suggestion) when is_list(suggestion) do
+ IO.write(file, " `#{inspect(suggestion)}`\n")
+ end
+
+ defp print_suggestion(file, suggestion) when is_function(suggestion) do
+ IO.write(file, " `#{inspect(suggestion.())}`\n")
+ end
+
+ defp print_suggestion(file, suggestion, as_list \\ false) do
+ list_mark = if as_list, do: "- ", else: ""
+ IO.write(file, " #{list_mark}`#{inspect(suggestion)}`\n")
+ end
+
+ defp print_suggestions(_file, nil), do: nil
+
+ defp print_suggestions(file, suggestions) do
+ IO.write(file, "Suggestions:\n")
+
+ if length(suggestions) > 1 do
+ for suggestion <- suggestions do
+ print_suggestion(file, suggestion, true)
+ end
+ else
+ print_suggestion(file, List.first(suggestions))
+ end
+ end
+
+ defp print_child_header(file, child) do
+ IO.write(file, "- `#{inspect(child[:key])}` -`#{inspect(child[:type])}` \n")
+ IO.write(file, "#{child[:description]} \n")
+ end
+end
diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex
index 2e4657b7c..eb96f2e8b 100644
--- a/lib/pleroma/emails/mailer.ex
+++ b/lib/pleroma/emails/mailer.ex
@@ -9,6 +9,7 @@ defmodule Pleroma.Emails.Mailer do
The module contains functions to delivery email using Swoosh.Mailer.
"""
+ alias Pleroma.Workers.MailerWorker
alias Swoosh.DeliveryError
@otp_app :pleroma
@@ -19,7 +20,12 @@ defmodule Pleroma.Emails.Mailer do
@doc "add email to queue"
def deliver_async(email, config \\ []) do
- PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config])
+ encoded_email =
+ email
+ |> :erlang.term_to_binary()
+ |> Base.encode64()
+
+ MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config})
end
@doc "callback to perform send email from queue"
diff --git a/lib/pleroma/healthcheck.ex b/lib/pleroma/healthcheck.ex
new file mode 100644
index 000000000..977b78c26
--- /dev/null
+++ b/lib/pleroma/healthcheck.ex
@@ -0,0 +1,65 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Healthcheck do
+ @moduledoc """
+ Module collects metrics about app and assign healthy status.
+ """
+ alias Pleroma.Healthcheck
+ alias Pleroma.Repo
+
+ @derive Jason.Encoder
+ defstruct pool_size: 0,
+ active: 0,
+ idle: 0,
+ memory_used: 0,
+ healthy: true
+
+ @type t :: %__MODULE__{
+ pool_size: non_neg_integer(),
+ active: non_neg_integer(),
+ idle: non_neg_integer(),
+ memory_used: number(),
+ healthy: boolean()
+ }
+
+ @spec system_info() :: t()
+ def system_info do
+ %Healthcheck{
+ memory_used: Float.round(:erlang.memory(:total) / 1024 / 1024, 2)
+ }
+ |> assign_db_info()
+ |> check_health()
+ end
+
+ defp assign_db_info(healthcheck) do
+ database = Pleroma.Config.get([Repo, :database])
+
+ query =
+ "select state, count(pid) from pg_stat_activity where datname = '#{database}' group by state;"
+
+ result = Repo.query!(query)
+ pool_size = Pleroma.Config.get([Repo, :pool_size])
+
+ db_info =
+ Enum.reduce(result.rows, %{active: 0, idle: 0}, fn [state, cnt], states ->
+ if state == "active" do
+ Map.put(states, :active, states.active + cnt)
+ else
+ Map.put(states, :idle, states.idle + cnt)
+ end
+ end)
+ |> Map.put(:pool_size, pool_size)
+
+ Map.merge(healthcheck, db_info)
+ end
+
+ @spec check_health(Healthcheck.t()) :: Healthcheck.t()
+ def check_health(%{pool_size: pool_size, active: active} = check)
+ when active >= pool_size do
+ %{check | healthy: false}
+ end
+
+ def check_health(check), do: check
+end
diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex
index 4d7ed4ca1..544c4b687 100644
--- a/lib/pleroma/instances/instance.ex
+++ b/lib/pleroma/instances/instance.ex
@@ -90,7 +90,7 @@ defmodule Pleroma.Instances.Instance do
def set_unreachable(url_or_host, unreachable_since \\ nil)
def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do
- unreachable_since = unreachable_since || DateTime.utc_now()
+ unreachable_since = parse_datetime(unreachable_since) || NaiveDateTime.utc_now()
host = host(url_or_host)
existing_record = Repo.get_by(Instance, %{host: host})
@@ -114,4 +114,10 @@ defmodule Pleroma.Instances.Instance do
end
def set_unreachable(_, _), do: {:error, nil}
+
+ defp parse_datetime(datetime) when is_binary(datetime) do
+ NaiveDateTime.from_iso8601(datetime)
+ end
+
+ defp parse_datetime(datetime), do: datetime
end
diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex
index b7c880c51..8012389ac 100644
--- a/lib/pleroma/notification.ex
+++ b/lib/pleroma/notification.ex
@@ -210,8 +210,10 @@ defmodule Pleroma.Notification do
unless skip?(activity, user) do
notification = %Notification{user_id: user.id, activity: activity}
{:ok, notification} = Repo.insert(notification)
- Streamer.stream("user", notification)
- Streamer.stream("user:notification", notification)
+
+ ["user", "user:notification"]
+ |> Streamer.stream(notification)
+
Push.send(notification)
notification
end
diff --git a/lib/pleroma/plugs/cache.ex b/lib/pleroma/plugs/cache.ex
index a81a861d0..50b534e7b 100644
--- a/lib/pleroma/plugs/cache.ex
+++ b/lib/pleroma/plugs/cache.ex
@@ -20,6 +20,7 @@ defmodule Pleroma.Plugs.Cache do
- `ttl`: An expiration time (time-to-live). This value should be in milliseconds or `nil` to disable expiration. Defaults to `nil`.
- `query_params`: Take URL query string into account (`true`), ignore it (`false`) or limit to specific params only (list). Defaults to `true`.
+ - `tracking_fun`: A function that is called on successfull responses, no matter if the request is cached or not. It should accept a conn as the first argument and the value assigned to `tracking_fun_data` as the second.
Additionally, you can overwrite the TTL inside a controller action by assigning `cache_ttl` to the connection struct:
@@ -56,6 +57,11 @@ defmodule Pleroma.Plugs.Cache do
{:ok, nil} ->
cache_resp(conn, opts)
+ {:ok, {content_type, body, tracking_fun_data}} ->
+ conn = opts.tracking_fun.(conn, tracking_fun_data)
+
+ send_cached(conn, {content_type, body})
+
{:ok, record} ->
send_cached(conn, record)
@@ -88,9 +94,17 @@ defmodule Pleroma.Plugs.Cache do
ttl = Map.get(conn.assigns, :cache_ttl, opts.ttl)
key = cache_key(conn, opts)
content_type = content_type(conn)
- record = {content_type, body}
- Cachex.put(:web_resp_cache, key, record, ttl: ttl)
+ conn =
+ unless opts[:tracking_fun] do
+ Cachex.put(:web_resp_cache, key, {content_type, body}, ttl: ttl)
+ conn
+ else
+ tracking_fun_data = Map.get(conn.assigns, :tracking_fun_data, nil)
+ Cachex.put(:web_resp_cache, key, {content_type, body, tracking_fun_data}, ttl: ttl)
+
+ opts.tracking_fun.(conn, tracking_fun_data)
+ end
put_resp_header(conn, "x-cache", "MISS from Pleroma")
diff --git a/lib/pleroma/plugs/http_signature.ex b/lib/pleroma/plugs/http_signature.ex
index d87fa52fa..23d22a712 100644
--- a/lib/pleroma/plugs/http_signature.ex
+++ b/lib/pleroma/plugs/http_signature.ex
@@ -15,7 +15,8 @@ defmodule Pleroma.Web.Plugs.HTTPSignaturePlug do
end
def call(conn, _opts) do
- [signature | _] = get_req_header(conn, "signature")
+ headers = get_req_header(conn, "signature")
+ signature = Enum.at(headers, 0)
if signature do
# set (request-target) header to the appropriate value
diff --git a/lib/pleroma/scheduler.ex b/lib/pleroma/scheduler.ex
new file mode 100644
index 000000000..d84cd99ad
--- /dev/null
+++ b/lib/pleroma/scheduler.ex
@@ -0,0 +1,7 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Scheduler do
+ use Quantum.Scheduler, otp_app: :pleroma
+end
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index 3aa245f2a..dd2b1c8c4 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -11,6 +11,7 @@ defmodule Pleroma.User do
alias Comeonin.Pbkdf2
alias Ecto.Multi
alias Pleroma.Activity
+ alias Pleroma.Delivery
alias Pleroma.Keys
alias Pleroma.Notification
alias Pleroma.Object
@@ -27,6 +28,7 @@ defmodule Pleroma.User do
alias Pleroma.Web.OStatus
alias Pleroma.Web.RelMe
alias Pleroma.Web.Websub
+ alias Pleroma.Workers.BackgroundWorker
require Logger
@@ -61,6 +63,7 @@ defmodule Pleroma.User do
field(:last_digest_emailed_at, :naive_datetime)
has_many(:notifications, Notification)
has_many(:registrations, Registration)
+ has_many(:deliveries, Delivery)
embeds_one(:info, User.Info)
timestamps()
@@ -174,11 +177,25 @@ defmodule Pleroma.User do
|> Repo.aggregate(:count, :id)
end
+ defp truncate_if_exists(params, key, max_length) do
+ if Map.has_key?(params, key) and is_binary(params[key]) do
+ {value, _chopped} = String.split_at(params[key], max_length)
+ Map.put(params, key, value)
+ else
+ params
+ end
+ end
+
def remote_user_creation(params) do
bio_limit = Pleroma.Config.get([:instance, :user_bio_length], 5000)
name_limit = Pleroma.Config.get([:instance, :user_name_length], 100)
- params = Map.put(params, :info, params[:info] || %{})
+ params =
+ params
+ |> Map.put(:info, params[:info] || %{})
+ |> truncate_if_exists(:name, name_limit)
+ |> truncate_if_exists(:bio, bio_limit)
+
info_cng = User.Info.remote_user_creation(%User.Info{}, params[:info])
changes =
@@ -633,8 +650,9 @@ defmodule Pleroma.User do
end
@doc "Fetch some posts when the user has just been federated with"
- def fetch_initial_posts(user),
- do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user])
+ def fetch_initial_posts(user) do
+ BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id})
+ end
@spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
def get_followers_query(%User{} = user, nil) do
@@ -1064,7 +1082,7 @@ defmodule Pleroma.User do
end
def deactivate_async(user, status \\ true) do
- PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status])
+ BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status})
end
def deactivate(%User{} = user, status \\ true) do
@@ -1092,9 +1110,9 @@ defmodule Pleroma.User do
|> update_and_set_cache()
end
- @spec delete(User.t()) :: :ok
- def delete(%User{} = user),
- do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user])
+ def delete(%User{} = user) do
+ BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id})
+ end
@spec perform(atom(), User.t()) :: {:ok, User.t()}
def perform(:delete, %User{} = user) do
@@ -1201,25 +1219,24 @@ defmodule Pleroma.User do
Repo.all(query)
end
- def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers),
- do:
- PleromaJobQueue.enqueue(:background, __MODULE__, [
- :blocks_import,
- blocker,
- blocked_identifiers
- ])
+ def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
+ BackgroundWorker.enqueue("blocks_import", %{
+ "blocker_id" => blocker.id,
+ "blocked_identifiers" => blocked_identifiers
+ })
+ end
- def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers),
- do:
- PleromaJobQueue.enqueue(:background, __MODULE__, [
- :follow_import,
- follower,
- followed_identifiers
- ])
+ def follow_import(%User{} = follower, followed_identifiers)
+ when is_list(followed_identifiers) do
+ BackgroundWorker.enqueue("follow_import", %{
+ "follower_id" => follower.id,
+ "followed_identifiers" => followed_identifiers
+ })
+ end
def delete_user_activities(%User{ap_id: ap_id} = user) do
ap_id
- |> Activity.query_by_actor()
+ |> Activity.Queries.by_actor()
|> RepoStreamer.chunk_stream(50)
|> Stream.each(fn activities ->
Enum.each(activities, &delete_activity(&1))
@@ -1624,4 +1641,25 @@ defmodule Pleroma.User do
def is_internal_user?(%User{nickname: nil}), do: true
def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true
def is_internal_user?(_), do: false
+
+ # A hack because user delete activities have a fake id for whatever reason
+ # TODO: Get rid of this
+ def get_delivered_users_by_object_id("pleroma:fake_object_id"), do: []
+
+ def get_delivered_users_by_object_id(object_id) do
+ from(u in User,
+ inner_join: delivery in assoc(u, :deliveries),
+ where: delivery.object_id == ^object_id
+ )
+ |> Repo.all()
+ end
+
+ def change_email(user, email) do
+ user
+ |> cast(%{email: email}, [:email])
+ |> validate_required([:email])
+ |> unique_constraint(:email)
+ |> validate_format(:email, @email_regex)
+ |> update_and_set_cache()
+ end
end
diff --git a/lib/pleroma/user/info.ex b/lib/pleroma/user/info.ex
index 7027c947b..1b5951a0e 100644
--- a/lib/pleroma/user/info.ex
+++ b/lib/pleroma/user/info.ex
@@ -242,6 +242,13 @@ defmodule Pleroma.User.Info do
end
def remote_user_creation(info, params) do
+ params =
+ if Map.has_key?(params, :fields) do
+ Map.put(params, :fields, Enum.map(params[:fields], &truncate_field/1))
+ else
+ params
+ end
+
info
|> cast(params, [
:ap_enabled,
@@ -324,6 +331,16 @@ defmodule Pleroma.User.Info do
defp valid_field?(_), do: false
+ defp truncate_field(%{"name" => name, "value" => value}) do
+ {name, _chopped} =
+ String.split_at(name, Pleroma.Config.get([:instance, :account_field_name_length], 255))
+
+ {value, _chopped} =
+ String.split_at(value, Pleroma.Config.get([:instance, :account_field_value_length], 255))
+
+ %{"name" => name, "value" => value}
+ end
+
@spec confirmation_changeset(Info.t(), keyword()) :: Changeset.t()
def confirmation_changeset(info, opts) do
need_confirmation? = Keyword.get(opts, :need_confirmation)
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index d23ec66ac..bc5ae7fbf 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -4,6 +4,7 @@
defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Activity
+ alias Pleroma.Activity.Ir.Topics
alias Pleroma.Config
alias Pleroma.Conversation
alias Pleroma.Notification
@@ -16,7 +17,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier
+ alias Pleroma.Web.Streamer
alias Pleroma.Web.WebFinger
+ alias Pleroma.Workers.BackgroundWorker
import Ecto.Query
import Pleroma.Web.ActivityPub.Utils
@@ -145,7 +148,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
activity
end
- PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
+ BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
Notification.create_notifications(activity)
@@ -186,9 +189,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
participations
|> Repo.preload(:user)
- Enum.each(participations, fn participation ->
- Pleroma.Web.Streamer.stream("participation", participation)
- end)
+ Streamer.stream("participation", participations)
end
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
@@ -207,41 +208,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
def stream_out_participations(_, _), do: :noop
- def stream_out(activity) do
- if activity.data["type"] in ["Create", "Announce", "Delete"] do
- object = Object.normalize(activity)
- # Do not stream out poll replies
- unless object.data["type"] == "Answer" do
- Pleroma.Web.Streamer.stream("user", activity)
- Pleroma.Web.Streamer.stream("list", activity)
-
- if get_visibility(activity) == "public" do
- Pleroma.Web.Streamer.stream("public", activity)
-
- if activity.local do
- Pleroma.Web.Streamer.stream("public:local", activity)
- end
-
- if activity.data["type"] in ["Create"] do
- object.data
- |> Map.get("tag", [])
- |> Enum.filter(fn tag -> is_bitstring(tag) end)
- |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
-
- if object.data["attachment"] != [] do
- Pleroma.Web.Streamer.stream("public:media", activity)
-
- if activity.local do
- Pleroma.Web.Streamer.stream("public:local:media", activity)
- end
- end
- end
- else
- if get_visibility(activity) == "direct",
- do: Pleroma.Web.Streamer.stream("direct", activity)
- end
- end
- end
+ def stream_out(%Activity{data: %{"type" => data_type}} = activity)
+ when data_type in ["Create", "Announce", "Delete"] do
+ activity
+ |> Topics.get_activity_topics()
+ |> Streamer.stream(activity)
+ end
+
+ def stream_out(_activity) do
+ :noop
end
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex
index 705dbc1c2..01b34fb1d 100644
--- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
use Pleroma.Web, :controller
alias Pleroma.Activity
+ alias Pleroma.Delivery
alias Pleroma.Object
alias Pleroma.Object.Fetcher
alias Pleroma.User
@@ -23,7 +24,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
action_fallback(:errors)
- plug(Pleroma.Plugs.Cache, [query_params: false] when action in [:activity, :object])
+ plug(
+ Pleroma.Plugs.Cache,
+ [query_params: false, tracking_fun: &__MODULE__.track_object_fetch/2]
+ when action in [:activity, :object]
+ )
+
plug(Pleroma.Web.FederatingPlug when action in [:inbox, :relay])
plug(:set_requester_reachable when action in [:inbox])
plug(:relay_active? when action in [:relay])
@@ -54,6 +60,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
%Object{} = object <- Object.get_cached_by_ap_id(ap_id),
{_, true} <- {:public?, Visibility.is_public?(object)} do
conn
+ |> assign(:tracking_fun_data, object.id)
|> set_cache_ttl_for(object)
|> put_resp_content_type("application/activity+json")
|> put_view(ObjectView)
@@ -64,6 +71,16 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
end
end
+ def track_object_fetch(conn, nil), do: conn
+
+ def track_object_fetch(conn, object_id) do
+ with %{assigns: %{user: %User{id: user_id}}} <- conn do
+ Delivery.create(object_id, user_id)
+ end
+
+ conn
+ end
+
def object_likes(conn, %{"uuid" => uuid, "page" => page}) do
with ap_id <- o_status_url(conn, :object, uuid),
%Object{} = object <- Object.get_cached_by_ap_id(ap_id),
@@ -99,6 +116,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
%Activity{} = activity <- Activity.normalize(ap_id),
{_, true} <- {:public?, Visibility.is_public?(activity)} do
conn
+ |> maybe_set_tracking_data(activity)
|> set_cache_ttl_for(activity)
|> put_resp_content_type("application/activity+json")
|> put_view(ObjectView)
@@ -109,6 +127,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
end
end
+ defp maybe_set_tracking_data(conn, %Activity{data: %{"type" => "Create"}} = activity) do
+ object_id = Object.normalize(activity).id
+ assign(conn, :tracking_fun_data, object_id)
+ end
+
+ defp maybe_set_tracking_data(conn, _activity), do: conn
+
defp set_cache_ttl_for(conn, %Activity{object: object}) do
set_cache_ttl_for(conn, object)
end
diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
index a179dd54d..26b8539fe 100644
--- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
+++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
@@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
alias Pleroma.HTTP
alias Pleroma.Web.MediaProxy
+ alias Pleroma.Workers.BackgroundWorker
require Logger
@@ -30,7 +31,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
url
|> Enum.each(fn
%{"href" => href} ->
- PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href])
+ BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href})
x ->
Logger.debug("Unhandled attachment URL object #{inspect(x)}")
@@ -46,7 +47,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
%{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
)
when is_list(attachments) and length(attachments) > 0 do
- PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message])
+ BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message})
{:ok, message}
end
diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex
index c97405690..114251b24 100644
--- a/lib/pleroma/web/activity_pub/publisher.ex
+++ b/lib/pleroma/web/activity_pub/publisher.ex
@@ -5,8 +5,10 @@
defmodule Pleroma.Web.ActivityPub.Publisher do
alias Pleroma.Activity
alias Pleroma.Config
+ alias Pleroma.Delivery
alias Pleroma.HTTP
alias Pleroma.Instances
+ alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.Relay
alias Pleroma.Web.ActivityPub.Transmogrifier
@@ -84,6 +86,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
end
end
+ def publish_one(%{actor_id: actor_id} = params) do
+ actor = User.get_cached_by_id(actor_id)
+
+ params
+ |> Map.delete(:actor_id)
+ |> Map.put(:actor, actor)
+ |> publish_one()
+ end
+
defp should_federate?(inbox, public) do
if public do
true
@@ -107,7 +118,18 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
{:ok, []}
end
- Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers
+ fetchers =
+ with %Activity{data: %{"type" => "Delete"}} <- activity,
+ %Object{id: object_id} <- Object.normalize(activity),
+ fetchers <- User.get_delivered_users_by_object_id(object_id),
+ _ <- Delivery.delete_all_by_object_id(object_id) do
+ fetchers
+ else
+ _ ->
+ []
+ end
+
+ Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers ++ fetchers
end
defp get_cc_ap_ids(ap_id, recipients) do
@@ -159,7 +181,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
Publishes an activity with BCC to all relevant peers.
"""
- def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do
+ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
+ when is_list(bcc) and bcc != [] do
public = is_public?(activity)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
@@ -186,7 +209,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
inbox: inbox,
json: json,
- actor: actor,
+ actor_id: actor.id,
id: activity.data["id"],
unreachable_since: unreachable_since
})
@@ -221,7 +244,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
%{
inbox: inbox,
json: json,
- actor: actor,
+ actor_id: actor.id,
id: activity.data["id"],
unreachable_since: unreachable_since
}
diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex
index 468961bd0..acb3087d0 100644
--- a/lib/pleroma/web/activity_pub/transmogrifier.ex
+++ b/lib/pleroma/web/activity_pub/transmogrifier.ex
@@ -15,6 +15,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.Federator
+ alias Pleroma.Workers.TransmogrifierWorker
import Ecto.Query
@@ -185,12 +186,12 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
|> Map.put("context", replied_object.data["context"] || object["conversation"])
else
e ->
- Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}")
+ Logger.error("Couldn't fetch #{inspect(in_reply_to_id)}, error: #{inspect(e)}")
object
end
e ->
- Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}")
+ Logger.error("Couldn't fetch #{inspect(in_reply_to_id)}, error: #{inspect(e)}")
object
end
else
@@ -1051,7 +1052,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
already_ap <- User.ap_enabled?(user),
{:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do
unless already_ap do
- PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user])
+ TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
end
{:ok, user}
diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex
index c9c0c3763..258e56066 100644
--- a/lib/pleroma/web/activity_pub/utils.ex
+++ b/lib/pleroma/web/activity_pub/utils.ex
@@ -85,15 +85,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
defp extract_list(_), do: []
def maybe_splice_recipient(ap_id, params) do
- need_splice =
+ need_splice? =
!recipient_in_collection(ap_id, params["to"]) &&
!recipient_in_collection(ap_id, params["cc"])
- cc_list = extract_list(params["cc"])
-
- if need_splice do
- params
- |> Map.put("cc", [ap_id | cc_list])
+ if need_splice? do
+ cc_list = extract_list(params["cc"])
+ Map.put(params, "cc", [ap_id | cc_list])
else
params
end
@@ -139,7 +137,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
"object" => object
}
- Notification.get_notified_from_activity(%Activity{data: fake_create_activity}, false)
+ get_notified_from_object(fake_create_activity)
end
def get_notified_from_object(object) do
@@ -169,14 +167,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
@spec maybe_federate(any()) :: :ok
def maybe_federate(%Activity{local: true} = activity) do
if Pleroma.Config.get!([:instance, :federating]) do
- priority =
- case activity.data["type"] do
- "Delete" -> 10
- "Create" -> 1
- _ -> 5
- end
-
- Pleroma.Web.Federator.publish(activity, priority)
+ Pleroma.Web.Federator.publish(activity)
end
:ok
@@ -188,9 +179,9 @@ defmodule Pleroma.Web.ActivityPub.Utils do
Adds an id and a published data if they aren't there,
also adds it to an included object
"""
- def lazy_put_activity_defaults(map, fake \\ false) do
+ def lazy_put_activity_defaults(map, fake? \\ false) do
map =
- unless fake do
+ if not fake? do
%{data: %{"id" => context}, id: context_id} = create_context(map["context"])
map
@@ -207,7 +198,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
end
if is_map(map["object"]) do
- object = lazy_put_object_defaults(map["object"], map, fake)
+ object = lazy_put_object_defaults(map["object"], map, fake?)
%{map | "object" => object}
else
map
@@ -217,9 +208,9 @@ defmodule Pleroma.Web.ActivityPub.Utils do
@doc """
Adds an id and published date if they aren't there.
"""
- def lazy_put_object_defaults(map, activity \\ %{}, fake)
+ def lazy_put_object_defaults(map, activity \\ %{}, fake?)
- def lazy_put_object_defaults(map, activity, true = _fake) do
+ def lazy_put_object_defaults(map, activity, true = _fake?) do
map
|> Map.put_new_lazy("published", &make_date/0)
|> Map.put_new("id", "pleroma:fake_object_id")
@@ -228,7 +219,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|> Map.put_new("context_id", activity["context_id"])
end
- def lazy_put_object_defaults(map, activity, _fake) do
+ def lazy_put_object_defaults(map, activity, _fake?) do
map
|> Map.put_new_lazy("id", &generate_object_id/0)
|> Map.put_new_lazy("published", &make_date/0)
@@ -242,9 +233,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
def insert_full_object(%{"object" => %{"type" => type} = object_data} = map)
when is_map(object_data) and type in @supported_object_types do
with {:ok, object} <- Object.create(object_data) do
- map =
- map
- |> Map.put("object", object.data["id"])
+ map = Map.put(map, "object", object.data["id"])
{:ok, map, object}
end
@@ -263,7 +252,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|> Activity.Queries.by_actor()
|> Activity.Queries.by_object_id(id)
|> Activity.Queries.by_type("Like")
- |> Activity.Queries.limit(1)
+ |> limit(1)
|> Repo.one()
end
@@ -380,12 +369,11 @@ defmodule Pleroma.Web.ActivityPub.Utils do
%Activity{data: %{"actor" => actor, "object" => object}} = activity,
state
) do
- with new_data <-
- activity.data
- |> Map.put("state", state),
- changeset <- Changeset.change(activity, data: new_data),
- {:ok, activity} <- Repo.update(changeset),
- _ <- User.set_follow_state_cache(actor, object, state) do
+ new_data = Map.put(activity.data, "state", state)
+ changeset = Changeset.change(activity, data: new_data)
+
+ with {:ok, activity} <- Repo.update(changeset) do
+ User.set_follow_state_cache(actor, object, state)
{:ok, activity}
end
end
@@ -410,28 +398,14 @@ defmodule Pleroma.Web.ActivityPub.Utils do
end
def fetch_latest_follow(%User{ap_id: follower_id}, %User{ap_id: followed_id}) do
- query =
- from(
- activity in Activity,
- where:
- fragment(
- "? ->> 'type' = 'Follow'",
- activity.data
- ),
- where: activity.actor == ^follower_id,
- # this is to use the index
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^followed_id
- ),
- order_by: [fragment("? desc nulls last", activity.id)],
- limit: 1
- )
-
- Repo.one(query)
+ "Follow"
+ |> Activity.Queries.by_type()
+ |> where(actor: ^follower_id)
+ # this is to use the index
+ |> Activity.Queries.by_object_id(followed_id)
+ |> order_by([activity], fragment("? desc nulls last", activity.id))
+ |> limit(1)
+ |> Repo.one()
end
#### Announce-related helpers
@@ -439,23 +413,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
@doc """
Retruns an existing announce activity if the notice has already been announced
"""
- def get_existing_announce(actor, %{data: %{"id" => id}}) do
- query =
- from(
- activity in Activity,
- where: activity.actor == ^actor,
- # this is to use the index
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^id
- ),
- where: fragment("(?)->>'type' = 'Announce'", activity.data)
- )
-
- Repo.one(query)
+ def get_existing_announce(actor, %{data: %{"id" => ap_id}}) do
+ "Announce"
+ |> Activity.Queries.by_type()
+ |> where(actor: ^actor)
+ # this is to use the index
+ |> Activity.Queries.by_object_id(ap_id)
+ |> Repo.one()
end
@doc """
@@ -538,11 +502,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
object
) do
announcements =
- if is_list(object.data["announcements"]), do: object.data["announcements"], else: []
+ if is_list(object.data["announcements"]) do
+ Enum.uniq([actor | object.data["announcements"]])
+ else
+ [actor]
+ end
- with announcements <- [actor | announcements] |> Enum.uniq() do
- update_element_in_object("announcement", announcements, object)
- end
+ update_element_in_object("announcement", announcements, object)
end
def add_announce_to_object(_, object), do: {:ok, object}
@@ -570,28 +536,14 @@ defmodule Pleroma.Web.ActivityPub.Utils do
#### Block-related helpers
def fetch_latest_block(%User{ap_id: blocker_id}, %User{ap_id: blocked_id}) do
- query =
- from(
- activity in Activity,
- where:
- fragment(
- "? ->> 'type' = 'Block'",
- activity.data
- ),
- where: activity.actor == ^blocker_id,
- # this is to use the index
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^blocked_id
- ),
- order_by: [fragment("? desc nulls last", activity.id)],
- limit: 1
- )
-
- Repo.one(query)
+ "Block"
+ |> Activity.Queries.by_type()
+ |> where(actor: ^blocker_id)
+ # this is to use the index
+ |> Activity.Queries.by_object_id(blocked_id)
+ |> order_by([activity], fragment("? desc nulls last", activity.id))
+ |> limit(1)
+ |> Repo.one()
end
def make_block_data(blocker, blocked, activity_id) do
@@ -695,11 +647,11 @@ defmodule Pleroma.Web.ActivityPub.Utils do
#### Report-related helpers
def update_report_state(%Activity{} = activity, state) when state in @supported_report_states do
- with new_data <- Map.put(activity.data, "state", state),
- changeset <- Changeset.change(activity, data: new_data),
- {:ok, activity} <- Repo.update(changeset) do
- {:ok, activity}
- end
+ new_data = Map.put(activity.data, "state", state)
+
+ activity
+ |> Changeset.change(data: new_data)
+ |> Repo.update()
end
def update_report_state(_, _), do: {:error, "Unsupported state"}
@@ -766,21 +718,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
end
def get_existing_votes(actor, %{data: %{"id" => id}}) do
- query =
- from(
- [activity, object: object] in Activity.with_preloaded_object(Activity),
- where: fragment("(?)->>'type' = 'Create'", activity.data),
- where: fragment("(?)->>'actor' = ?", activity.data, ^actor),
- where:
- fragment(
- "(?)->>'inReplyTo' = ?",
- object.data,
- ^to_string(id)
- ),
- where: fragment("(?)->>'type' = 'Answer'", object.data)
- )
-
- Repo.all(query)
+ actor
+ |> Activity.Queries.by_actor()
+ |> Activity.Queries.by_type("Create")
+ |> Activity.with_preloaded_object()
+ |> where([a, object: o], fragment("(?)->>'inReplyTo' = ?", o.data, ^to_string(id)))
+ |> where([a, object: o], fragment("(?)->>'type' = 'Answer'", o.data))
+ |> Repo.all()
end
defp maybe_put(map, _key, nil), do: map
diff --git a/lib/pleroma/web/admin_api/config.ex b/lib/pleroma/web/admin_api/config.ex
index a10cc779b..1917a5580 100644
--- a/lib/pleroma/web/admin_api/config.ex
+++ b/lib/pleroma/web/admin_api/config.ex
@@ -90,6 +90,8 @@ defmodule Pleroma.Web.AdminAPI.Config do
for v <- entity, into: [], do: do_convert(v)
end
+ defp do_convert(%Regex{} = entity), do: inspect(entity)
+
defp do_convert(entity) when is_map(entity) do
for {k, v} <- entity, into: %{}, do: {do_convert(k), do_convert(v)}
end
@@ -122,7 +124,7 @@ defmodule Pleroma.Web.AdminAPI.Config do
def transform(entity), do: :erlang.term_to_binary(entity)
- defp do_transform(%Regex{} = entity) when is_map(entity), do: entity
+ defp do_transform(%Regex{} = entity), do: entity
defp do_transform(%{"tuple" => [":dispatch", [entity]]}) do
{dispatch_settings, []} = do_eval(entity)
@@ -154,8 +156,15 @@ defmodule Pleroma.Web.AdminAPI.Config do
defp do_transform(entity), do: entity
defp do_transform_string("~r/" <> pattern) do
- pattern = String.trim_trailing(pattern, "/")
- ~r/#{pattern}/
+ modificator = String.split(pattern, "/") |> List.last()
+ pattern = String.trim_trailing(pattern, "/" <> modificator)
+
+ case modificator do
+ "" -> ~r/#{pattern}/
+ "i" -> ~r/#{pattern}/i
+ "u" -> ~r/#{pattern}/u
+ "s" -> ~r/#{pattern}/s
+ end
end
defp do_transform_string(":" <> atom), do: String.to_atom(atom)
diff --git a/lib/pleroma/web/controller_helper.ex b/lib/pleroma/web/controller_helper.ex
index eeac9f503..b53a01955 100644
--- a/lib/pleroma/web/controller_helper.ex
+++ b/lib/pleroma/web/controller_helper.ex
@@ -34,79 +34,38 @@ defmodule Pleroma.Web.ControllerHelper do
defp param_to_integer(_, default), do: default
- def add_link_headers(
- conn,
- method,
- activities,
- param \\ nil,
- params \\ %{},
- func3 \\ nil,
- func4 \\ nil
- ) do
- params =
- conn.params
- |> Map.drop(["since_id", "max_id", "min_id"])
- |> Map.merge(params)
+ def add_link_headers(conn, activities, extra_params \\ %{}) do
+ case List.last(activities) do
+ %{id: max_id} ->
+ params =
+ conn.params
+ |> Map.drop(Map.keys(conn.path_params))
+ |> Map.drop(["since_id", "max_id", "min_id"])
+ |> Map.merge(extra_params)
- last = List.last(activities)
+ limit =
+ params
+ |> Map.get("limit", "20")
+ |> String.to_integer()
- func3 = func3 || (&mastodon_api_url/3)
- func4 = func4 || (&mastodon_api_url/4)
+ min_id =
+ if length(activities) <= limit do
+ activities
+ |> List.first()
+ |> Map.get(:id)
+ else
+ activities
+ |> Enum.at(limit * -1)
+ |> Map.get(:id)
+ end
- if last do
- max_id = last.id
+ next_url = current_url(conn, Map.merge(params, %{max_id: max_id}))
+ prev_url = current_url(conn, Map.merge(params, %{min_id: min_id}))
- limit =
- params
- |> Map.get("limit", "20")
- |> String.to_integer()
+ put_resp_header(conn, "link", "<#{next_url}>; rel=\"next\", <#{prev_url}>; rel=\"prev\"")
- min_id =
- if length(activities) <= limit do
- activities
- |> List.first()
- |> Map.get(:id)
- else
- activities
- |> Enum.at(limit * -1)
- |> Map.get(:id)
- end
-
- {next_url, prev_url} =
- if param do
- {
- func4.(
- Pleroma.Web.Endpoint,
- method,
- param,
- Map.merge(params, %{max_id: max_id})
- ),
- func4.(
- Pleroma.Web.Endpoint,
- method,
- param,
- Map.merge(params, %{min_id: min_id})
- )
- }
- else
- {
- func3.(
- Pleroma.Web.Endpoint,
- method,
- Map.merge(params, %{max_id: max_id})
- ),
- func3.(
- Pleroma.Web.Endpoint,
- method,
- Map.merge(params, %{min_id: min_id})
- )
- }
- end
-
- conn
- |> put_resp_header("link", "<#{next_url}>; rel=\"next\", <#{prev_url}>; rel=\"prev\"")
- else
- conn
+ _ ->
+ conn
end
end
end
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex
index f4f9e83e0..1a2da014a 100644
--- a/lib/pleroma/web/federator/federator.ex
+++ b/lib/pleroma/web/federator/federator.ex
@@ -10,16 +10,17 @@ defmodule Pleroma.Web.Federator do
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Federator.Publisher
- alias Pleroma.Web.Federator.RetryQueue
alias Pleroma.Web.OStatus
alias Pleroma.Web.Websub
+ alias Pleroma.Workers.PublisherWorker
+ alias Pleroma.Workers.ReceiverWorker
+ alias Pleroma.Workers.SubscriberWorker
require Logger
def init do
- # 1 minute
- Process.sleep(1000 * 60)
- refresh_subscriptions()
+ # To do: consider removing this call in favor of scheduled execution (`quantum`-based)
+ refresh_subscriptions(schedule_in: 60)
end
@doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
@@ -37,50 +38,38 @@ defmodule Pleroma.Web.Federator do
# Client API
def incoming_doc(doc) do
- PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
+ ReceiverWorker.enqueue("incoming_doc", %{"body" => doc})
end
def incoming_ap_doc(params) do
- PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
+ ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
end
- def publish(activity, priority \\ 1) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
+ def publish(%{id: "pleroma:fakeid"} = activity) do
+ perform(:publish, activity)
end
- def verify_websub(websub) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
+ def publish(activity) do
+ PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
end
- def request_subscription(sub) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
+ def verify_websub(websub) do
+ SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id})
end
- def refresh_subscriptions do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
+ def request_subscription(websub) do
+ SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id})
end
- # Job Worker Callbacks
-
- def perform(:refresh_subscriptions) do
- Logger.debug("Federator running refresh subscriptions")
- Websub.refresh_subscriptions()
-
- spawn(fn ->
- # 6 hours
- Process.sleep(1000 * 60 * 60 * 6)
- refresh_subscriptions()
- end)
+ def refresh_subscriptions(worker_args \\ []) do
+ SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1])
end
- def perform(:request_subscription, websub) do
- Logger.debug("Refreshing #{websub.topic}")
+ # Job Worker Callbacks
- with {:ok, websub} <- Websub.request_subscription(websub) do
- Logger.debug("Successfully refreshed #{websub.topic}")
- else
- _e -> Logger.debug("Couldn't refresh #{websub.topic}")
- end
+ @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
+ def perform(:publish_one, module, params) do
+ apply(module, :publish_one, [params])
end
def perform(:publish, activity) do
@@ -92,14 +81,6 @@ defmodule Pleroma.Web.Federator do
end
end
- def perform(:verify_websub, websub) do
- Logger.debug(fn ->
- "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
- end)
-
- Websub.verify(websub)
- end
-
def perform(:incoming_doc, doc) do
Logger.info("Got document, trying to parse")
OStatus.handle_incoming(doc)
@@ -130,22 +111,27 @@ defmodule Pleroma.Web.Federator do
end
end
- def perform(
- :publish_single_websub,
- %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
- ) do
- case Websub.publish_one(params) do
- {:ok, _} ->
- :ok
+ def perform(:request_subscription, websub) do
+ Logger.debug("Refreshing #{websub.topic}")
- {:error, _} ->
- RetryQueue.enqueue(params, Websub)
+ with {:ok, websub} <- Websub.request_subscription(websub) do
+ Logger.debug("Successfully refreshed #{websub.topic}")
+ else
+ _e -> Logger.debug("Couldn't refresh #{websub.topic}")
end
end
- def perform(type, _) do
- Logger.debug(fn -> "Unknown task: #{type}" end)
- {:error, "Don't know what to do with this"}
+ def perform(:verify_websub, websub) do
+ Logger.debug(fn ->
+ "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
+ end)
+
+ Websub.verify(websub)
+ end
+
+ def perform(:refresh_subscriptions) do
+ Logger.debug("Federator running refresh subscriptions")
+ Websub.refresh_subscriptions()
end
def ap_enabled_actor(id) do
diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex
index 70f870244..937064638 100644
--- a/lib/pleroma/web/federator/publisher.ex
+++ b/lib/pleroma/web/federator/publisher.ex
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.User
- alias Pleroma.Web.Federator.RetryQueue
+ alias Pleroma.Workers.PublisherWorker
require Logger
@@ -30,23 +30,11 @@ defmodule Pleroma.Web.Federator.Publisher do
Enqueue publishing a single activity.
"""
@spec enqueue_one(module(), Map.t()) :: :ok
- def enqueue_one(module, %{} = params),
- do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params])
-
- @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
- def perform(:publish_one, module, params) do
- case apply(module, :publish_one, [params]) do
- {:ok, _} ->
- :ok
-
- {:error, _e} ->
- RetryQueue.enqueue(params, module)
- end
- end
-
- def perform(type, _, _) do
- Logger.debug("Unknown task: #{type}")
- {:error, "Don't know what to do with this"}
+ def enqueue_one(module, %{} = params) do
+ PublisherWorker.enqueue(
+ "publish_one",
+ %{"module" => to_string(module), "params" => params}
+ )
end
@doc """
diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex
deleted file mode 100644
index 9eab8c218..000000000
--- a/lib/pleroma/web/federator/retry_queue.ex
+++ /dev/null
@@ -1,239 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Federator.RetryQueue do
- use GenServer
-
- require Logger
-
- def init(args) do
- queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
-
- {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
- end
-
- def start_link(_) do
- enabled =
- if Pleroma.Config.get(:env) == :test,
- do: true,
- else: Pleroma.Config.get([__MODULE__, :enabled], false)
-
- if enabled do
- Logger.info("Starting retry queue")
-
- linkres =
- GenServer.start_link(
- __MODULE__,
- %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
- name: __MODULE__
- )
-
- maybe_kickoff_timer()
- linkres
- else
- Logger.info("Retry queue disabled")
- :ignore
- end
- end
-
- def enqueue(data, transport, retries \\ 0) do
- GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
- end
-
- def get_stats do
- GenServer.call(__MODULE__, :get_stats)
- end
-
- def reset_stats do
- GenServer.call(__MODULE__, :reset_stats)
- end
-
- def get_retry_params(retries) do
- if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
- {:drop, "Max retries reached"}
- else
- {:retry, growth_function(retries)}
- end
- end
-
- def get_retry_timer_interval do
- Pleroma.Config.get([:retry_queue, :interval], 1000)
- end
-
- defp ets_count_expires(table, current_time) do
- :ets.select_count(
- table,
- [
- {
- {:"$1", :"$2"},
- [{:"=<", :"$1", {:const, current_time}}],
- [true]
- }
- ]
- )
- end
-
- defp ets_pop_n_expired(table, current_time, desired) do
- {popped, _continuation} =
- :ets.select(
- table,
- [
- {
- {:"$1", :"$2"},
- [{:"=<", :"$1", {:const, current_time}}],
- [:"$_"]
- }
- ],
- desired
- )
-
- popped
- |> Enum.each(fn e ->
- :ets.delete_object(table, e)
- end)
-
- popped
- end
-
- def maybe_start_job(running_jobs, queue_table) do
- # we don't want to hit the ets or the DateTime more times than we have to
- # could optimize slightly further by not using the count, and instead grabbing
- # up to N objects early...
- current_time = DateTime.to_unix(DateTime.utc_now())
- n_running_jobs = :sets.size(running_jobs)
-
- if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
- n_ready_jobs = ets_count_expires(queue_table, current_time)
-
- if n_ready_jobs > 0 do
- # figure out how many we could start
- available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
- start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
- else
- running_jobs
- end
- else
- running_jobs
- end
- end
-
- defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
- running_jobs
- end
-
- defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
- when available_job_slots > 0 do
- candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
-
- candidates
- |> List.foldl(running_jobs, fn {_, e}, rj ->
- {:ok, pid} = Task.start(fn -> worker(e) end)
- mref = Process.monitor(pid)
- :sets.add_element(mref, rj)
- end)
- end
-
- def worker({:send, data, transport, retries}) do
- case transport.publish_one(data) do
- {:ok, _} ->
- GenServer.cast(__MODULE__, :inc_delivered)
- :delivered
-
- {:error, _reason} ->
- enqueue(data, transport, retries)
- :retry
- end
- end
-
- def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
- {:reply, %{delivered: delivery_count, dropped: drop_count}, state}
- end
-
- def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
- {:reply, %{delivered: delivery_count, dropped: drop_count},
- %{state | delivered: 0, dropped: 0}}
- end
-
- def handle_cast(:reset_stats, state) do
- {:noreply, %{state | delivered: 0, dropped: 0}}
- end
-
- def handle_cast(
- {:maybe_enqueue, data, transport, retries},
- %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
- ) do
- case get_retry_params(retries) do
- {:retry, timeout} ->
- :ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
-
- {:drop, message} ->
- Logger.debug(message)
- {:noreply, %{state | dropped: drop_count + 1}}
- end
- end
-
- def handle_cast(:kickoff_timer, state) do
- retry_interval = get_retry_timer_interval()
- Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
- {:noreply, state}
- end
-
- def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
- {:noreply, %{state | delivered: delivery_count + 1}}
- end
-
- def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
- {:noreply, %{state | dropped: drop_count + 1}}
- end
-
- def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
- case transport.publish_one(data) do
- {:ok, _} ->
- {:noreply, %{state | delivered: delivery_count + 1}}
-
- {:error, _reason} ->
- enqueue(data, transport, retries)
- {:noreply, state}
- end
- end
-
- def handle_info(
- :retry_timer_run,
- %{queue_table: queue_table, running_jobs: running_jobs} = state
- ) do
- maybe_kickoff_timer()
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
- end
-
- def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
- %{running_jobs: running_jobs, queue_table: queue_table} = state
- running_jobs = :sets.del_element(ref, running_jobs)
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
- end
-
- def handle_info(unknown, state) do
- Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
- {:noreply, state}
- end
-
- if Pleroma.Config.get(:env) == :test do
- defp growth_function(_retries) do
- _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
- DateTime.to_unix(DateTime.utc_now()) - 1
- end
- else
- defp growth_function(retries) do
- round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
- DateTime.to_unix(DateTime.utc_now())
- end
- end
-
- defp maybe_kickoff_timer do
- GenServer.cast(__MODULE__, :kickoff_timer)
- end
-end
diff --git a/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex b/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex
index 8dfad7a54..060137b80 100644
--- a/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex
+++ b/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
use Pleroma.Web, :controller
import Pleroma.Web.ControllerHelper,
- only: [json_response: 3, add_link_headers: 5, add_link_headers: 4, add_link_headers: 3]
+ only: [json_response: 3, add_link_headers: 2, add_link_headers: 3]
alias Ecto.Changeset
alias Pleroma.Activity
@@ -365,7 +365,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:home_timeline, activities)
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -384,7 +384,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:public_timeline, activities, false, %{"local" => local_only})
+ |> add_link_headers(activities, %{"local" => local_only})
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -398,7 +398,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
activities = ActivityPub.fetch_user_activities(user, reading_user, params)
conn
- |> add_link_headers(:user_statuses, activities, params["id"])
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{
activities: activities,
@@ -422,11 +422,25 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Pagination.fetch_paginated(params)
conn
- |> add_link_headers(:dm_timeline, activities)
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
+ def get_statuses(%{assigns: %{user: user}} = conn, %{"ids" => ids}) do
+ limit = 100
+
+ activities =
+ ids
+ |> Enum.take(limit)
+ |> Activity.all_by_ids_with_object()
+ |> Enum.filter(&Visibility.visible_for_user?(&1, user))
+
+ conn
+ |> put_view(StatusView)
+ |> render("index.json", activities: activities, for: user, as: :activity)
+ end
+
def get_status(%{assigns: %{user: user}} = conn, %{"id" => id}) do
with %Activity{} = activity <- Activity.get_by_id_with_object(id),
true <- Visibility.visible_for_user?(activity, user) do
@@ -523,7 +537,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
def scheduled_statuses(%{assigns: %{user: user}} = conn, params) do
with scheduled_activities <- MastodonAPI.get_scheduled_activities(user, params) do
conn
- |> add_link_headers(:scheduled_statuses, scheduled_activities)
+ |> add_link_headers(scheduled_activities)
|> put_view(ScheduledActivityView)
|> render("index.json", %{scheduled_activities: scheduled_activities})
end
@@ -706,7 +720,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
notifications = MastodonAPI.get_notifications(user, params)
conn
- |> add_link_headers(:notifications, notifications)
+ |> add_link_headers(notifications)
|> put_view(NotificationView)
|> render("index.json", %{notifications: notifications, for: user})
end
@@ -828,6 +842,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
def favourited_by(%{assigns: %{user: user}} = conn, %{"id" => id}) do
with %Activity{} = activity <- Activity.get_by_id_with_object(id),
+ {:visible, true} <- {:visible, Visibility.visible_for_user?(activity, user)},
%Object{data: %{"likes" => likes}} <- Object.normalize(activity) do
q = from(u in User, where: u.ap_id in ^likes)
@@ -839,12 +854,14 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> put_view(AccountView)
|> render("accounts.json", %{for: user, users: users, as: :user})
else
+ {:visible, false} -> {:error, :not_found}
_ -> json(conn, [])
end
end
def reblogged_by(%{assigns: %{user: user}} = conn, %{"id" => id}) do
with %Activity{} = activity <- Activity.get_by_id_with_object(id),
+ {:visible, true} <- {:visible, Visibility.visible_for_user?(activity, user)},
%Object{data: %{"announcements" => announces}} <- Object.normalize(activity) do
q = from(u in User, where: u.ap_id in ^announces)
@@ -856,6 +873,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> put_view(AccountView)
|> render("accounts.json", %{for: user, users: users, as: :user})
else
+ {:visible, false} -> {:error, :not_found}
_ -> json(conn, [])
end
end
@@ -894,7 +912,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:hashtag_timeline, activities, params["tag"], %{"local" => local_only})
+ |> add_link_headers(activities, %{"local" => local_only})
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -910,7 +928,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
end
conn
- |> add_link_headers(:followers, followers, user)
+ |> add_link_headers(followers)
|> put_view(AccountView)
|> render("accounts.json", %{for: for_user, users: followers, as: :user})
end
@@ -927,7 +945,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
end
conn
- |> add_link_headers(:following, followers, user)
+ |> add_link_headers(followers)
|> put_view(AccountView)
|> render("accounts.json", %{for: for_user, users: followers, as: :user})
end
@@ -1152,7 +1170,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:favourites, activities)
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -1179,7 +1197,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:favourites, activities)
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: for_user, as: :activity})
else
@@ -1200,7 +1218,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.map(fn b -> Map.put(b.activity, :bookmark, Map.delete(b, :activity)) end)
conn
- |> add_link_headers(:bookmarks, bookmarks)
+ |> add_link_headers(bookmarks)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -1640,7 +1658,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
end)
conn
- |> add_link_headers(:conversations, participations)
+ |> add_link_headers(participations)
|> json(conversations)
end
diff --git a/lib/pleroma/web/mastodon_api/views/status_view.ex b/lib/pleroma/web/mastodon_api/views/status_view.ex
index e71083b91..ef796cddd 100644
--- a/lib/pleroma/web/mastodon_api/views/status_view.ex
+++ b/lib/pleroma/web/mastodon_api/views/status_view.ex
@@ -73,14 +73,12 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
def render("index.json", opts) do
replied_to_activities = get_replied_to_activities(opts.activities)
- parallel = unless is_nil(opts[:parallel]), do: opts[:parallel], else: true
opts.activities
|> safe_render_many(
StatusView,
"status.json",
- Map.put(opts, :replied_to_activities, replied_to_activities),
- parallel
+ Map.put(opts, :replied_to_activities, replied_to_activities)
)
end
@@ -499,7 +497,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
object_tags = for tag when is_binary(tag) <- object_tags, do: tag
Enum.reduce(object_tags, [], fn tag, tags ->
- tags ++ [%{name: tag, url: "/tag/#{tag}"}]
+ tags ++ [%{name: tag, url: "/tag/#{URI.encode(tag)}"}]
end)
end
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index dbd3542ea..3c26eb406 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -8,6 +8,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.OAuth.Token
+ alias Pleroma.Web.Streamer
@behaviour :cowboy_websocket
@@ -24,7 +25,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
]
@anonymous_streams ["public", "public:local", "hashtag"]
- # Handled by periodic keepalive in Pleroma.Web.Streamer.
+ # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
@timeout :infinity
def init(%{qs: qs} = req, state) do
@@ -65,7 +66,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
}, topic #{state.topic}"
)
- Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state))
+ Streamer.add_socket(state.topic, streamer_socket(state))
{:ok, state}
end
@@ -80,7 +81,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
}, topic #{state.topic || "?"}: #{inspect(reason)}"
)
- Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state))
+ Streamer.remove_socket(state.topic, streamer_socket(state))
:ok
end
diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex
index f50098302..eb94bf86f 100644
--- a/lib/pleroma/web/oauth/token/clean_worker.ex
+++ b/lib/pleroma/web/oauth/token/clean_worker.ex
@@ -17,6 +17,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
)
alias Pleroma.Web.OAuth.Token
+ alias Pleroma.Workers.BackgroundWorker
def start_link(_), do: GenServer.start_link(__MODULE__, %{})
@@ -27,9 +28,11 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
@doc false
def handle_info(:perform, state) do
- Token.delete_expired_tokens()
+ BackgroundWorker.enqueue("clean_expired_tokens", %{})
Process.send_after(self(), :perform, @interval)
{:noreply, state}
end
+
+ def perform(:clean), do: Token.delete_expired_tokens()
end
diff --git a/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex b/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex
index f4df3b024..d17ccf84d 100644
--- a/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex
+++ b/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex
@@ -5,7 +5,7 @@
defmodule Pleroma.Web.PleromaAPI.PleromaAPIController do
use Pleroma.Web, :controller
- import Pleroma.Web.ControllerHelper, only: [add_link_headers: 7]
+ import Pleroma.Web.ControllerHelper, only: [add_link_headers: 2]
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
@@ -27,31 +27,22 @@ defmodule Pleroma.Web.PleromaAPI.PleromaAPIController do
%{assigns: %{user: user}} = conn,
%{"id" => participation_id} = params
) do
- params =
- params
- |> Map.put("blocking_user", user)
- |> Map.put("muting_user", user)
- |> Map.put("user", user)
-
- participation =
- participation_id
- |> Participation.get(preload: [:conversation])
+ participation = Participation.get(participation_id, preload: [:conversation])
if user.id == participation.user_id do
+ params =
+ params
+ |> Map.put("blocking_user", user)
+ |> Map.put("muting_user", user)
+ |> Map.put("user", user)
+
activities =
participation.conversation.ap_id
|> ActivityPub.fetch_activities_for_context(params)
|> Enum.reverse()
conn
- |> add_link_headers(
- :conversation_statuses,
- activities,
- participation_id,
- params,
- nil,
- &pleroma_api_url/4
- )
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
diff --git a/lib/pleroma/web/push/push.ex b/lib/pleroma/web/push/push.ex
index 729dad02a..7ef1532ac 100644
--- a/lib/pleroma/web/push/push.ex
+++ b/lib/pleroma/web/push/push.ex
@@ -3,7 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Push do
- alias Pleroma.Web.Push.Impl
+ alias Pleroma.Workers.WebPusherWorker
require Logger
@@ -31,6 +31,7 @@ defmodule Pleroma.Web.Push do
end
end
- def send(notification),
- do: PleromaJobQueue.enqueue(:web_push, Impl, [notification])
+ def send(notification) do
+ WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id})
+ end
end
diff --git a/lib/pleroma/web/rich_media/parser.ex b/lib/pleroma/web/rich_media/parser.ex
index f5f9e358c..c06b0a0f2 100644
--- a/lib/pleroma/web/rich_media/parser.ex
+++ b/lib/pleroma/web/rich_media/parser.ex
@@ -81,6 +81,7 @@ defmodule Pleroma.Web.RichMedia.Parser do
{:ok, %Tesla.Env{body: html}} = Pleroma.HTTP.get(url, [], adapter: @hackney_options)
html
+ |> parse_html
|> maybe_parse()
|> Map.put(:url, url)
|> clean_parsed_data()
@@ -91,6 +92,8 @@ defmodule Pleroma.Web.RichMedia.Parser do
end
end
+ defp parse_html(html), do: Floki.parse(html)
+
defp maybe_parse(html) do
Enum.reduce_while(parsers(), %{}, fn parser, acc ->
case parser.parse(html, acc) do
@@ -100,7 +103,8 @@ defmodule Pleroma.Web.RichMedia.Parser do
end)
end
- defp check_parsed_data(%{title: title} = data) when is_binary(title) and byte_size(title) > 0 do
+ defp check_parsed_data(%{title: title} = data)
+ when is_binary(title) and byte_size(title) > 0 do
{:ok, data}
end
diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex
index cfb973f53..401133bf3 100644
--- a/lib/pleroma/web/router.ex
+++ b/lib/pleroma/web/router.ex
@@ -135,6 +135,7 @@ defmodule Pleroma.Web.Router do
pipeline :http_signature do
plug(Pleroma.Web.Plugs.HTTPSignaturePlug)
+ plug(Pleroma.Web.Plugs.MappedSignatureToIdentityPlug)
end
scope "/api/pleroma", Pleroma.Web.TwitterAPI do
@@ -224,6 +225,7 @@ defmodule Pleroma.Web.Router do
scope [] do
pipe_through(:oauth_write)
+ post("/change_email", UtilController, :change_email)
post("/change_password", UtilController, :change_password)
post("/delete_account", UtilController, :delete_account)
put("/notification_settings", UtilController, :update_notificaton_settings)
@@ -443,6 +445,7 @@ defmodule Pleroma.Web.Router do
get("/timelines/tag/:tag", MastodonAPIController, :hashtag_timeline)
get("/timelines/list/:list_id", MastodonAPIController, :list_timeline)
+ get("/statuses", MastodonAPIController, :get_statuses)
get("/statuses/:id", MastodonAPIController, :get_status)
get("/statuses/:id/context", MastodonAPIController, :get_context)
@@ -512,6 +515,7 @@ defmodule Pleroma.Web.Router do
scope "/", Pleroma.Web do
pipe_through(:ostatus)
+ pipe_through(:http_signature)
get("/objects/:uuid", OStatus.OStatusController, :object)
get("/activities/:uuid", OStatus.OStatusController, :activity)
diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex
index 9b01ebcc6..8ba7380c0 100644
--- a/lib/pleroma/web/salmon/salmon.ex
+++ b/lib/pleroma/web/salmon/salmon.ex
@@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do
end
end
+ def publish_one(%{recipient_id: recipient_id} = params) do
+ recipient = User.get_cached_by_id(recipient_id)
+
+ params
+ |> Map.delete(:recipient_id)
+ |> Map.put(:recipient, recipient)
+ |> publish_one()
+ end
+
def publish_one(_), do: :noop
@supported_activities [
@@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
Publisher.enqueue_one(__MODULE__, %{
- recipient: remote_user,
+ recipient_id: remote_user.id,
feed: feed,
unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
})
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
deleted file mode 100644
index 587c43f40..000000000
--- a/lib/pleroma/web/streamer.ex
+++ /dev/null
@@ -1,318 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer do
- use GenServer
- require Logger
- alias Pleroma.Activity
- alias Pleroma.Config
- alias Pleroma.Conversation.Participation
- alias Pleroma.Notification
- alias Pleroma.Object
- alias Pleroma.User
- alias Pleroma.Web.ActivityPub.ActivityPub
- alias Pleroma.Web.ActivityPub.Visibility
- alias Pleroma.Web.CommonAPI
- alias Pleroma.Web.MastodonAPI.NotificationView
-
- @keepalive_interval :timer.seconds(30)
-
- def start_link(_) do
- GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
- end
-
- def add_socket(topic, socket) do
- GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
- end
-
- def remove_socket(topic, socket) do
- GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
- end
-
- def stream(topic, item) do
- GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
- end
-
- def init(args) do
- Process.send_after(self(), %{action: :ping}, @keepalive_interval)
-
- {:ok, args}
- end
-
- def handle_info(%{action: :ping}, topics) do
- topics
- |> Map.values()
- |> List.flatten()
- |> Enum.each(fn socket ->
- Logger.debug("Sending keepalive ping")
- send(socket.transport_pid, {:text, ""})
- end)
-
- Process.send_after(self(), %{action: :ping}, @keepalive_interval)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
- recipient_topics =
- User.get_recipients_from_activity(item)
- |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
-
- Enum.each(recipient_topics || [], fn user_topic ->
- Logger.debug("Trying to push direct message to #{user_topic}\n\n")
- push_to_socket(topics, user_topic, item)
- end)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
- user_topic = "direct:#{participation.user_id}"
- Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
-
- push_to_socket(topics, user_topic, participation)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
- # filter the recipient list if the activity is not public, see #270.
- recipient_lists =
- case Visibility.is_public?(item) do
- true ->
- Pleroma.List.get_lists_from_activity(item)
-
- _ ->
- Pleroma.List.get_lists_from_activity(item)
- |> Enum.filter(fn list ->
- owner = User.get_cached_by_id(list.user_id)
-
- Visibility.visible_for_user?(item, owner)
- end)
- end
-
- recipient_topics =
- recipient_lists
- |> Enum.map(fn %{id: id} -> "list:#{id}" end)
-
- Enum.each(recipient_topics || [], fn list_topic ->
- Logger.debug("Trying to push message to #{list_topic}\n\n")
- push_to_socket(topics, list_topic, item)
- end)
-
- {:noreply, topics}
- end
-
- def handle_cast(
- %{action: :stream, topic: topic, item: %Notification{} = item},
- topics
- )
- when topic in ["user", "user:notification"] do
- topics
- |> Map.get("#{topic}:#{item.user_id}", [])
- |> Enum.each(fn socket ->
- with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
- true <- should_send?(user, item) do
- send(
- socket.transport_pid,
- {:text, represent_notification(socket.assigns[:user], item)}
- )
- end
- end)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
- Logger.debug("Trying to push to users")
-
- recipient_topics =
- User.get_recipients_from_activity(item)
- |> Enum.map(fn %{id: id} -> "user:#{id}" end)
-
- Enum.each(recipient_topics, fn topic ->
- push_to_socket(topics, topic, item)
- end)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
- Logger.debug("Trying to push to #{topic}")
- Logger.debug("Pushing item to #{topic}")
- push_to_socket(topics, topic, item)
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
- topic = internal_topic(topic, socket)
- sockets_for_topic = sockets[topic] || []
- sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
- sockets = Map.put(sockets, topic, sockets_for_topic)
- Logger.debug("Got new conn for #{topic}")
- {:noreply, sockets}
- end
-
- def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
- topic = internal_topic(topic, socket)
- sockets_for_topic = sockets[topic] || []
- sockets_for_topic = List.delete(sockets_for_topic, socket)
- sockets = Map.put(sockets, topic, sockets_for_topic)
- Logger.debug("Removed conn for #{topic}")
- {:noreply, sockets}
- end
-
- def handle_cast(m, state) do
- Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
- {:noreply, state}
- end
-
- defp represent_update(%Activity{} = activity, %User{} = user) do
- %{
- event: "update",
- payload:
- Pleroma.Web.MastodonAPI.StatusView.render(
- "status.json",
- activity: activity,
- for: user
- )
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- defp represent_update(%Activity{} = activity) do
- %{
- event: "update",
- payload:
- Pleroma.Web.MastodonAPI.StatusView.render(
- "status.json",
- activity: activity
- )
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- def represent_conversation(%Participation{} = participation) do
- %{
- event: "conversation",
- payload:
- Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
- participation: participation,
- for: participation.user
- })
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- @spec represent_notification(User.t(), Notification.t()) :: binary()
- defp represent_notification(%User{} = user, %Notification{} = notify) do
- %{
- event: "notification",
- payload:
- NotificationView.render(
- "show.json",
- %{notification: notify, for: user}
- )
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- defp should_send?(%User{} = user, %Activity{} = item) do
- blocks = user.info.blocks || []
- mutes = user.info.mutes || []
- reblog_mutes = user.info.muted_reblogs || []
- domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
-
- with parent when not is_nil(parent) <- Object.normalize(item),
- true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
- true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
- %{host: item_host} <- URI.parse(item.actor),
- %{host: parent_host} <- URI.parse(parent.data["actor"]),
- false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
- false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
- true <- thread_containment(item, user),
- false <- CommonAPI.thread_muted?(user, item) do
- true
- else
- _ -> false
- end
- end
-
- defp should_send?(%User{} = user, %Notification{activity: activity}) do
- should_send?(user, activity)
- end
-
- def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
- Enum.each(topics[topic] || [], fn socket ->
- # Get the current user so we have up-to-date blocks etc.
- if socket.assigns[:user] do
- user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
-
- if should_send?(user, item) do
- send(socket.transport_pid, {:text, represent_update(item, user)})
- end
- else
- send(socket.transport_pid, {:text, represent_update(item)})
- end
- end)
- end
-
- def push_to_socket(topics, topic, %Participation{} = participation) do
- Enum.each(topics[topic] || [], fn socket ->
- send(socket.transport_pid, {:text, represent_conversation(participation)})
- end)
- end
-
- def push_to_socket(topics, topic, %Activity{
- data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
- }) do
- Enum.each(topics[topic] || [], fn socket ->
- send(
- socket.transport_pid,
- {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
- )
- end)
- end
-
- def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
-
- def push_to_socket(topics, topic, item) do
- Enum.each(topics[topic] || [], fn socket ->
- # Get the current user so we have up-to-date blocks etc.
- if socket.assigns[:user] do
- user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
- blocks = user.info.blocks || []
- mutes = user.info.mutes || []
-
- with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
- true <- thread_containment(item, user) do
- send(socket.transport_pid, {:text, represent_update(item, user)})
- end
- else
- send(socket.transport_pid, {:text, represent_update(item)})
- end
- end)
- end
-
- defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do
- "#{topic}:#{socket.assigns[:user].id}"
- end
-
- defp internal_topic(topic, _), do: topic
-
- @spec thread_containment(Activity.t(), User.t()) :: boolean()
- defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
-
- defp thread_containment(activity, user) do
- if Config.get([:instance, :skip_thread_containment]) do
- true
- else
- ActivityPub.contain_activity(activity, user)
- end
- end
-end
diff --git a/lib/pleroma/web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex
new file mode 100644
index 000000000..f77cbb95c
--- /dev/null
+++ b/lib/pleroma/web/streamer/ping.ex
@@ -0,0 +1,33 @@
+defmodule Pleroma.Web.Streamer.Ping do
+ use GenServer
+ require Logger
+
+ alias Pleroma.Web.Streamer.State
+ alias Pleroma.Web.Streamer.StreamerSocket
+
+ @keepalive_interval :timer.seconds(30)
+
+ def start_link(opts) do
+ ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval)
+ GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__)
+ end
+
+ def init(%{ping_interval: ping_interval} = args) do
+ Process.send_after(self(), :ping, ping_interval)
+ {:ok, args}
+ end
+
+ def handle_info(:ping, %{ping_interval: ping_interval} = state) do
+ State.get_sockets()
+ |> Map.values()
+ |> List.flatten()
+ |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} ->
+ Logger.debug("Sending keepalive ping")
+ send(transport_pid, {:text, ""})
+ end)
+
+ Process.send_after(self(), :ping, ping_interval)
+
+ {:noreply, state}
+ end
+end
diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex
new file mode 100644
index 000000000..7b5199068
--- /dev/null
+++ b/lib/pleroma/web/streamer/state.ex
@@ -0,0 +1,68 @@
+defmodule Pleroma.Web.Streamer.State do
+ use GenServer
+ require Logger
+
+ alias Pleroma.Web.Streamer.StreamerSocket
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
+ end
+
+ def add_socket(topic, socket) do
+ GenServer.call(__MODULE__, {:add, socket, topic})
+ end
+
+ def remove_socket(topic, socket) do
+ GenServer.call(__MODULE__, {:remove, socket, topic})
+ end
+
+ def get_sockets do
+ %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
+ stream_sockets
+ end
+
+ def init(init_arg) do
+ {:ok, init_arg}
+ end
+
+ def handle_call(:get_state, _from, state) do
+ {:reply, state, state}
+ end
+
+ def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do
+ internal_topic = internal_topic(topic, socket)
+ stream_socket = StreamerSocket.from_socket(socket)
+
+ sockets_for_topic =
+ sockets
+ |> Map.get(internal_topic, [])
+ |> List.insert_at(0, stream_socket)
+ |> Enum.uniq()
+
+ state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
+ Logger.debug("Got new conn for #{topic}")
+ {:reply, state, state}
+ end
+
+ def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do
+ internal_topic = internal_topic(topic, socket)
+ stream_socket = StreamerSocket.from_socket(socket)
+
+ sockets_for_topic =
+ sockets
+ |> Map.get(internal_topic, [])
+ |> List.delete(stream_socket)
+
+ state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic)
+ {:reply, state, state}
+ end
+
+ defp internal_topic(topic, socket)
+ when topic in ~w[user user:notification direct] do
+ "#{topic}:#{socket.assigns[:user].id}"
+ end
+
+ defp internal_topic(topic, _) do
+ topic
+ end
+end
diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex
new file mode 100644
index 000000000..8cf719277
--- /dev/null
+++ b/lib/pleroma/web/streamer/streamer.ex
@@ -0,0 +1,55 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.Streamer do
+ alias Pleroma.Web.Streamer.State
+ alias Pleroma.Web.Streamer.Worker
+
+ @timeout 60_000
+ @mix_env Mix.env()
+
+ def add_socket(topic, socket) do
+ State.add_socket(topic, socket)
+ end
+
+ def remove_socket(topic, socket) do
+ State.remove_socket(topic, socket)
+ end
+
+ def get_sockets do
+ State.get_sockets()
+ end
+
+ def stream(topics, items) do
+ if should_send?() do
+ Task.async(fn ->
+ :poolboy.transaction(
+ :streamer_worker,
+ &Worker.stream(&1, topics, items),
+ @timeout
+ )
+ end)
+ end
+ end
+
+ def supervisor, do: Pleroma.Web.Streamer.Supervisor
+
+ defp should_send? do
+ handle_should_send(@mix_env)
+ end
+
+ defp handle_should_send(:test) do
+ case Process.whereis(:streamer_worker) do
+ nil ->
+ false
+
+ pid ->
+ Process.alive?(pid)
+ end
+ end
+
+ defp handle_should_send(_) do
+ true
+ end
+end
diff --git a/lib/pleroma/web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex
new file mode 100644
index 000000000..f006c0306
--- /dev/null
+++ b/lib/pleroma/web/streamer/streamer_socket.ex
@@ -0,0 +1,31 @@
+defmodule Pleroma.Web.Streamer.StreamerSocket do
+ defstruct transport_pid: nil, user: nil
+
+ alias Pleroma.User
+ alias Pleroma.Web.Streamer.StreamerSocket
+
+ def from_socket(%{
+ transport_pid: transport_pid,
+ assigns: %{user: nil}
+ }) do
+ %StreamerSocket{
+ transport_pid: transport_pid
+ }
+ end
+
+ def from_socket(%{
+ transport_pid: transport_pid,
+ assigns: %{user: %User{} = user}
+ }) do
+ %StreamerSocket{
+ transport_pid: transport_pid,
+ user: user
+ }
+ end
+
+ def from_socket(%{transport_pid: transport_pid}) do
+ %StreamerSocket{
+ transport_pid: transport_pid
+ }
+ end
+end
diff --git a/lib/pleroma/web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex
new file mode 100644
index 000000000..6afe19323
--- /dev/null
+++ b/lib/pleroma/web/streamer/supervisor.ex
@@ -0,0 +1,33 @@
+defmodule Pleroma.Web.Streamer.Supervisor do
+ use Supervisor
+
+ def start_link(opts) do
+ Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
+ end
+
+ def init(args) do
+ children = [
+ {Pleroma.Web.Streamer.State, args},
+ {Pleroma.Web.Streamer.Ping, args},
+ :poolboy.child_spec(:streamer_worker, poolboy_config())
+ ]
+
+ opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor]
+ Supervisor.init(children, opts)
+ end
+
+ defp poolboy_config do
+ opts =
+ Pleroma.Config.get(:streamer,
+ workers: 3,
+ overflow_workers: 2
+ )
+
+ [
+ {:name, {:local, :streamer_worker}},
+ {:worker_module, Pleroma.Web.Streamer.Worker},
+ {:size, opts[:workers]},
+ {:max_overflow, opts[:overflow_workers]}
+ ]
+ end
+end
diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex
new file mode 100644
index 000000000..5804508eb
--- /dev/null
+++ b/lib/pleroma/web/streamer/worker.ex
@@ -0,0 +1,220 @@
+defmodule Pleroma.Web.Streamer.Worker do
+ use GenServer
+
+ require Logger
+
+ alias Pleroma.Activity
+ alias Pleroma.Config
+ alias Pleroma.Conversation.Participation
+ alias Pleroma.Notification
+ alias Pleroma.Object
+ alias Pleroma.User
+ alias Pleroma.Web.ActivityPub.ActivityPub
+ alias Pleroma.Web.ActivityPub.Visibility
+ alias Pleroma.Web.CommonAPI
+ alias Pleroma.Web.Streamer.State
+ alias Pleroma.Web.Streamer.StreamerSocket
+ alias Pleroma.Web.StreamerView
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, %{}, [])
+ end
+
+ def init(init_arg) do
+ {:ok, init_arg}
+ end
+
+ def stream(pid, topics, items) do
+ GenServer.call(pid, {:stream, topics, items})
+ end
+
+ def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
+ Enum.each(topics, fn t ->
+ do_stream(%{topic: t, item: item})
+ end)
+
+ {:reply, state, state}
+ end
+
+ def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
+ Enum.each(items, fn i ->
+ do_stream(%{topic: topic, item: i})
+ end)
+
+ {:reply, state, state}
+ end
+
+ def handle_call({:stream, topic, item}, _from, state) do
+ do_stream(%{topic: topic, item: item})
+
+ {:reply, state, state}
+ end
+
+ defp do_stream(%{topic: "direct", item: item}) do
+ recipient_topics =
+ User.get_recipients_from_activity(item)
+ |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
+
+ Enum.each(recipient_topics, fn user_topic ->
+ Logger.debug("Trying to push direct message to #{user_topic}\n\n")
+ push_to_socket(State.get_sockets(), user_topic, item)
+ end)
+ end
+
+ defp do_stream(%{topic: "participation", item: participation}) do
+ user_topic = "direct:#{participation.user_id}"
+ Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
+
+ push_to_socket(State.get_sockets(), user_topic, participation)
+ end
+
+ defp do_stream(%{topic: "list", item: item}) do
+ # filter the recipient list if the activity is not public, see #270.
+ recipient_lists =
+ case Visibility.is_public?(item) do
+ true ->
+ Pleroma.List.get_lists_from_activity(item)
+
+ _ ->
+ Pleroma.List.get_lists_from_activity(item)
+ |> Enum.filter(fn list ->
+ owner = User.get_cached_by_id(list.user_id)
+
+ Visibility.visible_for_user?(item, owner)
+ end)
+ end
+
+ recipient_topics =
+ recipient_lists
+ |> Enum.map(fn %{id: id} -> "list:#{id}" end)
+
+ Enum.each(recipient_topics, fn list_topic ->
+ Logger.debug("Trying to push message to #{list_topic}\n\n")
+ push_to_socket(State.get_sockets(), list_topic, item)
+ end)
+ end
+
+ defp do_stream(%{topic: topic, item: %Notification{} = item})
+ when topic in ["user", "user:notification"] do
+ State.get_sockets()
+ |> Map.get("#{topic}:#{item.user_id}", [])
+ |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
+ with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
+ true <- should_send?(user, item) do
+ send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
+ end
+ end)
+ end
+
+ defp do_stream(%{topic: "user", item: item}) do
+ Logger.debug("Trying to push to users")
+
+ recipient_topics =
+ User.get_recipients_from_activity(item)
+ |> Enum.map(fn %{id: id} -> "user:#{id}" end)
+
+ Enum.each(recipient_topics, fn topic ->
+ push_to_socket(State.get_sockets(), topic, item)
+ end)
+ end
+
+ defp do_stream(%{topic: topic, item: item}) do
+ Logger.debug("Trying to push to #{topic}")
+ Logger.debug("Pushing item to #{topic}")
+ push_to_socket(State.get_sockets(), topic, item)
+ end
+
+ defp should_send?(%User{} = user, %Activity{} = item) do
+ blocks = user.info.blocks || []
+ mutes = user.info.mutes || []
+ reblog_mutes = user.info.muted_reblogs || []
+ domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
+
+ with parent when not is_nil(parent) <- Object.normalize(item),
+ true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
+ true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
+ %{host: item_host} <- URI.parse(item.actor),
+ %{host: parent_host} <- URI.parse(parent.data["actor"]),
+ false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
+ false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
+ true <- thread_containment(item, user),
+ false <- CommonAPI.thread_muted?(user, item) do
+ true
+ else
+ _ -> false
+ end
+ end
+
+ defp should_send?(%User{} = user, %Notification{activity: activity}) do
+ should_send?(user, activity)
+ end
+
+ def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
+ Enum.each(topics[topic] || [], fn %StreamerSocket{
+ transport_pid: transport_pid,
+ user: socket_user
+ } ->
+ # Get the current user so we have up-to-date blocks etc.
+ if socket_user do
+ user = User.get_cached_by_ap_id(socket_user.ap_id)
+
+ if should_send?(user, item) do
+ send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
+ end
+ else
+ send(transport_pid, {:text, StreamerView.render("update.json", item)})
+ end
+ end)
+ end
+
+ def push_to_socket(topics, topic, %Participation{} = participation) do
+ Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
+ send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
+ end)
+ end
+
+ def push_to_socket(topics, topic, %Activity{
+ data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
+ }) do
+ Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
+ send(
+ transport_pid,
+ {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
+ )
+ end)
+ end
+
+ def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
+
+ def push_to_socket(topics, topic, item) do
+ Enum.each(topics[topic] || [], fn %StreamerSocket{
+ transport_pid: transport_pid,
+ user: socket_user
+ } ->
+ # Get the current user so we have up-to-date blocks etc.
+ if socket_user do
+ user = User.get_cached_by_ap_id(socket_user.ap_id)
+ blocks = user.info.blocks || []
+ mutes = user.info.mutes || []
+
+ with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
+ true <- thread_containment(item, user) do
+ send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
+ end
+ else
+ send(transport_pid, {:text, StreamerView.render("update.json", item)})
+ end
+ end)
+ end
+
+ @spec thread_containment(Activity.t(), User.t()) :: boolean()
+ defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
+
+ defp thread_containment(activity, user) do
+ if Config.get([:instance, :skip_thread_containment]) do
+ true
+ else
+ ActivityPub.contain_activity(activity, user)
+ end
+ end
+end
diff --git a/lib/pleroma/web/twitter_api/controllers/util_controller.ex b/lib/pleroma/web/twitter_api/controllers/util_controller.ex
index 3405bd3b7..d7745ae7a 100644
--- a/lib/pleroma/web/twitter_api/controllers/util_controller.ex
+++ b/lib/pleroma/web/twitter_api/controllers/util_controller.ex
@@ -265,12 +265,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
String.split(line, ",") |> List.first()
end)
|> List.delete("Account address") do
- PleromaJobQueue.enqueue(:background, User, [
- :follow_import,
- follower,
- followed_identifiers
- ])
-
+ User.follow_import(follower, followed_identifiers)
json(conn, "job started")
end
end
@@ -281,12 +276,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do
with blocked_identifiers <- String.split(list) do
- PleromaJobQueue.enqueue(:background, User, [
- :blocks_import,
- blocker,
- blocked_identifiers
- ])
-
+ User.blocks_import(blocker, blocked_identifiers)
json(conn, "job started")
end
end
@@ -314,6 +304,25 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
end
end
+ def change_email(%{assigns: %{user: user}} = conn, params) do
+ case CommonAPI.Utils.confirm_current_password(user, params["password"]) do
+ {:ok, user} ->
+ with {:ok, _user} <- User.change_email(user, params["email"]) do
+ json(conn, %{status: "success"})
+ else
+ {:error, changeset} ->
+ {_, {error, _}} = Enum.at(changeset.errors, 0)
+ json(conn, %{error: "Email #{error}."})
+
+ _ ->
+ json(conn, %{error: "Unable to change email."})
+ end
+
+ {:error, msg} ->
+ json(conn, %{error: msg})
+ end
+ end
+
def delete_account(%{assigns: %{user: user}} = conn, params) do
case CommonAPI.Utils.confirm_current_password(user, params["password"]) do
{:ok, user} ->
diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex
new file mode 100644
index 000000000..b13030fa0
--- /dev/null
+++ b/lib/pleroma/web/views/streamer_view.ex
@@ -0,0 +1,66 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.StreamerView do
+ use Pleroma.Web, :view
+
+ alias Pleroma.Activity
+ alias Pleroma.Conversation.Participation
+ alias Pleroma.Notification
+ alias Pleroma.User
+ alias Pleroma.Web.MastodonAPI.NotificationView
+
+ def render("update.json", %Activity{} = activity, %User{} = user) do
+ %{
+ event: "update",
+ payload:
+ Pleroma.Web.MastodonAPI.StatusView.render(
+ "status.json",
+ activity: activity,
+ for: user
+ )
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
+ def render("notification.json", %User{} = user, %Notification{} = notify) do
+ %{
+ event: "notification",
+ payload:
+ NotificationView.render(
+ "show.json",
+ %{notification: notify, for: user}
+ )
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
+ def render("update.json", %Activity{} = activity) do
+ %{
+ event: "update",
+ payload:
+ Pleroma.Web.MastodonAPI.StatusView.render(
+ "status.json",
+ activity: activity
+ )
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
+ def render("conversation.json", %Participation{} = participation) do
+ %{
+ event: "conversation",
+ payload:
+ Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
+ participation: participation,
+ for: participation.user
+ })
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+end
diff --git a/lib/pleroma/web/web.ex b/lib/pleroma/web/web.ex
index bfb6c7287..687346554 100644
--- a/lib/pleroma/web/web.ex
+++ b/lib/pleroma/web/web.ex
@@ -66,23 +66,9 @@ defmodule Pleroma.Web do
end
@doc """
- Same as `render_many/4` but wrapped in rescue block and parallelized (unless disabled by passing false as a fifth argument).
+ Same as `render_many/4` but wrapped in rescue block.
"""
- def safe_render_many(collection, view, template, assigns \\ %{}, parallel \\ true)
-
- def safe_render_many(collection, view, template, assigns, true) do
- Enum.map(collection, fn resource ->
- Task.async(fn ->
- as = Map.get(assigns, :as) || view.__resource__
- assigns = Map.put(assigns, as, resource)
- safe_render(view, template, assigns)
- end)
- end)
- |> Enum.map(&Task.await(&1, :infinity))
- |> Enum.filter(& &1)
- end
-
- def safe_render_many(collection, view, template, assigns, false) do
+ def safe_render_many(collection, view, template, assigns \\ %{}) do
Enum.map(collection, fn resource ->
as = Map.get(assigns, :as) || view.__resource__
assigns = Map.put(assigns, as, resource)
diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex
new file mode 100644
index 000000000..4e3e4195f
--- /dev/null
+++ b/lib/pleroma/workers/activity_expiration_worker.ex
@@ -0,0 +1,18 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ActivityExpirationWorker do
+ use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
+
+ @impl Oban.Worker
+ def perform(
+ %{
+ "op" => "activity_expiration",
+ "activity_expiration_id" => activity_expiration_id
+ },
+ _job
+ ) do
+ Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, activity_expiration_id)
+ end
+end
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
new file mode 100644
index 000000000..082f20ab7
--- /dev/null
+++ b/lib/pleroma/workers/background_worker.ex
@@ -0,0 +1,69 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.BackgroundWorker do
+ alias Pleroma.Activity
+ alias Pleroma.User
+ alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
+ alias Pleroma.Web.OAuth.Token.CleanWorker
+
+ use Pleroma.Workers.WorkerHelper, queue: "background"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
+ user = User.get_cached_by_id(user_id)
+ User.perform(:fetch_initial_posts, user)
+ end
+
+ def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do
+ user = User.get_cached_by_id(user_id)
+ User.perform(:deactivate_async, user, status)
+ end
+
+ def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do
+ user = User.get_cached_by_id(user_id)
+ User.perform(:delete, user)
+ end
+
+ def perform(
+ %{
+ "op" => "blocks_import",
+ "blocker_id" => blocker_id,
+ "blocked_identifiers" => blocked_identifiers
+ },
+ _job
+ ) do
+ blocker = User.get_cached_by_id(blocker_id)
+ User.perform(:blocks_import, blocker, blocked_identifiers)
+ end
+
+ def perform(
+ %{
+ "op" => "follow_import",
+ "follower_id" => follower_id,
+ "followed_identifiers" => followed_identifiers
+ },
+ _job
+ ) do
+ follower = User.get_cached_by_id(follower_id)
+ User.perform(:follow_import, follower, followed_identifiers)
+ end
+
+ def perform(%{"op" => "clean_expired_tokens"}, _job) do
+ CleanWorker.perform(:clean)
+ end
+
+ def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do
+ MediaProxyWarmingPolicy.perform(:preload, message)
+ end
+
+ def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do
+ MediaProxyWarmingPolicy.perform(:prefetch, url)
+ end
+
+ def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do
+ activity = Activity.get_by_id(activity_id)
+ Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
+ end
+end
diff --git a/lib/pleroma/workers/digest_emails_worker.ex b/lib/pleroma/workers/digest_emails_worker.ex
new file mode 100644
index 000000000..3e5a836d0
--- /dev/null
+++ b/lib/pleroma/workers/digest_emails_worker.ex
@@ -0,0 +1,16 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.DigestEmailsWorker do
+ alias Pleroma.User
+
+ use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
+ user_id
+ |> User.get_cached_by_id()
+ |> Pleroma.Daemons.DigestEmailDaemon.perform()
+ end
+end
diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex
new file mode 100644
index 000000000..1b7a0eb3e
--- /dev/null
+++ b/lib/pleroma/workers/mailer_worker.ex
@@ -0,0 +1,15 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.MailerWorker do
+ use Pleroma.Workers.WorkerHelper, queue: "mailer"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
+ encoded_email
+ |> Base.decode64!()
+ |> :erlang.binary_to_term()
+ |> Pleroma.Emails.Mailer.deliver(config)
+ end
+end
diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex
new file mode 100644
index 000000000..455f7fc7e
--- /dev/null
+++ b/lib/pleroma/workers/publisher_worker.ex
@@ -0,0 +1,25 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PublisherWorker do
+ alias Pleroma.Activity
+ alias Pleroma.Web.Federator
+
+ use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
+ def backoff(attempt) when is_integer(attempt) do
+ Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
+ end
+
+ @impl Oban.Worker
+ def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do
+ activity = Activity.get_by_id(activity_id)
+ Federator.perform(:publish, activity)
+ end
+
+ def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do
+ params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end)
+ Federator.perform(:publish_one, String.to_atom(module_name), params)
+ end
+end
diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex
new file mode 100644
index 000000000..83d528a66
--- /dev/null
+++ b/lib/pleroma/workers/receiver_worker.ex
@@ -0,0 +1,18 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ReceiverWorker do
+ alias Pleroma.Web.Federator
+
+ use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do
+ Federator.perform(:incoming_doc, doc)
+ end
+
+ def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do
+ Federator.perform(:incoming_ap_doc, params)
+ end
+end
diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex
new file mode 100644
index 000000000..ca7d53af1
--- /dev/null
+++ b/lib/pleroma/workers/scheduled_activity_worker.ex
@@ -0,0 +1,12 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ScheduledActivityWorker do
+ use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
+ Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, activity_id)
+ end
+end
diff --git a/lib/pleroma/workers/subscriber_worker.ex b/lib/pleroma/workers/subscriber_worker.ex
new file mode 100644
index 000000000..fc490e300
--- /dev/null
+++ b/lib/pleroma/workers/subscriber_worker.ex
@@ -0,0 +1,26 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.SubscriberWorker do
+ alias Pleroma.Repo
+ alias Pleroma.Web.Federator
+ alias Pleroma.Web.Websub
+
+ use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "refresh_subscriptions"}, _job) do
+ Federator.perform(:refresh_subscriptions)
+ end
+
+ def perform(%{"op" => "request_subscription", "websub_id" => websub_id}, _job) do
+ websub = Repo.get(Websub.WebsubClientSubscription, websub_id)
+ Federator.perform(:request_subscription, websub)
+ end
+
+ def perform(%{"op" => "verify_websub", "websub_id" => websub_id}, _job) do
+ websub = Repo.get(Websub.WebsubServerSubscription, websub_id)
+ Federator.perform(:verify_websub, websub)
+ end
+end
diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex
new file mode 100644
index 000000000..b581a2f86
--- /dev/null
+++ b/lib/pleroma/workers/transmogrifier_worker.ex
@@ -0,0 +1,15 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.TransmogrifierWorker do
+ alias Pleroma.User
+
+ use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
+ user = User.get_cached_by_id(user_id)
+ Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
+ end
+end
diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex
new file mode 100644
index 000000000..bea2baffb
--- /dev/null
+++ b/lib/pleroma/workers/web_pusher_worker.ex
@@ -0,0 +1,16 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.WebPusherWorker do
+ alias Pleroma.Notification
+ alias Pleroma.Repo
+
+ use Pleroma.Workers.WorkerHelper, queue: "web_push"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
+ notification = Repo.get(Notification, notification_id)
+ Pleroma.Web.Push.Impl.perform(notification)
+ end
+end
diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex
new file mode 100644
index 000000000..358efa14a
--- /dev/null
+++ b/lib/pleroma/workers/worker_helper.ex
@@ -0,0 +1,46 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.WorkerHelper do
+ alias Pleroma.Config
+ alias Pleroma.Workers.WorkerHelper
+
+ def worker_args(queue) do
+ case Config.get([:workers, :retries, queue]) do
+ nil -> []
+ max_attempts -> [max_attempts: max_attempts]
+ end
+ end
+
+ def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
+ backoff =
+ :math.pow(attempt, pow) +
+ base_backoff +
+ :rand.uniform(2 * base_backoff) * attempt
+
+ trunc(backoff)
+ end
+
+ defmacro __using__(opts) do
+ caller_module = __CALLER__.module
+ queue = Keyword.fetch!(opts, :queue)
+
+ quote do
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
+ use Oban.Worker,
+ queue: unquote(queue),
+ max_attempts: 1
+
+ def enqueue(op, params, worker_args \\ []) do
+ params = Map.merge(%{"op" => op}, params)
+ queue_atom = String.to_atom(unquote(queue))
+ worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
+
+ unquote(caller_module)
+ |> apply(:new, [params, worker_args])
+ |> Pleroma.Repo.insert()
+ end
+ end
+ end
+end