aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/mix/tasks/pleroma/benchmark.ex36
-rw-r--r--lib/mix/tasks/pleroma/docs.ex42
-rw-r--r--lib/pleroma/activity.ex215
-rw-r--r--lib/pleroma/activity/ir/topics.ex63
-rw-r--r--lib/pleroma/activity/queries.ex32
-rw-r--r--lib/pleroma/application.ex29
-rw-r--r--lib/pleroma/daemons/activity_expiration_daemon.ex (renamed from lib/pleroma/activity_expiration_worker.ex)8
-rw-r--r--lib/pleroma/daemons/digest_email_daemon.ex (renamed from lib/pleroma/digest_email_worker.ex)13
-rw-r--r--lib/pleroma/daemons/scheduled_activity_daemon.ex (renamed from lib/pleroma/scheduled_activity_worker.ex)8
-rw-r--r--lib/pleroma/delivery.ex51
-rw-r--r--lib/pleroma/docs/generator.ex73
-rw-r--r--lib/pleroma/docs/json.ex20
-rw-r--r--lib/pleroma/docs/markdown.ex78
-rw-r--r--lib/pleroma/emails/mailer.ex8
-rw-r--r--lib/pleroma/healthcheck.ex (renamed from lib/healthcheck.ex)1
-rw-r--r--lib/pleroma/instances/instance.ex8
-rw-r--r--lib/pleroma/notification.ex6
-rw-r--r--lib/pleroma/object.ex6
-rw-r--r--lib/pleroma/plugs/cache.ex136
-rw-r--r--lib/pleroma/plugs/http_signature.ex3
-rw-r--r--lib/pleroma/plugs/trailing_format_plug.ex41
-rw-r--r--lib/pleroma/scheduler.ex7
-rw-r--r--lib/pleroma/user.ex100
-rw-r--r--lib/pleroma/user/info.ex17
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub.ex55
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub_controller.ex102
-rw-r--r--lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex5
-rw-r--r--lib/pleroma/web/activity_pub/publisher.ex31
-rw-r--r--lib/pleroma/web/activity_pub/transmogrifier.ex7
-rw-r--r--lib/pleroma/web/activity_pub/utils.ex176
-rw-r--r--lib/pleroma/web/admin_api/config.ex15
-rw-r--r--lib/pleroma/web/controller_helper.ex95
-rw-r--r--lib/pleroma/web/endpoint.ex2
-rw-r--r--lib/pleroma/web/federator/federator.ex90
-rw-r--r--lib/pleroma/web/federator/publisher.ex24
-rw-r--r--lib/pleroma/web/federator/retry_queue.ex239
-rw-r--r--lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex50
-rw-r--r--lib/pleroma/web/mastodon_api/views/notification_view.ex2
-rw-r--r--lib/pleroma/web/mastodon_api/views/status_view.ex39
-rw-r--r--lib/pleroma/web/mastodon_api/websocket_handler.ex7
-rw-r--r--lib/pleroma/web/oauth/token/clean_worker.ex5
-rw-r--r--lib/pleroma/web/pleroma_api/pleroma_api_controller.ex27
-rw-r--r--lib/pleroma/web/push/push.ex7
-rw-r--r--lib/pleroma/web/rich_media/parser.ex6
-rw-r--r--lib/pleroma/web/router.ex104
-rw-r--r--lib/pleroma/web/salmon/salmon.ex11
-rw-r--r--lib/pleroma/web/streamer.ex318
-rw-r--r--lib/pleroma/web/streamer/ping.ex33
-rw-r--r--lib/pleroma/web/streamer/state.ex68
-rw-r--r--lib/pleroma/web/streamer/streamer.ex55
-rw-r--r--lib/pleroma/web/streamer/streamer_socket.ex31
-rw-r--r--lib/pleroma/web/streamer/supervisor.ex33
-rw-r--r--lib/pleroma/web/streamer/worker.ex220
-rw-r--r--lib/pleroma/web/twitter_api/controllers/util_controller.ex33
-rw-r--r--lib/pleroma/web/twitter_api/representers/base_representer.ex38
-rw-r--r--lib/pleroma/web/twitter_api/representers/object_representer.ex39
-rw-r--r--lib/pleroma/web/twitter_api/twitter_api.ex195
-rw-r--r--lib/pleroma/web/twitter_api/twitter_api_controller.ex766
-rw-r--r--lib/pleroma/web/twitter_api/views/activity_view.ex366
-rw-r--r--lib/pleroma/web/twitter_api/views/notification_view.ex71
-rw-r--r--lib/pleroma/web/twitter_api/views/user_view.ex191
-rw-r--r--lib/pleroma/web/views/streamer_view.ex66
-rw-r--r--lib/pleroma/web/web.ex18
-rw-r--r--lib/pleroma/workers/activity_expiration_worker.ex18
-rw-r--r--lib/pleroma/workers/background_worker.ex69
-rw-r--r--lib/pleroma/workers/digest_emails_worker.ex16
-rw-r--r--lib/pleroma/workers/mailer_worker.ex15
-rw-r--r--lib/pleroma/workers/publisher_worker.ex25
-rw-r--r--lib/pleroma/workers/receiver_worker.ex18
-rw-r--r--lib/pleroma/workers/scheduled_activity_worker.ex12
-rw-r--r--lib/pleroma/workers/subscriber_worker.ex26
-rw-r--r--lib/pleroma/workers/transmogrifier_worker.ex15
-rw-r--r--lib/pleroma/workers/web_pusher_worker.ex16
-rw-r--r--lib/pleroma/workers/worker_helper.ex46
74 files changed, 1963 insertions, 2954 deletions
diff --git a/lib/mix/tasks/pleroma/benchmark.ex b/lib/mix/tasks/pleroma/benchmark.ex
index 4cc634727..84dccf7f3 100644
--- a/lib/mix/tasks/pleroma/benchmark.ex
+++ b/lib/mix/tasks/pleroma/benchmark.ex
@@ -27,7 +27,7 @@ defmodule Mix.Tasks.Pleroma.Benchmark do
})
end
- def run(["render_timeline", nickname]) do
+ def run(["render_timeline", nickname | _] = args) do
start_pleroma()
user = Pleroma.User.get_by_nickname(nickname)
@@ -37,33 +37,37 @@ defmodule Mix.Tasks.Pleroma.Benchmark do
|> Map.put("blocking_user", user)
|> Map.put("muting_user", user)
|> Map.put("user", user)
- |> Map.put("limit", 80)
+ |> Map.put("limit", 4096)
|> Pleroma.Web.ActivityPub.ActivityPub.fetch_public_activities()
|> Enum.reverse()
inputs = %{
- "One activity" => Enum.take_random(activities, 1),
- "Ten activities" => Enum.take_random(activities, 10),
- "Twenty activities" => Enum.take_random(activities, 20),
- "Forty activities" => Enum.take_random(activities, 40),
- "Eighty activities" => Enum.take_random(activities, 80)
+ "1 activity" => Enum.take_random(activities, 1),
+ "10 activities" => Enum.take_random(activities, 10),
+ "20 activities" => Enum.take_random(activities, 20),
+ "40 activities" => Enum.take_random(activities, 40),
+ "80 activities" => Enum.take_random(activities, 80)
}
+ inputs =
+ if Enum.at(args, 2) == "extended" do
+ Map.merge(inputs, %{
+ "200 activities" => Enum.take_random(activities, 200),
+ "500 activities" => Enum.take_random(activities, 500),
+ "2000 activities" => Enum.take_random(activities, 2000),
+ "4096 activities" => Enum.take_random(activities, 4096)
+ })
+ else
+ inputs
+ end
+
Benchee.run(
%{
- "Parallel rendering" => fn activities ->
- Pleroma.Web.MastodonAPI.StatusView.render("index.json", %{
- activities: activities,
- for: user,
- as: :activity
- })
- end,
"Standart rendering" => fn activities ->
Pleroma.Web.MastodonAPI.StatusView.render("index.json", %{
activities: activities,
for: user,
- as: :activity,
- parallel: false
+ as: :activity
})
end
},
diff --git a/lib/mix/tasks/pleroma/docs.ex b/lib/mix/tasks/pleroma/docs.ex
new file mode 100644
index 000000000..0d2663648
--- /dev/null
+++ b/lib/mix/tasks/pleroma/docs.ex
@@ -0,0 +1,42 @@
+defmodule Mix.Tasks.Pleroma.Docs do
+ use Mix.Task
+ import Mix.Pleroma
+
+ @shortdoc "Generates docs from descriptions.exs"
+ @moduledoc """
+ Generates docs from `descriptions.exs`.
+
+ Supports two formats: `markdown` and `json`.
+
+ ## Generate Markdown docs
+
+ `mix pleroma.docs`
+
+ ## Generate JSON docs
+
+ `mix pleroma.docs json`
+ """
+
+ def run(["json"]) do
+ do_run(Pleroma.Docs.JSON)
+ end
+
+ def run(_) do
+ do_run(Pleroma.Docs.Markdown)
+ end
+
+ defp do_run(implementation) do
+ start_pleroma()
+
+ with {descriptions, _paths} <- Mix.Config.eval!("config/description.exs"),
+ {:ok, file_path} <-
+ Pleroma.Docs.Generator.process(
+ implementation,
+ descriptions[:pleroma][:config_description]
+ ) do
+ type = if implementation == Pleroma.Docs.Markdown, do: "Markdown", else: "JSON"
+
+ Mix.shell().info([:green, "#{type} docs successfully generated to #{file_path}."])
+ end
+ end
+end
diff --git a/lib/pleroma/activity.ex b/lib/pleroma/activity.ex
index 2d4e9da0c..ec558168a 100644
--- a/lib/pleroma/activity.ex
+++ b/lib/pleroma/activity.ex
@@ -6,6 +6,7 @@ defmodule Pleroma.Activity do
use Ecto.Schema
alias Pleroma.Activity
+ alias Pleroma.Activity.Queries
alias Pleroma.ActivityExpiration
alias Pleroma.Bookmark
alias Pleroma.Notification
@@ -65,8 +66,8 @@ defmodule Pleroma.Activity do
timestamps()
end
- def with_joined_object(query) do
- join(query, :inner, [activity], o in Object,
+ def with_joined_object(query, join_type \\ :inner) do
+ join(query, join_type, [activity], o in Object,
on:
fragment(
"(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
@@ -78,10 +79,10 @@ defmodule Pleroma.Activity do
)
end
- def with_preloaded_object(query) do
+ def with_preloaded_object(query, join_type \\ :inner) do
query
|> has_named_binding?(:object)
- |> if(do: query, else: with_joined_object(query))
+ |> if(do: query, else: with_joined_object(query, join_type))
|> preload([activity, object: object], object: object)
end
@@ -107,12 +108,9 @@ defmodule Pleroma.Activity do
def with_set_thread_muted_field(query, _), do: query
def get_by_ap_id(ap_id) do
- Repo.one(
- from(
- activity in Activity,
- where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id))
- )
- )
+ ap_id
+ |> Queries.by_ap_id()
+ |> Repo.one()
end
def get_bookmark(%Activity{} = activity, %User{} = user) do
@@ -133,21 +131,10 @@ defmodule Pleroma.Activity do
end
def get_by_ap_id_with_object(ap_id) do
- Repo.one(
- from(
- activity in Activity,
- where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id)),
- left_join: o in Object,
- on:
- fragment(
- "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
- o.data,
- activity.data,
- activity.data
- ),
- preload: [object: o]
- )
- )
+ ap_id
+ |> Queries.by_ap_id()
+ |> with_preloaded_object(:left)
+ |> Repo.one()
end
def get_by_id(id) do
@@ -158,66 +145,34 @@ defmodule Pleroma.Activity do
end
def get_by_id_with_object(id) do
- from(activity in Activity,
- where: activity.id == ^id,
- inner_join: o in Object,
- on:
- fragment(
- "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
- o.data,
- activity.data,
- activity.data
- ),
- preload: [object: o]
- )
+ Activity
+ |> where(id: ^id)
+ |> with_preloaded_object()
|> Repo.one()
end
- def by_object_ap_id(ap_id) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^to_string(ap_id)
- )
- )
- end
-
- def create_by_object_ap_id(ap_ids) when is_list(ap_ids) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
- activity.data,
- activity.data,
- ^ap_ids
- ),
- where: fragment("(?)->>'type' = 'Create'", activity.data)
- )
+ def all_by_ids_with_object(ids) do
+ Activity
+ |> where([a], a.id in ^ids)
+ |> with_preloaded_object()
+ |> Repo.all()
end
- def create_by_object_ap_id(ap_id) when is_binary(ap_id) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^to_string(ap_id)
- ),
- where: fragment("(?)->>'type' = 'Create'", activity.data)
- )
+ @doc """
+ Accepts `ap_id` or list of `ap_id`.
+ Returns a query.
+ """
+ @spec create_by_object_ap_id(String.t() | [String.t()]) :: Ecto.Queryable.t()
+ def create_by_object_ap_id(ap_id) do
+ ap_id
+ |> Queries.by_object_id()
+ |> Queries.by_type("Create")
end
- def create_by_object_ap_id(_), do: nil
-
def get_all_create_by_object_ap_id(ap_id) do
- Repo.all(create_by_object_ap_id(ap_id))
+ ap_id
+ |> create_by_object_ap_id()
+ |> Repo.all()
end
def get_create_by_object_ap_id(ap_id) when is_binary(ap_id) do
@@ -228,54 +183,17 @@ defmodule Pleroma.Activity do
def get_create_by_object_ap_id(_), do: nil
- def create_by_object_ap_id_with_object(ap_ids) when is_list(ap_ids) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
- activity.data,
- activity.data,
- ^ap_ids
- ),
- where: fragment("(?)->>'type' = 'Create'", activity.data),
- inner_join: o in Object,
- on:
- fragment(
- "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
- o.data,
- activity.data,
- activity.data
- ),
- preload: [object: o]
- )
- end
-
- def create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
- from(
- activity in Activity,
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^to_string(ap_id)
- ),
- where: fragment("(?)->>'type' = 'Create'", activity.data),
- inner_join: o in Object,
- on:
- fragment(
- "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
- o.data,
- activity.data,
- activity.data
- ),
- preload: [object: o]
- )
+ @doc """
+ Accepts `ap_id` or list of `ap_id`.
+ Returns a query.
+ """
+ @spec create_by_object_ap_id_with_object(String.t() | [String.t()]) :: Ecto.Queryable.t()
+ def create_by_object_ap_id_with_object(ap_id) do
+ ap_id
+ |> create_by_object_ap_id()
+ |> with_preloaded_object()
end
- def create_by_object_ap_id_with_object(_), do: nil
-
def get_create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
ap_id
|> create_by_object_ap_id_with_object()
@@ -299,7 +217,8 @@ defmodule Pleroma.Activity do
def normalize(_), do: nil
def delete_by_ap_id(id) when is_binary(id) do
- by_object_ap_id(id)
+ id
+ |> Queries.by_object_id()
|> select([u], u)
|> Repo.delete_all()
|> elem(1)
@@ -308,10 +227,19 @@ defmodule Pleroma.Activity do
%{data: %{"type" => "Create", "object" => %{"id" => ap_id}}} -> ap_id == id
_ -> nil
end)
+ |> purge_web_resp_cache()
end
def delete_by_ap_id(_), do: nil
+ defp purge_web_resp_cache(%Activity{} = activity) do
+ %{path: path} = URI.parse(activity.data["id"])
+ Cachex.del(:web_resp_cache, path)
+ activity
+ end
+
+ defp purge_web_resp_cache(nil), do: nil
+
for {ap_type, type} <- @mastodon_notification_types do
def mastodon_notification_type(%Activity{data: %{"type" => unquote(ap_type)}}),
do: unquote(type)
@@ -334,40 +262,19 @@ defmodule Pleroma.Activity do
end
def follow_requests_for_actor(%Pleroma.User{ap_id: ap_id}) do
- from(
- a in Activity,
- where:
- fragment(
- "? ->> 'type' = 'Follow'",
- a.data
- ),
- where:
- fragment(
- "? ->> 'state' = 'pending'",
- a.data
- ),
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- a.data,
- a.data,
- ^ap_id
- )
- )
- end
-
- @spec query_by_actor(actor()) :: Ecto.Query.t()
- def query_by_actor(actor) do
- from(a in Activity, where: a.actor == ^actor)
+ ap_id
+ |> Queries.by_object_id()
+ |> Queries.by_type("Follow")
+ |> where([a], fragment("? ->> 'state' = 'pending'", a.data))
end
def restrict_deactivated_users(query) do
+ deactivated_users =
+ from(u in User.Query.build(deactivated: true), select: u.ap_id)
+ |> Repo.all()
+
from(activity in query,
- where:
- fragment(
- "? not in (SELECT ap_id FROM users WHERE info->'deactivated' @> 'true')",
- activity.actor
- )
+ where: activity.actor not in ^deactivated_users
)
end
diff --git a/lib/pleroma/activity/ir/topics.ex b/lib/pleroma/activity/ir/topics.ex
new file mode 100644
index 000000000..010897abc
--- /dev/null
+++ b/lib/pleroma/activity/ir/topics.ex
@@ -0,0 +1,63 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Activity.Ir.Topics do
+ alias Pleroma.Object
+ alias Pleroma.Web.ActivityPub.Visibility
+
+ def get_activity_topics(activity) do
+ activity
+ |> Object.normalize()
+ |> generate_topics(activity)
+ |> List.flatten()
+ end
+
+ defp generate_topics(%{data: %{"type" => "Answer"}}, _) do
+ []
+ end
+
+ defp generate_topics(object, activity) do
+ ["user", "list"] ++ visibility_tags(object, activity)
+ end
+
+ defp visibility_tags(object, activity) do
+ case Visibility.get_visibility(activity) do
+ "public" ->
+ if activity.local do
+ ["public", "public:local"]
+ else
+ ["public"]
+ end
+ |> item_creation_tags(object, activity)
+
+ "direct" ->
+ ["direct"]
+
+ _ ->
+ []
+ end
+ end
+
+ defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do
+ tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity)
+ end
+
+ defp item_creation_tags(tags, _, _) do
+ tags
+ end
+
+ defp hashtags_to_topics(%{data: %{"tag" => tags}}) do
+ tags
+ |> Enum.filter(&is_bitstring(&1))
+ |> Enum.map(fn tag -> "hashtag:" <> tag end)
+ end
+
+ defp hashtags_to_topics(_), do: []
+
+ defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: []
+
+ defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"]
+
+ defp attachment_topics(_object, _act), do: ["public:media"]
+end
diff --git a/lib/pleroma/activity/queries.ex b/lib/pleroma/activity/queries.ex
index aa5b29566..13fa33831 100644
--- a/lib/pleroma/activity/queries.ex
+++ b/lib/pleroma/activity/queries.ex
@@ -13,6 +13,14 @@ defmodule Pleroma.Activity.Queries do
alias Pleroma.Activity
+ @spec by_ap_id(query, String.t()) :: query
+ def by_ap_id(query \\ Activity, ap_id) do
+ from(
+ activity in query,
+ where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id))
+ )
+ end
+
@spec by_actor(query, String.t()) :: query
def by_actor(query \\ Activity, actor) do
from(
@@ -21,8 +29,23 @@ defmodule Pleroma.Activity.Queries do
)
end
- @spec by_object_id(query, String.t()) :: query
- def by_object_id(query \\ Activity, object_id) do
+ @spec by_object_id(query, String.t() | [String.t()]) :: query
+ def by_object_id(query \\ Activity, object_id)
+
+ def by_object_id(query, object_ids) when is_list(object_ids) do
+ from(
+ activity in query,
+ where:
+ fragment(
+ "coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
+ activity.data,
+ activity.data,
+ ^object_ids
+ )
+ )
+ end
+
+ def by_object_id(query, object_id) when is_binary(object_id) do
from(activity in query,
where:
fragment(
@@ -41,9 +64,4 @@ defmodule Pleroma.Activity.Queries do
where: fragment("(?)->>'type' = ?", activity.data, ^activity_type)
)
end
-
- @spec limit(query, pos_integer()) :: query
- def limit(query \\ Activity, limit) do
- from(activity in query, limit: ^limit)
- end
end
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 483ac1f39..3b37ce630 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -31,18 +31,19 @@ defmodule Pleroma.Application do
children =
[
Pleroma.Repo,
+ Pleroma.Scheduler,
Pleroma.Config.TransferTask,
Pleroma.Emoji,
Pleroma.Captcha,
Pleroma.FlakeId,
- Pleroma.ScheduledActivityWorker,
- Pleroma.ActivityExpirationWorker
+ Pleroma.Daemons.ScheduledActivityDaemon,
+ Pleroma.Daemons.ActivityExpirationDaemon
] ++
cachex_children() ++
hackney_pool_children() ++
[
- Pleroma.Web.Federator.RetryQueue,
Pleroma.Stats,
+ {Oban, Pleroma.Config.get(Oban)},
%{
id: :web_push_init,
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
@@ -70,9 +71,7 @@ defmodule Pleroma.Application do
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: Pleroma.Supervisor]
- result = Supervisor.start_link(children, opts)
- :ok = after_supervisor_start()
- result
+ Supervisor.start_link(children, opts)
end
defp setup_instrumenters do
@@ -116,7 +115,8 @@ defmodule Pleroma.Application do
build_cachex("object", default_ttl: 25_000, ttl_interval: 1000, limit: 2500),
build_cachex("rich_media", default_ttl: :timer.minutes(120), limit: 5000),
build_cachex("scrubber", limit: 2500),
- build_cachex("idempotency", expiration: idempotency_expiration(), limit: 2500)
+ build_cachex("idempotency", expiration: idempotency_expiration(), limit: 2500),
+ build_cachex("web_resp", limit: 2500)
]
end
@@ -141,7 +141,7 @@ defmodule Pleroma.Application do
defp streamer_child(:test), do: []
defp streamer_child(_) do
- [Pleroma.Web.Streamer]
+ [Pleroma.Web.Streamer.supervisor()]
end
defp oauth_cleanup_child(true),
@@ -163,17 +163,4 @@ defmodule Pleroma.Application do
:hackney_pool.child_spec(pool, options)
end
end
-
- defp after_supervisor_start do
- with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest],
- true <- digest_config[:active] do
- PleromaJobQueue.schedule(
- digest_config[:schedule],
- :digest_emails,
- Pleroma.DigestEmailWorker
- )
- end
-
- :ok
- end
end
diff --git a/lib/pleroma/activity_expiration_worker.ex b/lib/pleroma/daemons/activity_expiration_daemon.ex
index 0f9e715f8..cab7628c4 100644
--- a/lib/pleroma/activity_expiration_worker.ex
+++ b/lib/pleroma/daemons/activity_expiration_daemon.ex
@@ -2,13 +2,14 @@
# Copyright © 2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.ActivityExpirationWorker do
+defmodule Pleroma.Daemons.ActivityExpirationDaemon do
alias Pleroma.Activity
alias Pleroma.ActivityExpiration
alias Pleroma.Config
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.CommonAPI
+
require Logger
use GenServer
import Ecto.Query
@@ -49,7 +50,10 @@ defmodule Pleroma.ActivityExpirationWorker do
def handle_info(:perform, state) do
ActivityExpiration.due_expirations(@schedule_interval)
|> Enum.each(fn expiration ->
- PleromaJobQueue.enqueue(:activity_expiration, __MODULE__, [:execute, expiration.id])
+ Pleroma.Workers.ActivityExpirationWorker.enqueue(
+ "activity_expiration",
+ %{"activity_expiration_id" => expiration.id}
+ )
end)
schedule_next()
diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/daemons/digest_email_daemon.ex
index 5644d6a67..462ad2c55 100644
--- a/lib/pleroma/digest_email_worker.ex
+++ b/lib/pleroma/daemons/digest_email_daemon.ex
@@ -2,10 +2,11 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.DigestEmailWorker do
- import Ecto.Query
+defmodule Pleroma.Daemons.DigestEmailDaemon do
+ alias Pleroma.Repo
+ alias Pleroma.Workers.DigestEmailsWorker
- @queue_name :digest_emails
+ import Ecto.Query
def perform do
config = Pleroma.Config.get([:email_notifications, :digest])
@@ -20,8 +21,10 @@ defmodule Pleroma.DigestEmailWorker do
where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
select: u
)
- |> Pleroma.Repo.all()
- |> Enum.each(&PleromaJobQueue.enqueue(@queue_name, __MODULE__, [&1]))
+ |> Repo.all()
+ |> Enum.each(fn user ->
+ DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id})
+ end)
end
@doc """
diff --git a/lib/pleroma/scheduled_activity_worker.ex b/lib/pleroma/daemons/scheduled_activity_daemon.ex
index 8578cab5e..aee5f723a 100644
--- a/lib/pleroma/scheduled_activity_worker.ex
+++ b/lib/pleroma/daemons/scheduled_activity_daemon.ex
@@ -2,7 +2,7 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.ScheduledActivityWorker do
+defmodule Pleroma.Daemons.ScheduledActivityDaemon do
@moduledoc """
Sends scheduled activities to the job queue.
"""
@@ -11,6 +11,7 @@ defmodule Pleroma.ScheduledActivityWorker do
alias Pleroma.ScheduledActivity
alias Pleroma.User
alias Pleroma.Web.CommonAPI
+
use GenServer
require Logger
@@ -45,7 +46,10 @@ defmodule Pleroma.ScheduledActivityWorker do
def handle_info(:perform, state) do
ScheduledActivity.due_activities(@schedule_interval)
|> Enum.each(fn scheduled_activity ->
- PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id])
+ Pleroma.Workers.ScheduledActivityWorker.enqueue(
+ "execute",
+ %{"activity_id" => scheduled_activity.id}
+ )
end)
schedule_next()
diff --git a/lib/pleroma/delivery.ex b/lib/pleroma/delivery.ex
new file mode 100644
index 000000000..29a1e5a77
--- /dev/null
+++ b/lib/pleroma/delivery.ex
@@ -0,0 +1,51 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Delivery do
+ use Ecto.Schema
+
+ alias Pleroma.Delivery
+ alias Pleroma.FlakeId
+ alias Pleroma.Object
+ alias Pleroma.Repo
+ alias Pleroma.User
+ alias Pleroma.User
+
+ import Ecto.Changeset
+ import Ecto.Query
+
+ schema "deliveries" do
+ belongs_to(:user, User, type: FlakeId)
+ belongs_to(:object, Object)
+ end
+
+ def changeset(delivery, params \\ %{}) do
+ delivery
+ |> cast(params, [:user_id, :object_id])
+ |> validate_required([:user_id, :object_id])
+ |> foreign_key_constraint(:object_id)
+ |> foreign_key_constraint(:user_id)
+ |> unique_constraint(:user_id, name: :deliveries_user_id_object_id_index)
+ end
+
+ def create(object_id, user_id) do
+ %Delivery{}
+ |> changeset(%{user_id: user_id, object_id: object_id})
+ |> Repo.insert(on_conflict: :nothing)
+ end
+
+ def get(object_id, user_id) do
+ from(d in Delivery, where: d.user_id == ^user_id and d.object_id == ^object_id)
+ |> Repo.one()
+ end
+
+ # A hack because user delete activities have a fake id for whatever reason
+ # TODO: Get rid of this
+ def delete_all_by_object_id("pleroma:fake_object_id"), do: {0, []}
+
+ def delete_all_by_object_id(object_id) do
+ from(d in Delivery, where: d.object_id == ^object_id)
+ |> Repo.delete_all()
+ end
+end
diff --git a/lib/pleroma/docs/generator.ex b/lib/pleroma/docs/generator.ex
new file mode 100644
index 000000000..aa578eee2
--- /dev/null
+++ b/lib/pleroma/docs/generator.ex
@@ -0,0 +1,73 @@
+defmodule Pleroma.Docs.Generator do
+ @callback process(keyword()) :: {:ok, String.t()}
+
+ @spec process(module(), keyword()) :: {:ok, String.t()}
+ def process(implementation, descriptions) do
+ implementation.process(descriptions)
+ end
+
+ @spec uploaders_list() :: [module()]
+ def uploaders_list do
+ {:ok, modules} = :application.get_key(:pleroma, :modules)
+
+ Enum.filter(modules, fn module ->
+ name_as_list = Module.split(module)
+
+ List.starts_with?(name_as_list, ["Pleroma", "Uploaders"]) and
+ List.last(name_as_list) != "Uploader"
+ end)
+ end
+
+ @spec filters_list() :: [module()]
+ def filters_list do
+ {:ok, modules} = :application.get_key(:pleroma, :modules)
+
+ Enum.filter(modules, fn module ->
+ name_as_list = Module.split(module)
+
+ List.starts_with?(name_as_list, ["Pleroma", "Upload", "Filter"])
+ end)
+ end
+
+ @spec mrf_list() :: [module()]
+ def mrf_list do
+ {:ok, modules} = :application.get_key(:pleroma, :modules)
+
+ Enum.filter(modules, fn module ->
+ name_as_list = Module.split(module)
+
+ List.starts_with?(name_as_list, ["Pleroma", "Web", "ActivityPub", "MRF"]) and
+ length(name_as_list) > 4
+ end)
+ end
+
+ @spec richmedia_parsers() :: [module()]
+ def richmedia_parsers do
+ {:ok, modules} = :application.get_key(:pleroma, :modules)
+
+ Enum.filter(modules, fn module ->
+ name_as_list = Module.split(module)
+
+ List.starts_with?(name_as_list, ["Pleroma", "Web", "RichMedia", "Parsers"]) and
+ length(name_as_list) == 5
+ end)
+ end
+end
+
+defimpl Jason.Encoder, for: Tuple do
+ def encode(tuple, opts) do
+ Jason.Encode.list(Tuple.to_list(tuple), opts)
+ end
+end
+
+defimpl Jason.Encoder, for: [Regex, Function] do
+ def encode(term, opts) do
+ Jason.Encode.string(inspect(term), opts)
+ end
+end
+
+defimpl String.Chars, for: Regex do
+ def to_string(term) do
+ inspect(term)
+ end
+end
diff --git a/lib/pleroma/docs/json.ex b/lib/pleroma/docs/json.ex
new file mode 100644
index 000000000..18ba01d58
--- /dev/null
+++ b/lib/pleroma/docs/json.ex
@@ -0,0 +1,20 @@
+defmodule Pleroma.Docs.JSON do
+ @behaviour Pleroma.Docs.Generator
+
+ @spec process(keyword()) :: {:ok, String.t()}
+ def process(descriptions) do
+ config_path = "docs/generate_config.json"
+
+ with {:ok, file} <- File.open(config_path, [:write]),
+ json <- generate_json(descriptions),
+ :ok <- IO.write(file, json),
+ :ok <- File.close(file) do
+ {:ok, config_path}
+ end
+ end
+
+ @spec generate_json([keyword()]) :: String.t()
+ def generate_json(descriptions) do
+ Jason.encode!(descriptions)
+ end
+end
diff --git a/lib/pleroma/docs/markdown.ex b/lib/pleroma/docs/markdown.ex
new file mode 100644
index 000000000..8386dc2fb
--- /dev/null
+++ b/lib/pleroma/docs/markdown.ex
@@ -0,0 +1,78 @@
+defmodule Pleroma.Docs.Markdown do
+ @behaviour Pleroma.Docs.Generator
+
+ @spec process(keyword()) :: {:ok, String.t()}
+ def process(descriptions) do
+ config_path = "docs/generated_config.md"
+ {:ok, file} = File.open(config_path, [:utf8, :write])
+ IO.write(file, "# Generated configuration\n")
+ IO.write(file, "Date of generation: #{Date.utc_today()}\n\n")
+
+ IO.write(
+ file,
+ "This file describe the configuration, it is recommended to edit the relevant `*.secret.exs` file instead of the others founds in the ``config`` directory.\n\n" <>
+ "If you run Pleroma with ``MIX_ENV=prod`` the file is ``prod.secret.exs``, otherwise it is ``dev.secret.exs``.\n\n"
+ )
+
+ for group <- descriptions do
+ if is_nil(group[:key]) do
+ IO.write(file, "## #{inspect(group[:group])}\n")
+ else
+ IO.write(file, "## #{inspect(group[:key])}\n")
+ end
+
+ IO.write(file, "#{group[:description]}\n")
+
+ for child <- group[:children] do
+ print_child_header(file, child)
+
+ print_suggestions(file, child[:suggestions])
+
+ if child[:children] do
+ for subchild <- child[:children] do
+ print_child_header(file, subchild)
+
+ print_suggestions(file, subchild[:suggestions])
+ end
+ end
+ end
+
+ IO.write(file, "\n")
+ end
+
+ :ok = File.close(file)
+ {:ok, config_path}
+ end
+
+ defp print_suggestion(file, suggestion) when is_list(suggestion) do
+ IO.write(file, " `#{inspect(suggestion)}`\n")
+ end
+
+ defp print_suggestion(file, suggestion) when is_function(suggestion) do
+ IO.write(file, " `#{inspect(suggestion.())}`\n")
+ end
+
+ defp print_suggestion(file, suggestion, as_list \\ false) do
+ list_mark = if as_list, do: "- ", else: ""
+ IO.write(file, " #{list_mark}`#{inspect(suggestion)}`\n")
+ end
+
+ defp print_suggestions(_file, nil), do: nil
+
+ defp print_suggestions(file, suggestions) do
+ IO.write(file, "Suggestions:\n")
+
+ if length(suggestions) > 1 do
+ for suggestion <- suggestions do
+ print_suggestion(file, suggestion, true)
+ end
+ else
+ print_suggestion(file, List.first(suggestions))
+ end
+ end
+
+ defp print_child_header(file, child) do
+ IO.write(file, "- `#{inspect(child[:key])}` -`#{inspect(child[:type])}` \n")
+ IO.write(file, "#{child[:description]} \n")
+ end
+end
diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex
index 2e4657b7c..eb96f2e8b 100644
--- a/lib/pleroma/emails/mailer.ex
+++ b/lib/pleroma/emails/mailer.ex
@@ -9,6 +9,7 @@ defmodule Pleroma.Emails.Mailer do
The module contains functions to delivery email using Swoosh.Mailer.
"""
+ alias Pleroma.Workers.MailerWorker
alias Swoosh.DeliveryError
@otp_app :pleroma
@@ -19,7 +20,12 @@ defmodule Pleroma.Emails.Mailer do
@doc "add email to queue"
def deliver_async(email, config \\ []) do
- PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config])
+ encoded_email =
+ email
+ |> :erlang.term_to_binary()
+ |> Base.encode64()
+
+ MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config})
end
@doc "callback to perform send email from queue"
diff --git a/lib/healthcheck.ex b/lib/pleroma/healthcheck.ex
index f97d14432..977b78c26 100644
--- a/lib/healthcheck.ex
+++ b/lib/pleroma/healthcheck.ex
@@ -9,6 +9,7 @@ defmodule Pleroma.Healthcheck do
alias Pleroma.Healthcheck
alias Pleroma.Repo
+ @derive Jason.Encoder
defstruct pool_size: 0,
active: 0,
idle: 0,
diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex
index 4d7ed4ca1..544c4b687 100644
--- a/lib/pleroma/instances/instance.ex
+++ b/lib/pleroma/instances/instance.ex
@@ -90,7 +90,7 @@ defmodule Pleroma.Instances.Instance do
def set_unreachable(url_or_host, unreachable_since \\ nil)
def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do
- unreachable_since = unreachable_since || DateTime.utc_now()
+ unreachable_since = parse_datetime(unreachable_since) || NaiveDateTime.utc_now()
host = host(url_or_host)
existing_record = Repo.get_by(Instance, %{host: host})
@@ -114,4 +114,10 @@ defmodule Pleroma.Instances.Instance do
end
def set_unreachable(_, _), do: {:error, nil}
+
+ defp parse_datetime(datetime) when is_binary(datetime) do
+ NaiveDateTime.from_iso8601(datetime)
+ end
+
+ defp parse_datetime(datetime), do: datetime
end
diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex
index b7c880c51..8012389ac 100644
--- a/lib/pleroma/notification.ex
+++ b/lib/pleroma/notification.ex
@@ -210,8 +210,10 @@ defmodule Pleroma.Notification do
unless skip?(activity, user) do
notification = %Notification{user_id: user.id, activity: activity}
{:ok, notification} = Repo.insert(notification)
- Streamer.stream("user", notification)
- Streamer.stream("user:notification", notification)
+
+ ["user", "user:notification"]
+ |> Streamer.stream(notification)
+
Push.send(notification)
notification
end
diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex
index d58eb7f7d..5033798ae 100644
--- a/lib/pleroma/object.ex
+++ b/lib/pleroma/object.ex
@@ -130,14 +130,16 @@ defmodule Pleroma.Object do
def delete(%Object{data: %{"id" => id}} = object) do
with {:ok, _obj} = swap_object_with_tombstone(object),
deleted_activity = Activity.delete_by_ap_id(id),
- {:ok, true} <- Cachex.del(:object_cache, "object:#{id}") do
+ {:ok, true} <- Cachex.del(:object_cache, "object:#{id}"),
+ {:ok, _} <- Cachex.del(:web_resp_cache, URI.parse(id).path) do
{:ok, object, deleted_activity}
end
end
def prune(%Object{data: %{"id" => id}} = object) do
with {:ok, object} <- Repo.delete(object),
- {:ok, true} <- Cachex.del(:object_cache, "object:#{id}") do
+ {:ok, true} <- Cachex.del(:object_cache, "object:#{id}"),
+ {:ok, _} <- Cachex.del(:web_resp_cache, URI.parse(id).path) do
{:ok, object}
end
end
diff --git a/lib/pleroma/plugs/cache.ex b/lib/pleroma/plugs/cache.ex
new file mode 100644
index 000000000..50b534e7b
--- /dev/null
+++ b/lib/pleroma/plugs/cache.ex
@@ -0,0 +1,136 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Plugs.Cache do
+ @moduledoc """
+ Caches successful GET responses.
+
+ To enable the cache add the plug to a router pipeline or controller:
+
+ plug(Pleroma.Plugs.Cache)
+
+ ## Configuration
+
+ To configure the plug you need to pass settings as the second argument to the `plug/2` macro:
+
+ plug(Pleroma.Plugs.Cache, [ttl: nil, query_params: true])
+
+ Available options:
+
+ - `ttl`: An expiration time (time-to-live). This value should be in milliseconds or `nil` to disable expiration. Defaults to `nil`.
+ - `query_params`: Take URL query string into account (`true`), ignore it (`false`) or limit to specific params only (list). Defaults to `true`.
+ - `tracking_fun`: A function that is called on successfull responses, no matter if the request is cached or not. It should accept a conn as the first argument and the value assigned to `tracking_fun_data` as the second.
+
+ Additionally, you can overwrite the TTL inside a controller action by assigning `cache_ttl` to the connection struct:
+
+ def index(conn, _params) do
+ ttl = 60_000 # one minute
+
+ conn
+ |> assign(:cache_ttl, ttl)
+ |> render("index.html")
+ end
+
+ """
+
+ import Phoenix.Controller, only: [current_path: 1, json: 2]
+ import Plug.Conn
+
+ @behaviour Plug
+
+ @defaults %{ttl: nil, query_params: true}
+
+ @impl true
+ def init([]), do: @defaults
+
+ def init(opts) do
+ opts = Map.new(opts)
+ Map.merge(@defaults, opts)
+ end
+
+ @impl true
+ def call(%{method: "GET"} = conn, opts) do
+ key = cache_key(conn, opts)
+
+ case Cachex.get(:web_resp_cache, key) do
+ {:ok, nil} ->
+ cache_resp(conn, opts)
+
+ {:ok, {content_type, body, tracking_fun_data}} ->
+ conn = opts.tracking_fun.(conn, tracking_fun_data)
+
+ send_cached(conn, {content_type, body})
+
+ {:ok, record} ->
+ send_cached(conn, record)
+
+ {atom, message} when atom in [:ignore, :error] ->
+ render_error(conn, message)
+ end
+ end
+
+ def call(conn, _), do: conn
+
+ # full path including query params
+ defp cache_key(conn, %{query_params: true}), do: current_path(conn)
+
+ # request path without query params
+ defp cache_key(conn, %{query_params: false}), do: conn.request_path
+
+ # request path with specific query params
+ defp cache_key(conn, %{query_params: query_params}) when is_list(query_params) do
+ query_string =
+ conn.params
+ |> Map.take(query_params)
+ |> URI.encode_query()
+
+ conn.request_path <> "?" <> query_string
+ end
+
+ defp cache_resp(conn, opts) do
+ register_before_send(conn, fn
+ %{status: 200, resp_body: body} = conn ->
+ ttl = Map.get(conn.assigns, :cache_ttl, opts.ttl)
+ key = cache_key(conn, opts)
+ content_type = content_type(conn)
+
+ conn =
+ unless opts[:tracking_fun] do
+ Cachex.put(:web_resp_cache, key, {content_type, body}, ttl: ttl)
+ conn
+ else
+ tracking_fun_data = Map.get(conn.assigns, :tracking_fun_data, nil)
+ Cachex.put(:web_resp_cache, key, {content_type, body, tracking_fun_data}, ttl: ttl)
+
+ opts.tracking_fun.(conn, tracking_fun_data)
+ end
+
+ put_resp_header(conn, "x-cache", "MISS from Pleroma")
+
+ conn ->
+ conn
+ end)
+ end
+
+ defp content_type(conn) do
+ conn
+ |> Plug.Conn.get_resp_header("content-type")
+ |> hd()
+ end
+
+ defp send_cached(conn, {content_type, body}) do
+ conn
+ |> put_resp_content_type(content_type, nil)
+ |> put_resp_header("x-cache", "HIT from Pleroma")
+ |> send_resp(:ok, body)
+ |> halt()
+ end
+
+ defp render_error(conn, message) do
+ conn
+ |> put_status(:internal_server_error)
+ |> json(%{error: message})
+ |> halt()
+ end
+end
diff --git a/lib/pleroma/plugs/http_signature.ex b/lib/pleroma/plugs/http_signature.ex
index d87fa52fa..23d22a712 100644
--- a/lib/pleroma/plugs/http_signature.ex
+++ b/lib/pleroma/plugs/http_signature.ex
@@ -15,7 +15,8 @@ defmodule Pleroma.Web.Plugs.HTTPSignaturePlug do
end
def call(conn, _opts) do
- [signature | _] = get_req_header(conn, "signature")
+ headers = get_req_header(conn, "signature")
+ signature = Enum.at(headers, 0)
if signature do
# set (request-target) header to the appropriate value
diff --git a/lib/pleroma/plugs/trailing_format_plug.ex b/lib/pleroma/plugs/trailing_format_plug.ex
new file mode 100644
index 000000000..ce366b218
--- /dev/null
+++ b/lib/pleroma/plugs/trailing_format_plug.ex
@@ -0,0 +1,41 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Plugs.TrailingFormatPlug do
+ @moduledoc "Calls TrailingFormatPlug for specific paths. Ideally we would just do this in the router, but TrailingFormatPlug needs to be called before Plug.Parsers."
+
+ @behaviour Plug
+ @paths [
+ "/api/statusnet",
+ "/api/statuses",
+ "/api/qvitter",
+ "/api/search",
+ "/api/account",
+ "/api/friends",
+ "/api/mutes",
+ "/api/media",
+ "/api/favorites",
+ "/api/blocks",
+ "/api/friendships",
+ "/api/users",
+ "/users",
+ "/nodeinfo",
+ "/api/help",
+ "/api/externalprofile",
+ "/notice",
+ "/api/pleroma/emoji"
+ ]
+
+ def init(opts) do
+ TrailingFormatPlug.init(opts)
+ end
+
+ for path <- @paths do
+ def call(%{request_path: unquote(path) <> _} = conn, opts) do
+ TrailingFormatPlug.call(conn, opts)
+ end
+ end
+
+ def call(conn, _opts), do: conn
+end
diff --git a/lib/pleroma/scheduler.ex b/lib/pleroma/scheduler.ex
new file mode 100644
index 000000000..d84cd99ad
--- /dev/null
+++ b/lib/pleroma/scheduler.ex
@@ -0,0 +1,7 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Scheduler do
+ use Quantum.Scheduler, otp_app: :pleroma
+end
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index 29fd6d2ea..dd2b1c8c4 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -11,6 +11,7 @@ defmodule Pleroma.User do
alias Comeonin.Pbkdf2
alias Ecto.Multi
alias Pleroma.Activity
+ alias Pleroma.Delivery
alias Pleroma.Keys
alias Pleroma.Notification
alias Pleroma.Object
@@ -27,6 +28,7 @@ defmodule Pleroma.User do
alias Pleroma.Web.OStatus
alias Pleroma.Web.RelMe
alias Pleroma.Web.Websub
+ alias Pleroma.Workers.BackgroundWorker
require Logger
@@ -61,6 +63,7 @@ defmodule Pleroma.User do
field(:last_digest_emailed_at, :naive_datetime)
has_many(:notifications, Notification)
has_many(:registrations, Registration)
+ has_many(:deliveries, Delivery)
embeds_one(:info, User.Info)
timestamps()
@@ -174,11 +177,25 @@ defmodule Pleroma.User do
|> Repo.aggregate(:count, :id)
end
+ defp truncate_if_exists(params, key, max_length) do
+ if Map.has_key?(params, key) and is_binary(params[key]) do
+ {value, _chopped} = String.split_at(params[key], max_length)
+ Map.put(params, key, value)
+ else
+ params
+ end
+ end
+
def remote_user_creation(params) do
bio_limit = Pleroma.Config.get([:instance, :user_bio_length], 5000)
name_limit = Pleroma.Config.get([:instance, :user_name_length], 100)
- params = Map.put(params, :info, params[:info] || %{})
+ params =
+ params
+ |> Map.put(:info, params[:info] || %{})
+ |> truncate_if_exists(:name, name_limit)
+ |> truncate_if_exists(:bio, bio_limit)
+
info_cng = User.Info.remote_user_creation(%User.Info{}, params[:info])
changes =
@@ -569,8 +586,22 @@ defmodule Pleroma.User do
end)
end
- def get_cached_by_nickname_or_id(nickname_or_id) do
- get_cached_by_id(nickname_or_id) || get_cached_by_nickname(nickname_or_id)
+ def get_cached_by_nickname_or_id(nickname_or_id, opts \\ []) do
+ restrict_to_local = Pleroma.Config.get([:instance, :limit_to_local_content])
+
+ cond do
+ is_integer(nickname_or_id) or Pleroma.FlakeId.is_flake_id?(nickname_or_id) ->
+ get_cached_by_id(nickname_or_id) || get_cached_by_nickname(nickname_or_id)
+
+ restrict_to_local == false ->
+ get_cached_by_nickname(nickname_or_id)
+
+ restrict_to_local == :unauthenticated and match?(%User{}, opts[:for]) ->
+ get_cached_by_nickname(nickname_or_id)
+
+ true ->
+ nil
+ end
end
def get_by_nickname(nickname) do
@@ -619,8 +650,9 @@ defmodule Pleroma.User do
end
@doc "Fetch some posts when the user has just been federated with"
- def fetch_initial_posts(user),
- do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user])
+ def fetch_initial_posts(user) do
+ BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id})
+ end
@spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
def get_followers_query(%User{} = user, nil) do
@@ -1050,7 +1082,7 @@ defmodule Pleroma.User do
end
def deactivate_async(user, status \\ true) do
- PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status])
+ BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status})
end
def deactivate(%User{} = user, status \\ true) do
@@ -1078,9 +1110,9 @@ defmodule Pleroma.User do
|> update_and_set_cache()
end
- @spec delete(User.t()) :: :ok
- def delete(%User{} = user),
- do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user])
+ def delete(%User{} = user) do
+ BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id})
+ end
@spec perform(atom(), User.t()) :: {:ok, User.t()}
def perform(:delete, %User{} = user) do
@@ -1187,25 +1219,24 @@ defmodule Pleroma.User do
Repo.all(query)
end
- def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers),
- do:
- PleromaJobQueue.enqueue(:background, __MODULE__, [
- :blocks_import,
- blocker,
- blocked_identifiers
- ])
+ def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
+ BackgroundWorker.enqueue("blocks_import", %{
+ "blocker_id" => blocker.id,
+ "blocked_identifiers" => blocked_identifiers
+ })
+ end
- def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers),
- do:
- PleromaJobQueue.enqueue(:background, __MODULE__, [
- :follow_import,
- follower,
- followed_identifiers
- ])
+ def follow_import(%User{} = follower, followed_identifiers)
+ when is_list(followed_identifiers) do
+ BackgroundWorker.enqueue("follow_import", %{
+ "follower_id" => follower.id,
+ "followed_identifiers" => followed_identifiers
+ })
+ end
def delete_user_activities(%User{ap_id: ap_id} = user) do
ap_id
- |> Activity.query_by_actor()
+ |> Activity.Queries.by_actor()
|> RepoStreamer.chunk_stream(50)
|> Stream.each(fn activities ->
Enum.each(activities, &delete_activity(&1))
@@ -1610,4 +1641,25 @@ defmodule Pleroma.User do
def is_internal_user?(%User{nickname: nil}), do: true
def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true
def is_internal_user?(_), do: false
+
+ # A hack because user delete activities have a fake id for whatever reason
+ # TODO: Get rid of this
+ def get_delivered_users_by_object_id("pleroma:fake_object_id"), do: []
+
+ def get_delivered_users_by_object_id(object_id) do
+ from(u in User,
+ inner_join: delivery in assoc(u, :deliveries),
+ where: delivery.object_id == ^object_id
+ )
+ |> Repo.all()
+ end
+
+ def change_email(user, email) do
+ user
+ |> cast(%{email: email}, [:email])
+ |> validate_required([:email])
+ |> unique_constraint(:email)
+ |> validate_format(:email, @email_regex)
+ |> update_and_set_cache()
+ end
end
diff --git a/lib/pleroma/user/info.ex b/lib/pleroma/user/info.ex
index 779bfbc18..151e025de 100644
--- a/lib/pleroma/user/info.ex
+++ b/lib/pleroma/user/info.ex
@@ -242,6 +242,13 @@ defmodule Pleroma.User.Info do
end
def remote_user_creation(info, params) do
+ params =
+ if Map.has_key?(params, :fields) do
+ Map.put(params, :fields, Enum.map(params[:fields], &truncate_field/1))
+ else
+ params
+ end
+
info
|> cast(params, [
:ap_enabled,
@@ -326,6 +333,16 @@ defmodule Pleroma.User.Info do
defp valid_field?(_), do: false
+ defp truncate_field(%{"name" => name, "value" => value}) do
+ {name, _chopped} =
+ String.split_at(name, Pleroma.Config.get([:instance, :account_field_name_length], 255))
+
+ {value, _chopped} =
+ String.split_at(value, Pleroma.Config.get([:instance, :account_field_value_length], 255))
+
+ %{"name" => name, "value" => value}
+ end
+
@spec confirmation_changeset(Info.t(), keyword()) :: Changeset.t()
def confirmation_changeset(info, opts) do
need_confirmation? = Keyword.get(opts, :need_confirmation)
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index eeb826814..bc5ae7fbf 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -4,6 +4,7 @@
defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Activity
+ alias Pleroma.Activity.Ir.Topics
alias Pleroma.Config
alias Pleroma.Conversation
alias Pleroma.Notification
@@ -16,7 +17,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier
+ alias Pleroma.Web.Streamer
alias Pleroma.Web.WebFinger
+ alias Pleroma.Workers.BackgroundWorker
import Ecto.Query
import Pleroma.Web.ActivityPub.Utils
@@ -145,7 +148,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
activity
end
- PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity])
+ BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
Notification.create_notifications(activity)
@@ -186,9 +189,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
participations
|> Repo.preload(:user)
- Enum.each(participations, fn participation ->
- Pleroma.Web.Streamer.stream("participation", participation)
- end)
+ Streamer.stream("participation", participations)
end
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
@@ -207,41 +208,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
def stream_out_participations(_, _), do: :noop
- def stream_out(activity) do
- if activity.data["type"] in ["Create", "Announce", "Delete"] do
- object = Object.normalize(activity)
- # Do not stream out poll replies
- unless object.data["type"] == "Answer" do
- Pleroma.Web.Streamer.stream("user", activity)
- Pleroma.Web.Streamer.stream("list", activity)
-
- if get_visibility(activity) == "public" do
- Pleroma.Web.Streamer.stream("public", activity)
-
- if activity.local do
- Pleroma.Web.Streamer.stream("public:local", activity)
- end
-
- if activity.data["type"] in ["Create"] do
- object.data
- |> Map.get("tag", [])
- |> Enum.filter(fn tag -> is_bitstring(tag) end)
- |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
-
- if object.data["attachment"] != [] do
- Pleroma.Web.Streamer.stream("public:media", activity)
-
- if activity.local do
- Pleroma.Web.Streamer.stream("public:local:media", activity)
- end
- end
- end
- else
- if get_visibility(activity) == "direct",
- do: Pleroma.Web.Streamer.stream("direct", activity)
- end
- end
- end
+ def stream_out(%Activity{data: %{"type" => data_type}} = activity)
+ when data_type in ["Create", "Announce", "Delete"] do
+ activity
+ |> Topics.get_activity_topics()
+ |> Streamer.stream(activity)
+ end
+
+ def stream_out(_activity) do
+ :noop
end
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do
@@ -796,7 +771,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
)
unless opts["skip_preload"] do
- from([thread_mute: tm] in query, where: is_nil(tm))
+ from([thread_mute: tm] in query, where: is_nil(tm.user_id))
else
query
end
diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex
index 08bf1c752..01b34fb1d 100644
--- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
use Pleroma.Web, :controller
alias Pleroma.Activity
+ alias Pleroma.Delivery
alias Pleroma.Object
alias Pleroma.Object.Fetcher
alias Pleroma.User
@@ -23,6 +24,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
action_fallback(:errors)
+ plug(
+ Pleroma.Plugs.Cache,
+ [query_params: false, tracking_fun: &__MODULE__.track_object_fetch/2]
+ when action in [:activity, :object]
+ )
+
plug(Pleroma.Web.FederatingPlug when action in [:inbox, :relay])
plug(:set_requester_reachable when action in [:inbox])
plug(:relay_active? when action in [:relay])
@@ -53,14 +60,27 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
%Object{} = object <- Object.get_cached_by_ap_id(ap_id),
{_, true} <- {:public?, Visibility.is_public?(object)} do
conn
+ |> assign(:tracking_fun_data, object.id)
+ |> set_cache_ttl_for(object)
|> put_resp_content_type("application/activity+json")
- |> json(ObjectView.render("object.json", %{object: object}))
+ |> put_view(ObjectView)
+ |> render("object.json", object: object)
else
{:public?, false} ->
{:error, :not_found}
end
end
+ def track_object_fetch(conn, nil), do: conn
+
+ def track_object_fetch(conn, object_id) do
+ with %{assigns: %{user: %User{id: user_id}}} <- conn do
+ Delivery.create(object_id, user_id)
+ end
+
+ conn
+ end
+
def object_likes(conn, %{"uuid" => uuid, "page" => page}) do
with ap_id <- o_status_url(conn, :object, uuid),
%Object{} = object <- Object.get_cached_by_ap_id(ap_id),
@@ -96,14 +116,44 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
%Activity{} = activity <- Activity.normalize(ap_id),
{_, true} <- {:public?, Visibility.is_public?(activity)} do
conn
+ |> maybe_set_tracking_data(activity)
+ |> set_cache_ttl_for(activity)
|> put_resp_content_type("application/activity+json")
- |> json(ObjectView.render("object.json", %{object: activity}))
+ |> put_view(ObjectView)
+ |> render("object.json", object: activity)
else
- {:public?, false} ->
- {:error, :not_found}
+ {:public?, false} -> {:error, :not_found}
+ nil -> {:error, :not_found}
end
end
+ defp maybe_set_tracking_data(conn, %Activity{data: %{"type" => "Create"}} = activity) do
+ object_id = Object.normalize(activity).id
+ assign(conn, :tracking_fun_data, object_id)
+ end
+
+ defp maybe_set_tracking_data(conn, _activity), do: conn
+
+ defp set_cache_ttl_for(conn, %Activity{object: object}) do
+ set_cache_ttl_for(conn, object)
+ end
+
+ defp set_cache_ttl_for(conn, entity) do
+ ttl =
+ case entity do
+ %Object{data: %{"type" => "Question"}} ->
+ Pleroma.Config.get([:web_cache_ttl, :activity_pub_question])
+
+ %Object{} ->
+ Pleroma.Config.get([:web_cache_ttl, :activity_pub])
+
+ _ ->
+ nil
+ end
+
+ assign(conn, :cache_ttl, ttl)
+ end
+
# GET /relay/following
def following(%{assigns: %{relay: true}} = conn, _params) do
conn
@@ -251,22 +301,36 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
def whoami(_conn, _params), do: {:error, :not_found}
- def read_inbox(%{assigns: %{user: user}} = conn, %{"nickname" => nickname} = params) do
- if nickname == user.nickname do
- conn
- |> put_resp_content_type("application/activity+json")
- |> json(UserView.render("inbox.json", %{user: user, max_id: params["max_id"]}))
- else
- err =
- dgettext("errors", "can't read inbox of %{nickname} as %{as_nickname}",
- nickname: nickname,
- as_nickname: user.nickname
- )
+ def read_inbox(
+ %{assigns: %{user: %{nickname: nickname} = user}} = conn,
+ %{"nickname" => nickname} = params
+ ) do
+ conn
+ |> put_resp_content_type("application/activity+json")
+ |> put_view(UserView)
+ |> render("inbox.json", user: user, max_id: params["max_id"])
+ end
- conn
- |> put_status(:forbidden)
- |> json(err)
- end
+ def read_inbox(%{assigns: %{user: nil}} = conn, %{"nickname" => nickname}) do
+ err = dgettext("errors", "can't read inbox of %{nickname}", nickname: nickname)
+
+ conn
+ |> put_status(:forbidden)
+ |> json(err)
+ end
+
+ def read_inbox(%{assigns: %{user: %{nickname: as_nickname}}} = conn, %{
+ "nickname" => nickname
+ }) do
+ err =
+ dgettext("errors", "can't read inbox of %{nickname} as %{as_nickname}",
+ nickname: nickname,
+ as_nickname: as_nickname
+ )
+
+ conn
+ |> put_status(:forbidden)
+ |> json(err)
end
def handle_user_activity(user, %{"type" => "Create"} = params) do
diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
index a179dd54d..26b8539fe 100644
--- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
+++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
@@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
alias Pleroma.HTTP
alias Pleroma.Web.MediaProxy
+ alias Pleroma.Workers.BackgroundWorker
require Logger
@@ -30,7 +31,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
url
|> Enum.each(fn
%{"href" => href} ->
- PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href])
+ BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href})
x ->
Logger.debug("Unhandled attachment URL object #{inspect(x)}")
@@ -46,7 +47,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
%{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
)
when is_list(attachments) and length(attachments) > 0 do
- PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message])
+ BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message})
{:ok, message}
end
diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex
index c97405690..114251b24 100644
--- a/lib/pleroma/web/activity_pub/publisher.ex
+++ b/lib/pleroma/web/activity_pub/publisher.ex
@@ -5,8 +5,10 @@
defmodule Pleroma.Web.ActivityPub.Publisher do
alias Pleroma.Activity
alias Pleroma.Config
+ alias Pleroma.Delivery
alias Pleroma.HTTP
alias Pleroma.Instances
+ alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.Relay
alias Pleroma.Web.ActivityPub.Transmogrifier
@@ -84,6 +86,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
end
end
+ def publish_one(%{actor_id: actor_id} = params) do
+ actor = User.get_cached_by_id(actor_id)
+
+ params
+ |> Map.delete(:actor_id)
+ |> Map.put(:actor, actor)
+ |> publish_one()
+ end
+
defp should_federate?(inbox, public) do
if public do
true
@@ -107,7 +118,18 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
{:ok, []}
end
- Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers
+ fetchers =
+ with %Activity{data: %{"type" => "Delete"}} <- activity,
+ %Object{id: object_id} <- Object.normalize(activity),
+ fetchers <- User.get_delivered_users_by_object_id(object_id),
+ _ <- Delivery.delete_all_by_object_id(object_id) do
+ fetchers
+ else
+ _ ->
+ []
+ end
+
+ Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers ++ fetchers
end
defp get_cc_ap_ids(ap_id, recipients) do
@@ -159,7 +181,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
Publishes an activity with BCC to all relevant peers.
"""
- def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do
+ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
+ when is_list(bcc) and bcc != [] do
public = is_public?(activity)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
@@ -186,7 +209,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
inbox: inbox,
json: json,
- actor: actor,
+ actor_id: actor.id,
id: activity.data["id"],
unreachable_since: unreachable_since
})
@@ -221,7 +244,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
%{
inbox: inbox,
json: json,
- actor: actor,
+ actor_id: actor.id,
id: activity.data["id"],
unreachable_since: unreachable_since
}
diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex
index 468961bd0..acb3087d0 100644
--- a/lib/pleroma/web/activity_pub/transmogrifier.ex
+++ b/lib/pleroma/web/activity_pub/transmogrifier.ex
@@ -15,6 +15,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.Federator
+ alias Pleroma.Workers.TransmogrifierWorker
import Ecto.Query
@@ -185,12 +186,12 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
|> Map.put("context", replied_object.data["context"] || object["conversation"])
else
e ->
- Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}")
+ Logger.error("Couldn't fetch #{inspect(in_reply_to_id)}, error: #{inspect(e)}")
object
end
e ->
- Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}")
+ Logger.error("Couldn't fetch #{inspect(in_reply_to_id)}, error: #{inspect(e)}")
object
end
else
@@ -1051,7 +1052,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
already_ap <- User.ap_enabled?(user),
{:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do
unless already_ap do
- PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user])
+ TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
end
{:ok, user}
diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex
index c9c0c3763..258e56066 100644
--- a/lib/pleroma/web/activity_pub/utils.ex
+++ b/lib/pleroma/web/activity_pub/utils.ex
@@ -85,15 +85,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
defp extract_list(_), do: []
def maybe_splice_recipient(ap_id, params) do
- need_splice =
+ need_splice? =
!recipient_in_collection(ap_id, params["to"]) &&
!recipient_in_collection(ap_id, params["cc"])
- cc_list = extract_list(params["cc"])
-
- if need_splice do
- params
- |> Map.put("cc", [ap_id | cc_list])
+ if need_splice? do
+ cc_list = extract_list(params["cc"])
+ Map.put(params, "cc", [ap_id | cc_list])
else
params
end
@@ -139,7 +137,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
"object" => object
}
- Notification.get_notified_from_activity(%Activity{data: fake_create_activity}, false)
+ get_notified_from_object(fake_create_activity)
end
def get_notified_from_object(object) do
@@ -169,14 +167,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
@spec maybe_federate(any()) :: :ok
def maybe_federate(%Activity{local: true} = activity) do
if Pleroma.Config.get!([:instance, :federating]) do
- priority =
- case activity.data["type"] do
- "Delete" -> 10
- "Create" -> 1
- _ -> 5
- end
-
- Pleroma.Web.Federator.publish(activity, priority)
+ Pleroma.Web.Federator.publish(activity)
end
:ok
@@ -188,9 +179,9 @@ defmodule Pleroma.Web.ActivityPub.Utils do
Adds an id and a published data if they aren't there,
also adds it to an included object
"""
- def lazy_put_activity_defaults(map, fake \\ false) do
+ def lazy_put_activity_defaults(map, fake? \\ false) do
map =
- unless fake do
+ if not fake? do
%{data: %{"id" => context}, id: context_id} = create_context(map["context"])
map
@@ -207,7 +198,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
end
if is_map(map["object"]) do
- object = lazy_put_object_defaults(map["object"], map, fake)
+ object = lazy_put_object_defaults(map["object"], map, fake?)
%{map | "object" => object}
else
map
@@ -217,9 +208,9 @@ defmodule Pleroma.Web.ActivityPub.Utils do
@doc """
Adds an id and published date if they aren't there.
"""
- def lazy_put_object_defaults(map, activity \\ %{}, fake)
+ def lazy_put_object_defaults(map, activity \\ %{}, fake?)
- def lazy_put_object_defaults(map, activity, true = _fake) do
+ def lazy_put_object_defaults(map, activity, true = _fake?) do
map
|> Map.put_new_lazy("published", &make_date/0)
|> Map.put_new("id", "pleroma:fake_object_id")
@@ -228,7 +219,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|> Map.put_new("context_id", activity["context_id"])
end
- def lazy_put_object_defaults(map, activity, _fake) do
+ def lazy_put_object_defaults(map, activity, _fake?) do
map
|> Map.put_new_lazy("id", &generate_object_id/0)
|> Map.put_new_lazy("published", &make_date/0)
@@ -242,9 +233,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
def insert_full_object(%{"object" => %{"type" => type} = object_data} = map)
when is_map(object_data) and type in @supported_object_types do
with {:ok, object} <- Object.create(object_data) do
- map =
- map
- |> Map.put("object", object.data["id"])
+ map = Map.put(map, "object", object.data["id"])
{:ok, map, object}
end
@@ -263,7 +252,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do
|> Activity.Queries.by_actor()
|> Activity.Queries.by_object_id(id)
|> Activity.Queries.by_type("Like")
- |> Activity.Queries.limit(1)
+ |> limit(1)
|> Repo.one()
end
@@ -380,12 +369,11 @@ defmodule Pleroma.Web.ActivityPub.Utils do
%Activity{data: %{"actor" => actor, "object" => object}} = activity,
state
) do
- with new_data <-
- activity.data
- |> Map.put("state", state),
- changeset <- Changeset.change(activity, data: new_data),
- {:ok, activity} <- Repo.update(changeset),
- _ <- User.set_follow_state_cache(actor, object, state) do
+ new_data = Map.put(activity.data, "state", state)
+ changeset = Changeset.change(activity, data: new_data)
+
+ with {:ok, activity} <- Repo.update(changeset) do
+ User.set_follow_state_cache(actor, object, state)
{:ok, activity}
end
end
@@ -410,28 +398,14 @@ defmodule Pleroma.Web.ActivityPub.Utils do
end
def fetch_latest_follow(%User{ap_id: follower_id}, %User{ap_id: followed_id}) do
- query =
- from(
- activity in Activity,
- where:
- fragment(
- "? ->> 'type' = 'Follow'",
- activity.data
- ),
- where: activity.actor == ^follower_id,
- # this is to use the index
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^followed_id
- ),
- order_by: [fragment("? desc nulls last", activity.id)],
- limit: 1
- )
-
- Repo.one(query)
+ "Follow"
+ |> Activity.Queries.by_type()
+ |> where(actor: ^follower_id)
+ # this is to use the index
+ |> Activity.Queries.by_object_id(followed_id)
+ |> order_by([activity], fragment("? desc nulls last", activity.id))
+ |> limit(1)
+ |> Repo.one()
end
#### Announce-related helpers
@@ -439,23 +413,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
@doc """
Retruns an existing announce activity if the notice has already been announced
"""
- def get_existing_announce(actor, %{data: %{"id" => id}}) do
- query =
- from(
- activity in Activity,
- where: activity.actor == ^actor,
- # this is to use the index
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^id
- ),
- where: fragment("(?)->>'type' = 'Announce'", activity.data)
- )
-
- Repo.one(query)
+ def get_existing_announce(actor, %{data: %{"id" => ap_id}}) do
+ "Announce"
+ |> Activity.Queries.by_type()
+ |> where(actor: ^actor)
+ # this is to use the index
+ |> Activity.Queries.by_object_id(ap_id)
+ |> Repo.one()
end
@doc """
@@ -538,11 +502,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
object
) do
announcements =
- if is_list(object.data["announcements"]), do: object.data["announcements"], else: []
+ if is_list(object.data["announcements"]) do
+ Enum.uniq([actor | object.data["announcements"]])
+ else
+ [actor]
+ end
- with announcements <- [actor | announcements] |> Enum.uniq() do
- update_element_in_object("announcement", announcements, object)
- end
+ update_element_in_object("announcement", announcements, object)
end
def add_announce_to_object(_, object), do: {:ok, object}
@@ -570,28 +536,14 @@ defmodule Pleroma.Web.ActivityPub.Utils do
#### Block-related helpers
def fetch_latest_block(%User{ap_id: blocker_id}, %User{ap_id: blocked_id}) do
- query =
- from(
- activity in Activity,
- where:
- fragment(
- "? ->> 'type' = 'Block'",
- activity.data
- ),
- where: activity.actor == ^blocker_id,
- # this is to use the index
- where:
- fragment(
- "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
- activity.data,
- activity.data,
- ^blocked_id
- ),
- order_by: [fragment("? desc nulls last", activity.id)],
- limit: 1
- )
-
- Repo.one(query)
+ "Block"
+ |> Activity.Queries.by_type()
+ |> where(actor: ^blocker_id)
+ # this is to use the index
+ |> Activity.Queries.by_object_id(blocked_id)
+ |> order_by([activity], fragment("? desc nulls last", activity.id))
+ |> limit(1)
+ |> Repo.one()
end
def make_block_data(blocker, blocked, activity_id) do
@@ -695,11 +647,11 @@ defmodule Pleroma.Web.ActivityPub.Utils do
#### Report-related helpers
def update_report_state(%Activity{} = activity, state) when state in @supported_report_states do
- with new_data <- Map.put(activity.data, "state", state),
- changeset <- Changeset.change(activity, data: new_data),
- {:ok, activity} <- Repo.update(changeset) do
- {:ok, activity}
- end
+ new_data = Map.put(activity.data, "state", state)
+
+ activity
+ |> Changeset.change(data: new_data)
+ |> Repo.update()
end
def update_report_state(_, _), do: {:error, "Unsupported state"}
@@ -766,21 +718,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do
end
def get_existing_votes(actor, %{data: %{"id" => id}}) do
- query =
- from(
- [activity, object: object] in Activity.with_preloaded_object(Activity),
- where: fragment("(?)->>'type' = 'Create'", activity.data),
- where: fragment("(?)->>'actor' = ?", activity.data, ^actor),
- where:
- fragment(
- "(?)->>'inReplyTo' = ?",
- object.data,
- ^to_string(id)
- ),
- where: fragment("(?)->>'type' = 'Answer'", object.data)
- )
-
- Repo.all(query)
+ actor
+ |> Activity.Queries.by_actor()
+ |> Activity.Queries.by_type("Create")
+ |> Activity.with_preloaded_object()
+ |> where([a, object: o], fragment("(?)->>'inReplyTo' = ?", o.data, ^to_string(id)))
+ |> where([a, object: o], fragment("(?)->>'type' = 'Answer'", o.data))
+ |> Repo.all()
end
defp maybe_put(map, _key, nil), do: map
diff --git a/lib/pleroma/web/admin_api/config.ex b/lib/pleroma/web/admin_api/config.ex
index a10cc779b..1917a5580 100644
--- a/lib/pleroma/web/admin_api/config.ex
+++ b/lib/pleroma/web/admin_api/config.ex
@@ -90,6 +90,8 @@ defmodule Pleroma.Web.AdminAPI.Config do
for v <- entity, into: [], do: do_convert(v)
end
+ defp do_convert(%Regex{} = entity), do: inspect(entity)
+
defp do_convert(entity) when is_map(entity) do
for {k, v} <- entity, into: %{}, do: {do_convert(k), do_convert(v)}
end
@@ -122,7 +124,7 @@ defmodule Pleroma.Web.AdminAPI.Config do
def transform(entity), do: :erlang.term_to_binary(entity)
- defp do_transform(%Regex{} = entity) when is_map(entity), do: entity
+ defp do_transform(%Regex{} = entity), do: entity
defp do_transform(%{"tuple" => [":dispatch", [entity]]}) do
{dispatch_settings, []} = do_eval(entity)
@@ -154,8 +156,15 @@ defmodule Pleroma.Web.AdminAPI.Config do
defp do_transform(entity), do: entity
defp do_transform_string("~r/" <> pattern) do
- pattern = String.trim_trailing(pattern, "/")
- ~r/#{pattern}/
+ modificator = String.split(pattern, "/") |> List.last()
+ pattern = String.trim_trailing(pattern, "/" <> modificator)
+
+ case modificator do
+ "" -> ~r/#{pattern}/
+ "i" -> ~r/#{pattern}/i
+ "u" -> ~r/#{pattern}/u
+ "s" -> ~r/#{pattern}/s
+ end
end
defp do_transform_string(":" <> atom), do: String.to_atom(atom)
diff --git a/lib/pleroma/web/controller_helper.ex b/lib/pleroma/web/controller_helper.ex
index eeac9f503..b53a01955 100644
--- a/lib/pleroma/web/controller_helper.ex
+++ b/lib/pleroma/web/controller_helper.ex
@@ -34,79 +34,38 @@ defmodule Pleroma.Web.ControllerHelper do
defp param_to_integer(_, default), do: default
- def add_link_headers(
- conn,
- method,
- activities,
- param \\ nil,
- params \\ %{},
- func3 \\ nil,
- func4 \\ nil
- ) do
- params =
- conn.params
- |> Map.drop(["since_id", "max_id", "min_id"])
- |> Map.merge(params)
+ def add_link_headers(conn, activities, extra_params \\ %{}) do
+ case List.last(activities) do
+ %{id: max_id} ->
+ params =
+ conn.params
+ |> Map.drop(Map.keys(conn.path_params))
+ |> Map.drop(["since_id", "max_id", "min_id"])
+ |> Map.merge(extra_params)
- last = List.last(activities)
+ limit =
+ params
+ |> Map.get("limit", "20")
+ |> String.to_integer()
- func3 = func3 || (&mastodon_api_url/3)
- func4 = func4 || (&mastodon_api_url/4)
+ min_id =
+ if length(activities) <= limit do
+ activities
+ |> List.first()
+ |> Map.get(:id)
+ else
+ activities
+ |> Enum.at(limit * -1)
+ |> Map.get(:id)
+ end
- if last do
- max_id = last.id
+ next_url = current_url(conn, Map.merge(params, %{max_id: max_id}))
+ prev_url = current_url(conn, Map.merge(params, %{min_id: min_id}))
- limit =
- params
- |> Map.get("limit", "20")
- |> String.to_integer()
+ put_resp_header(conn, "link", "<#{next_url}>; rel=\"next\", <#{prev_url}>; rel=\"prev\"")
- min_id =
- if length(activities) <= limit do
- activities
- |> List.first()
- |> Map.get(:id)
- else
- activities
- |> Enum.at(limit * -1)
- |> Map.get(:id)
- end
-
- {next_url, prev_url} =
- if param do
- {
- func4.(
- Pleroma.Web.Endpoint,
- method,
- param,
- Map.merge(params, %{max_id: max_id})
- ),
- func4.(
- Pleroma.Web.Endpoint,
- method,
- param,
- Map.merge(params, %{min_id: min_id})
- )
- }
- else
- {
- func3.(
- Pleroma.Web.Endpoint,
- method,
- Map.merge(params, %{max_id: max_id})
- ),
- func3.(
- Pleroma.Web.Endpoint,
- method,
- Map.merge(params, %{min_id: min_id})
- )
- }
- end
-
- conn
- |> put_resp_header("link", "<#{next_url}>; rel=\"next\", <#{prev_url}>; rel=\"prev\"")
- else
- conn
+ _ ->
+ conn
end
end
end
diff --git a/lib/pleroma/web/endpoint.ex b/lib/pleroma/web/endpoint.ex
index c123530dc..eb805e853 100644
--- a/lib/pleroma/web/endpoint.ex
+++ b/lib/pleroma/web/endpoint.ex
@@ -57,7 +57,7 @@ defmodule Pleroma.Web.Endpoint do
plug(Phoenix.CodeReloader)
end
- plug(TrailingFormatPlug)
+ plug(Pleroma.Plugs.TrailingFormatPlug)
plug(Plug.RequestId)
plug(Plug.Logger)
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex
index f4f9e83e0..1a2da014a 100644
--- a/lib/pleroma/web/federator/federator.ex
+++ b/lib/pleroma/web/federator/federator.ex
@@ -10,16 +10,17 @@ defmodule Pleroma.Web.Federator do
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Federator.Publisher
- alias Pleroma.Web.Federator.RetryQueue
alias Pleroma.Web.OStatus
alias Pleroma.Web.Websub
+ alias Pleroma.Workers.PublisherWorker
+ alias Pleroma.Workers.ReceiverWorker
+ alias Pleroma.Workers.SubscriberWorker
require Logger
def init do
- # 1 minute
- Process.sleep(1000 * 60)
- refresh_subscriptions()
+ # To do: consider removing this call in favor of scheduled execution (`quantum`-based)
+ refresh_subscriptions(schedule_in: 60)
end
@doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
@@ -37,50 +38,38 @@ defmodule Pleroma.Web.Federator do
# Client API
def incoming_doc(doc) do
- PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
+ ReceiverWorker.enqueue("incoming_doc", %{"body" => doc})
end
def incoming_ap_doc(params) do
- PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
+ ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
end
- def publish(activity, priority \\ 1) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
+ def publish(%{id: "pleroma:fakeid"} = activity) do
+ perform(:publish, activity)
end
- def verify_websub(websub) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
+ def publish(activity) do
+ PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
end
- def request_subscription(sub) do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
+ def verify_websub(websub) do
+ SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id})
end
- def refresh_subscriptions do
- PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
+ def request_subscription(websub) do
+ SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id})
end
- # Job Worker Callbacks
-
- def perform(:refresh_subscriptions) do
- Logger.debug("Federator running refresh subscriptions")
- Websub.refresh_subscriptions()
-
- spawn(fn ->
- # 6 hours
- Process.sleep(1000 * 60 * 60 * 6)
- refresh_subscriptions()
- end)
+ def refresh_subscriptions(worker_args \\ []) do
+ SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1])
end
- def perform(:request_subscription, websub) do
- Logger.debug("Refreshing #{websub.topic}")
+ # Job Worker Callbacks
- with {:ok, websub} <- Websub.request_subscription(websub) do
- Logger.debug("Successfully refreshed #{websub.topic}")
- else
- _e -> Logger.debug("Couldn't refresh #{websub.topic}")
- end
+ @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
+ def perform(:publish_one, module, params) do
+ apply(module, :publish_one, [params])
end
def perform(:publish, activity) do
@@ -92,14 +81,6 @@ defmodule Pleroma.Web.Federator do
end
end
- def perform(:verify_websub, websub) do
- Logger.debug(fn ->
- "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
- end)
-
- Websub.verify(websub)
- end
-
def perform(:incoming_doc, doc) do
Logger.info("Got document, trying to parse")
OStatus.handle_incoming(doc)
@@ -130,22 +111,27 @@ defmodule Pleroma.Web.Federator do
end
end
- def perform(
- :publish_single_websub,
- %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
- ) do
- case Websub.publish_one(params) do
- {:ok, _} ->
- :ok
+ def perform(:request_subscription, websub) do
+ Logger.debug("Refreshing #{websub.topic}")
- {:error, _} ->
- RetryQueue.enqueue(params, Websub)
+ with {:ok, websub} <- Websub.request_subscription(websub) do
+ Logger.debug("Successfully refreshed #{websub.topic}")
+ else
+ _e -> Logger.debug("Couldn't refresh #{websub.topic}")
end
end
- def perform(type, _) do
- Logger.debug(fn -> "Unknown task: #{type}" end)
- {:error, "Don't know what to do with this"}
+ def perform(:verify_websub, websub) do
+ Logger.debug(fn ->
+ "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
+ end)
+
+ Websub.verify(websub)
+ end
+
+ def perform(:refresh_subscriptions) do
+ Logger.debug("Federator running refresh subscriptions")
+ Websub.refresh_subscriptions()
end
def ap_enabled_actor(id) do
diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex
index 70f870244..937064638 100644
--- a/lib/pleroma/web/federator/publisher.ex
+++ b/lib/pleroma/web/federator/publisher.ex
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.User
- alias Pleroma.Web.Federator.RetryQueue
+ alias Pleroma.Workers.PublisherWorker
require Logger
@@ -30,23 +30,11 @@ defmodule Pleroma.Web.Federator.Publisher do
Enqueue publishing a single activity.
"""
@spec enqueue_one(module(), Map.t()) :: :ok
- def enqueue_one(module, %{} = params),
- do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params])
-
- @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
- def perform(:publish_one, module, params) do
- case apply(module, :publish_one, [params]) do
- {:ok, _} ->
- :ok
-
- {:error, _e} ->
- RetryQueue.enqueue(params, module)
- end
- end
-
- def perform(type, _, _) do
- Logger.debug("Unknown task: #{type}")
- {:error, "Don't know what to do with this"}
+ def enqueue_one(module, %{} = params) do
+ PublisherWorker.enqueue(
+ "publish_one",
+ %{"module" => to_string(module), "params" => params}
+ )
end
@doc """
diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex
deleted file mode 100644
index 9eab8c218..000000000
--- a/lib/pleroma/web/federator/retry_queue.ex
+++ /dev/null
@@ -1,239 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Federator.RetryQueue do
- use GenServer
-
- require Logger
-
- def init(args) do
- queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
-
- {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
- end
-
- def start_link(_) do
- enabled =
- if Pleroma.Config.get(:env) == :test,
- do: true,
- else: Pleroma.Config.get([__MODULE__, :enabled], false)
-
- if enabled do
- Logger.info("Starting retry queue")
-
- linkres =
- GenServer.start_link(
- __MODULE__,
- %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
- name: __MODULE__
- )
-
- maybe_kickoff_timer()
- linkres
- else
- Logger.info("Retry queue disabled")
- :ignore
- end
- end
-
- def enqueue(data, transport, retries \\ 0) do
- GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
- end
-
- def get_stats do
- GenServer.call(__MODULE__, :get_stats)
- end
-
- def reset_stats do
- GenServer.call(__MODULE__, :reset_stats)
- end
-
- def get_retry_params(retries) do
- if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
- {:drop, "Max retries reached"}
- else
- {:retry, growth_function(retries)}
- end
- end
-
- def get_retry_timer_interval do
- Pleroma.Config.get([:retry_queue, :interval], 1000)
- end
-
- defp ets_count_expires(table, current_time) do
- :ets.select_count(
- table,
- [
- {
- {:"$1", :"$2"},
- [{:"=<", :"$1", {:const, current_time}}],
- [true]
- }
- ]
- )
- end
-
- defp ets_pop_n_expired(table, current_time, desired) do
- {popped, _continuation} =
- :ets.select(
- table,
- [
- {
- {:"$1", :"$2"},
- [{:"=<", :"$1", {:const, current_time}}],
- [:"$_"]
- }
- ],
- desired
- )
-
- popped
- |> Enum.each(fn e ->
- :ets.delete_object(table, e)
- end)
-
- popped
- end
-
- def maybe_start_job(running_jobs, queue_table) do
- # we don't want to hit the ets or the DateTime more times than we have to
- # could optimize slightly further by not using the count, and instead grabbing
- # up to N objects early...
- current_time = DateTime.to_unix(DateTime.utc_now())
- n_running_jobs = :sets.size(running_jobs)
-
- if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
- n_ready_jobs = ets_count_expires(queue_table, current_time)
-
- if n_ready_jobs > 0 do
- # figure out how many we could start
- available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
- start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
- else
- running_jobs
- end
- else
- running_jobs
- end
- end
-
- defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
- running_jobs
- end
-
- defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
- when available_job_slots > 0 do
- candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
-
- candidates
- |> List.foldl(running_jobs, fn {_, e}, rj ->
- {:ok, pid} = Task.start(fn -> worker(e) end)
- mref = Process.monitor(pid)
- :sets.add_element(mref, rj)
- end)
- end
-
- def worker({:send, data, transport, retries}) do
- case transport.publish_one(data) do
- {:ok, _} ->
- GenServer.cast(__MODULE__, :inc_delivered)
- :delivered
-
- {:error, _reason} ->
- enqueue(data, transport, retries)
- :retry
- end
- end
-
- def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
- {:reply, %{delivered: delivery_count, dropped: drop_count}, state}
- end
-
- def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
- {:reply, %{delivered: delivery_count, dropped: drop_count},
- %{state | delivered: 0, dropped: 0}}
- end
-
- def handle_cast(:reset_stats, state) do
- {:noreply, %{state | delivered: 0, dropped: 0}}
- end
-
- def handle_cast(
- {:maybe_enqueue, data, transport, retries},
- %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
- ) do
- case get_retry_params(retries) do
- {:retry, timeout} ->
- :ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
-
- {:drop, message} ->
- Logger.debug(message)
- {:noreply, %{state | dropped: drop_count + 1}}
- end
- end
-
- def handle_cast(:kickoff_timer, state) do
- retry_interval = get_retry_timer_interval()
- Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
- {:noreply, state}
- end
-
- def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
- {:noreply, %{state | delivered: delivery_count + 1}}
- end
-
- def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
- {:noreply, %{state | dropped: drop_count + 1}}
- end
-
- def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
- case transport.publish_one(data) do
- {:ok, _} ->
- {:noreply, %{state | delivered: delivery_count + 1}}
-
- {:error, _reason} ->
- enqueue(data, transport, retries)
- {:noreply, state}
- end
- end
-
- def handle_info(
- :retry_timer_run,
- %{queue_table: queue_table, running_jobs: running_jobs} = state
- ) do
- maybe_kickoff_timer()
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
- end
-
- def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
- %{running_jobs: running_jobs, queue_table: queue_table} = state
- running_jobs = :sets.del_element(ref, running_jobs)
- running_jobs = maybe_start_job(running_jobs, queue_table)
- {:noreply, %{state | running_jobs: running_jobs}}
- end
-
- def handle_info(unknown, state) do
- Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
- {:noreply, state}
- end
-
- if Pleroma.Config.get(:env) == :test do
- defp growth_function(_retries) do
- _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
- DateTime.to_unix(DateTime.utc_now()) - 1
- end
- else
- defp growth_function(retries) do
- round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
- DateTime.to_unix(DateTime.utc_now())
- end
- end
-
- defp maybe_kickoff_timer do
- GenServer.cast(__MODULE__, :kickoff_timer)
- end
-end
diff --git a/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex b/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex
index 83e877c0e..060137b80 100644
--- a/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex
+++ b/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
use Pleroma.Web, :controller
import Pleroma.Web.ControllerHelper,
- only: [json_response: 3, add_link_headers: 5, add_link_headers: 4, add_link_headers: 3]
+ only: [json_response: 3, add_link_headers: 2, add_link_headers: 3]
alias Ecto.Changeset
alias Pleroma.Activity
@@ -290,7 +290,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
end
def user(%{assigns: %{user: for_user}} = conn, %{"id" => nickname_or_id}) do
- with %User{} = user <- User.get_cached_by_nickname_or_id(nickname_or_id),
+ with %User{} = user <- User.get_cached_by_nickname_or_id(nickname_or_id, for: for_user),
true <- User.auth_active?(user) || user.id == for_user.id || User.superuser?(for_user) do
account = AccountView.render("account.json", %{user: user, for: for_user})
json(conn, account)
@@ -365,7 +365,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:home_timeline, activities)
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -384,13 +384,13 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:public_timeline, activities, false, %{"local" => local_only})
+ |> add_link_headers(activities, %{"local" => local_only})
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
def user_statuses(%{assigns: %{user: reading_user}} = conn, params) do
- with %User{} = user <- User.get_cached_by_nickname_or_id(params["id"]) do
+ with %User{} = user <- User.get_cached_by_nickname_or_id(params["id"], for: reading_user) do
params =
params
|> Map.put("tag", params["tagged"])
@@ -398,7 +398,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
activities = ActivityPub.fetch_user_activities(user, reading_user, params)
conn
- |> add_link_headers(:user_statuses, activities, params["id"])
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{
activities: activities,
@@ -422,11 +422,25 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Pagination.fetch_paginated(params)
conn
- |> add_link_headers(:dm_timeline, activities)
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
+ def get_statuses(%{assigns: %{user: user}} = conn, %{"ids" => ids}) do
+ limit = 100
+
+ activities =
+ ids
+ |> Enum.take(limit)
+ |> Activity.all_by_ids_with_object()
+ |> Enum.filter(&Visibility.visible_for_user?(&1, user))
+
+ conn
+ |> put_view(StatusView)
+ |> render("index.json", activities: activities, for: user, as: :activity)
+ end
+
def get_status(%{assigns: %{user: user}} = conn, %{"id" => id}) do
with %Activity{} = activity <- Activity.get_by_id_with_object(id),
true <- Visibility.visible_for_user?(activity, user) do
@@ -523,7 +537,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
def scheduled_statuses(%{assigns: %{user: user}} = conn, params) do
with scheduled_activities <- MastodonAPI.get_scheduled_activities(user, params) do
conn
- |> add_link_headers(:scheduled_statuses, scheduled_activities)
+ |> add_link_headers(scheduled_activities)
|> put_view(ScheduledActivityView)
|> render("index.json", %{scheduled_activities: scheduled_activities})
end
@@ -706,7 +720,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
notifications = MastodonAPI.get_notifications(user, params)
conn
- |> add_link_headers(:notifications, notifications)
+ |> add_link_headers(notifications)
|> put_view(NotificationView)
|> render("index.json", %{notifications: notifications, for: user})
end
@@ -828,6 +842,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
def favourited_by(%{assigns: %{user: user}} = conn, %{"id" => id}) do
with %Activity{} = activity <- Activity.get_by_id_with_object(id),
+ {:visible, true} <- {:visible, Visibility.visible_for_user?(activity, user)},
%Object{data: %{"likes" => likes}} <- Object.normalize(activity) do
q = from(u in User, where: u.ap_id in ^likes)
@@ -839,12 +854,14 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> put_view(AccountView)
|> render("accounts.json", %{for: user, users: users, as: :user})
else
+ {:visible, false} -> {:error, :not_found}
_ -> json(conn, [])
end
end
def reblogged_by(%{assigns: %{user: user}} = conn, %{"id" => id}) do
with %Activity{} = activity <- Activity.get_by_id_with_object(id),
+ {:visible, true} <- {:visible, Visibility.visible_for_user?(activity, user)},
%Object{data: %{"announcements" => announces}} <- Object.normalize(activity) do
q = from(u in User, where: u.ap_id in ^announces)
@@ -856,6 +873,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> put_view(AccountView)
|> render("accounts.json", %{for: user, users: users, as: :user})
else
+ {:visible, false} -> {:error, :not_found}
_ -> json(conn, [])
end
end
@@ -894,7 +912,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:hashtag_timeline, activities, params["tag"], %{"local" => local_only})
+ |> add_link_headers(activities, %{"local" => local_only})
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -910,7 +928,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
end
conn
- |> add_link_headers(:followers, followers, user)
+ |> add_link_headers(followers)
|> put_view(AccountView)
|> render("accounts.json", %{for: for_user, users: followers, as: :user})
end
@@ -927,7 +945,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
end
conn
- |> add_link_headers(:following, followers, user)
+ |> add_link_headers(followers)
|> put_view(AccountView)
|> render("accounts.json", %{for: for_user, users: followers, as: :user})
end
@@ -1152,7 +1170,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:favourites, activities)
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -1179,7 +1197,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.reverse()
conn
- |> add_link_headers(:favourites, activities)
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: for_user, as: :activity})
else
@@ -1200,7 +1218,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|> Enum.map(fn b -> Map.put(b.activity, :bookmark, Map.delete(b, :activity)) end)
conn
- |> add_link_headers(:bookmarks, bookmarks)
+ |> add_link_headers(bookmarks)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
@@ -1640,7 +1658,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
end)
conn
- |> add_link_headers(:conversations, participations)
+ |> add_link_headers(participations)
|> json(conversations)
end
diff --git a/lib/pleroma/web/mastodon_api/views/notification_view.ex b/lib/pleroma/web/mastodon_api/views/notification_view.ex
index 27e9cab06..ec8eadcaa 100644
--- a/lib/pleroma/web/mastodon_api/views/notification_view.ex
+++ b/lib/pleroma/web/mastodon_api/views/notification_view.ex
@@ -14,7 +14,7 @@ defmodule Pleroma.Web.MastodonAPI.NotificationView do
alias Pleroma.Web.MastodonAPI.StatusView
def render("index.json", %{notifications: notifications, for: user}) do
- render_many(notifications, NotificationView, "show.json", %{for: user})
+ safe_render_many(notifications, NotificationView, "show.json", %{for: user})
end
def render("show.json", %{
diff --git a/lib/pleroma/web/mastodon_api/views/status_view.ex b/lib/pleroma/web/mastodon_api/views/status_view.ex
index 4c3c8c564..ef796cddd 100644
--- a/lib/pleroma/web/mastodon_api/views/status_view.ex
+++ b/lib/pleroma/web/mastodon_api/views/status_view.ex
@@ -73,14 +73,12 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
def render("index.json", opts) do
replied_to_activities = get_replied_to_activities(opts.activities)
- parallel = unless is_nil(opts[:parallel]), do: opts[:parallel], else: true
opts.activities
|> safe_render_many(
StatusView,
"status.json",
- Map.put(opts, :replied_to_activities, replied_to_activities),
- parallel
+ Map.put(opts, :replied_to_activities, replied_to_activities)
)
end
@@ -385,16 +383,27 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
end
if options do
- end_time =
- (object.data["closed"] || object.data["endTime"])
- |> NaiveDateTime.from_iso8601!()
-
- expired =
- end_time
- |> NaiveDateTime.compare(NaiveDateTime.utc_now())
- |> case do
- :lt -> true
- _ -> false
+ {end_time, expired} =
+ case object.data["closed"] || object.data["endTime"] do
+ end_time when is_binary(end_time) ->
+ end_time =
+ (object.data["closed"] || object.data["endTime"])
+ |> NaiveDateTime.from_iso8601!()
+
+ expired =
+ end_time
+ |> NaiveDateTime.compare(NaiveDateTime.utc_now())
+ |> case do
+ :lt -> true
+ _ -> false
+ end
+
+ end_time = Utils.to_masto_date(end_time)
+
+ {end_time, expired}
+
+ _ ->
+ {nil, false}
end
voted =
@@ -421,7 +430,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
# Mastodon uses separate ids for polls, but an object can't have
# more than one poll embedded so object id is fine
id: to_string(object.id),
- expires_at: Utils.to_masto_date(end_time),
+ expires_at: end_time,
expired: expired,
multiple: multiple,
votes_count: votes_count,
@@ -488,7 +497,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do
object_tags = for tag when is_binary(tag) <- object_tags, do: tag
Enum.reduce(object_tags, [], fn tag, tags ->
- tags ++ [%{name: tag, url: "/tag/#{tag}"}]
+ tags ++ [%{name: tag, url: "/tag/#{URI.encode(tag)}"}]
end)
end
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index dbd3542ea..3c26eb406 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -8,6 +8,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.OAuth.Token
+ alias Pleroma.Web.Streamer
@behaviour :cowboy_websocket
@@ -24,7 +25,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
]
@anonymous_streams ["public", "public:local", "hashtag"]
- # Handled by periodic keepalive in Pleroma.Web.Streamer.
+ # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
@timeout :infinity
def init(%{qs: qs} = req, state) do
@@ -65,7 +66,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
}, topic #{state.topic}"
)
- Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state))
+ Streamer.add_socket(state.topic, streamer_socket(state))
{:ok, state}
end
@@ -80,7 +81,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
}, topic #{state.topic || "?"}: #{inspect(reason)}"
)
- Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state))
+ Streamer.remove_socket(state.topic, streamer_socket(state))
:ok
end
diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex
index f50098302..eb94bf86f 100644
--- a/lib/pleroma/web/oauth/token/clean_worker.ex
+++ b/lib/pleroma/web/oauth/token/clean_worker.ex
@@ -17,6 +17,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
)
alias Pleroma.Web.OAuth.Token
+ alias Pleroma.Workers.BackgroundWorker
def start_link(_), do: GenServer.start_link(__MODULE__, %{})
@@ -27,9 +28,11 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
@doc false
def handle_info(:perform, state) do
- Token.delete_expired_tokens()
+ BackgroundWorker.enqueue("clean_expired_tokens", %{})
Process.send_after(self(), :perform, @interval)
{:noreply, state}
end
+
+ def perform(:clean), do: Token.delete_expired_tokens()
end
diff --git a/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex b/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex
index f4df3b024..d17ccf84d 100644
--- a/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex
+++ b/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex
@@ -5,7 +5,7 @@
defmodule Pleroma.Web.PleromaAPI.PleromaAPIController do
use Pleroma.Web, :controller
- import Pleroma.Web.ControllerHelper, only: [add_link_headers: 7]
+ import Pleroma.Web.ControllerHelper, only: [add_link_headers: 2]
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
@@ -27,31 +27,22 @@ defmodule Pleroma.Web.PleromaAPI.PleromaAPIController do
%{assigns: %{user: user}} = conn,
%{"id" => participation_id} = params
) do
- params =
- params
- |> Map.put("blocking_user", user)
- |> Map.put("muting_user", user)
- |> Map.put("user", user)
-
- participation =
- participation_id
- |> Participation.get(preload: [:conversation])
+ participation = Participation.get(participation_id, preload: [:conversation])
if user.id == participation.user_id do
+ params =
+ params
+ |> Map.put("blocking_user", user)
+ |> Map.put("muting_user", user)
+ |> Map.put("user", user)
+
activities =
participation.conversation.ap_id
|> ActivityPub.fetch_activities_for_context(params)
|> Enum.reverse()
conn
- |> add_link_headers(
- :conversation_statuses,
- activities,
- participation_id,
- params,
- nil,
- &pleroma_api_url/4
- )
+ |> add_link_headers(activities)
|> put_view(StatusView)
|> render("index.json", %{activities: activities, for: user, as: :activity})
end
diff --git a/lib/pleroma/web/push/push.ex b/lib/pleroma/web/push/push.ex
index 729dad02a..7ef1532ac 100644
--- a/lib/pleroma/web/push/push.ex
+++ b/lib/pleroma/web/push/push.ex
@@ -3,7 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Push do
- alias Pleroma.Web.Push.Impl
+ alias Pleroma.Workers.WebPusherWorker
require Logger
@@ -31,6 +31,7 @@ defmodule Pleroma.Web.Push do
end
end
- def send(notification),
- do: PleromaJobQueue.enqueue(:web_push, Impl, [notification])
+ def send(notification) do
+ WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id})
+ end
end
diff --git a/lib/pleroma/web/rich_media/parser.ex b/lib/pleroma/web/rich_media/parser.ex
index f5f9e358c..c06b0a0f2 100644
--- a/lib/pleroma/web/rich_media/parser.ex
+++ b/lib/pleroma/web/rich_media/parser.ex
@@ -81,6 +81,7 @@ defmodule Pleroma.Web.RichMedia.Parser do
{:ok, %Tesla.Env{body: html}} = Pleroma.HTTP.get(url, [], adapter: @hackney_options)
html
+ |> parse_html
|> maybe_parse()
|> Map.put(:url, url)
|> clean_parsed_data()
@@ -91,6 +92,8 @@ defmodule Pleroma.Web.RichMedia.Parser do
end
end
+ defp parse_html(html), do: Floki.parse(html)
+
defp maybe_parse(html) do
Enum.reduce_while(parsers(), %{}, fn parser, acc ->
case parser.parse(html, acc) do
@@ -100,7 +103,8 @@ defmodule Pleroma.Web.RichMedia.Parser do
end)
end
- defp check_parsed_data(%{title: title} = data) when is_binary(title) and byte_size(title) > 0 do
+ defp check_parsed_data(%{title: title} = data)
+ when is_binary(title) and byte_size(title) > 0 do
{:ok, data}
end
diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex
index 44a4279f7..401133bf3 100644
--- a/lib/pleroma/web/router.ex
+++ b/lib/pleroma/web/router.ex
@@ -135,6 +135,7 @@ defmodule Pleroma.Web.Router do
pipeline :http_signature do
plug(Pleroma.Web.Plugs.HTTPSignaturePlug)
+ plug(Pleroma.Web.Plugs.MappedSignatureToIdentityPlug)
end
scope "/api/pleroma", Pleroma.Web.TwitterAPI do
@@ -224,6 +225,7 @@ defmodule Pleroma.Web.Router do
scope [] do
pipe_through(:oauth_write)
+ post("/change_email", UtilController, :change_email)
post("/change_password", UtilController, :change_password)
post("/delete_account", UtilController, :delete_account)
put("/notification_settings", UtilController, :update_notificaton_settings)
@@ -443,6 +445,7 @@ defmodule Pleroma.Web.Router do
get("/timelines/tag/:tag", MastodonAPIController, :hashtag_timeline)
get("/timelines/list/:list_id", MastodonAPIController, :list_timeline)
+ get("/statuses", MastodonAPIController, :get_statuses)
get("/statuses/:id", MastodonAPIController, :get_status)
get("/statuses/:id/context", MastodonAPIController, :get_context)
@@ -477,53 +480,12 @@ defmodule Pleroma.Web.Router do
scope "/api", Pleroma.Web do
pipe_through(:api)
- post("/account/register", TwitterAPI.Controller, :register)
- post("/account/password_reset", TwitterAPI.Controller, :password_reset)
-
- post("/account/resend_confirmation_email", TwitterAPI.Controller, :resend_confirmation_email)
-
get(
"/account/confirm_email/:user_id/:token",
TwitterAPI.Controller,
:confirm_email,
as: :confirm_email
)
-
- scope [] do
- pipe_through(:oauth_read_or_public)
-
- get("/statuses/user_timeline", TwitterAPI.Controller, :user_timeline)
- get("/qvitter/statuses/user_timeline", TwitterAPI.Controller, :user_timeline)
- get("/users/show", TwitterAPI.Controller, :show_user)
-
- get("/statuses/followers", TwitterAPI.Controller, :followers)
- get("/statuses/friends", TwitterAPI.Controller, :friends)
- get("/statuses/blocks", TwitterAPI.Controller, :blocks)
- get("/statuses/show/:id", TwitterAPI.Controller, :fetch_status)
- get("/statusnet/conversation/:id", TwitterAPI.Controller, :fetch_conversation)
-
- get("/search", TwitterAPI.Controller, :search)
- get("/statusnet/tags/timeline/:tag", TwitterAPI.Controller, :public_and_external_timeline)
- end
- end
-
- scope "/api", Pleroma.Web do
- pipe_through([:api, :oauth_read_or_public])
-
- get("/statuses/public_timeline", TwitterAPI.Controller, :public_timeline)
-
- get(
- "/statuses/public_and_external_timeline",
- TwitterAPI.Controller,
- :public_and_external_timeline
- )
-
- get("/statuses/networkpublic_timeline", TwitterAPI.Controller, :public_and_external_timeline)
- end
-
- scope "/api", Pleroma.Web, as: :twitter_api_search do
- pipe_through([:api, :oauth_read_or_public])
- get("/pleroma/search_user", TwitterAPI.Controller, :search_user)
end
scope "/api", Pleroma.Web, as: :authenticated_twitter_api do
@@ -535,67 +497,8 @@ defmodule Pleroma.Web.Router do
scope [] do
pipe_through(:oauth_read)
- get("/account/verify_credentials", TwitterAPI.Controller, :verify_credentials)
- post("/account/verify_credentials", TwitterAPI.Controller, :verify_credentials)
-
- get("/statuses/home_timeline", TwitterAPI.Controller, :friends_timeline)
- get("/statuses/friends_timeline", TwitterAPI.Controller, :friends_timeline)
- get("/statuses/mentions", TwitterAPI.Controller, :mentions_timeline)
- get("/statuses/mentions_timeline", TwitterAPI.Controller, :mentions_timeline)
- get("/statuses/dm_timeline", TwitterAPI.Controller, :dm_timeline)
- get("/qvitter/statuses/notifications", TwitterAPI.Controller, :notifications)
-
- get("/pleroma/friend_requests", TwitterAPI.Controller, :friend_requests)
-
- get("/friends/ids", TwitterAPI.Controller, :friends_ids)
- get("/friendships/no_retweets/ids", TwitterAPI.Controller, :empty_array)
-
- get("/mutes/users/ids", TwitterAPI.Controller, :empty_array)
- get("/qvitter/mutes", TwitterAPI.Controller, :raw_empty_array)
-
- get("/externalprofile/show", TwitterAPI.Controller, :external_profile)
-
post("/qvitter/statuses/notifications/read", TwitterAPI.Controller, :notifications_read)
end
-
- scope [] do
- pipe_through(:oauth_write)
-
- post("/account/update_profile", TwitterAPI.Controller, :update_profile)
- post("/account/update_profile_banner", TwitterAPI.Controller, :update_banner)
- post("/qvitter/update_background_image", TwitterAPI.Controller, :update_background)
-
- post("/statuses/update", TwitterAPI.Controller, :status_update)
- post("/statuses/retweet/:id", TwitterAPI.Controller, :retweet)
- post("/statuses/unretweet/:id", TwitterAPI.Controller, :unretweet)
- post("/statuses/destroy/:id", TwitterAPI.Controller, :delete_post)
-
- post("/statuses/pin/:id", TwitterAPI.Controller, :pin)
- post("/statuses/unpin/:id", TwitterAPI.Controller, :unpin)
-
- post("/statusnet/media/upload", TwitterAPI.Controller, :upload)
- post("/media/upload", TwitterAPI.Controller, :upload_json)
- post("/media/metadata/create", TwitterAPI.Controller, :update_media)
-
- post("/favorites/create/:id", TwitterAPI.Controller, :favorite)
- post("/favorites/create", TwitterAPI.Controller, :favorite)
- post("/favorites/destroy/:id", TwitterAPI.Controller, :unfavorite)
-
- post("/qvitter/update_avatar", TwitterAPI.Controller, :update_avatar)
- end
-
- scope [] do
- pipe_through(:oauth_follow)
-
- post("/pleroma/friendships/approve", TwitterAPI.Controller, :approve_friend_request)
- post("/pleroma/friendships/deny", TwitterAPI.Controller, :deny_friend_request)
-
- post("/friendships/create", TwitterAPI.Controller, :follow)
- post("/friendships/destroy", TwitterAPI.Controller, :unfollow)
-
- post("/blocks/create", TwitterAPI.Controller, :block)
- post("/blocks/destroy", TwitterAPI.Controller, :unblock)
- end
end
pipeline :ap_service_actor do
@@ -612,6 +515,7 @@ defmodule Pleroma.Web.Router do
scope "/", Pleroma.Web do
pipe_through(:ostatus)
+ pipe_through(:http_signature)
get("/objects/:uuid", OStatus.OStatusController, :object)
get("/activities/:uuid", OStatus.OStatusController, :activity)
diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex
index 9b01ebcc6..8ba7380c0 100644
--- a/lib/pleroma/web/salmon/salmon.ex
+++ b/lib/pleroma/web/salmon/salmon.ex
@@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do
end
end
+ def publish_one(%{recipient_id: recipient_id} = params) do
+ recipient = User.get_cached_by_id(recipient_id)
+
+ params
+ |> Map.delete(:recipient_id)
+ |> Map.put(:recipient, recipient)
+ |> publish_one()
+ end
+
def publish_one(_), do: :noop
@supported_activities [
@@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
Publisher.enqueue_one(__MODULE__, %{
- recipient: remote_user,
+ recipient_id: remote_user.id,
feed: feed,
unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
})
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
deleted file mode 100644
index 587c43f40..000000000
--- a/lib/pleroma/web/streamer.ex
+++ /dev/null
@@ -1,318 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer do
- use GenServer
- require Logger
- alias Pleroma.Activity
- alias Pleroma.Config
- alias Pleroma.Conversation.Participation
- alias Pleroma.Notification
- alias Pleroma.Object
- alias Pleroma.User
- alias Pleroma.Web.ActivityPub.ActivityPub
- alias Pleroma.Web.ActivityPub.Visibility
- alias Pleroma.Web.CommonAPI
- alias Pleroma.Web.MastodonAPI.NotificationView
-
- @keepalive_interval :timer.seconds(30)
-
- def start_link(_) do
- GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
- end
-
- def add_socket(topic, socket) do
- GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
- end
-
- def remove_socket(topic, socket) do
- GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
- end
-
- def stream(topic, item) do
- GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
- end
-
- def init(args) do
- Process.send_after(self(), %{action: :ping}, @keepalive_interval)
-
- {:ok, args}
- end
-
- def handle_info(%{action: :ping}, topics) do
- topics
- |> Map.values()
- |> List.flatten()
- |> Enum.each(fn socket ->
- Logger.debug("Sending keepalive ping")
- send(socket.transport_pid, {:text, ""})
- end)
-
- Process.send_after(self(), %{action: :ping}, @keepalive_interval)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
- recipient_topics =
- User.get_recipients_from_activity(item)
- |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
-
- Enum.each(recipient_topics || [], fn user_topic ->
- Logger.debug("Trying to push direct message to #{user_topic}\n\n")
- push_to_socket(topics, user_topic, item)
- end)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
- user_topic = "direct:#{participation.user_id}"
- Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
-
- push_to_socket(topics, user_topic, participation)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
- # filter the recipient list if the activity is not public, see #270.
- recipient_lists =
- case Visibility.is_public?(item) do
- true ->
- Pleroma.List.get_lists_from_activity(item)
-
- _ ->
- Pleroma.List.get_lists_from_activity(item)
- |> Enum.filter(fn list ->
- owner = User.get_cached_by_id(list.user_id)
-
- Visibility.visible_for_user?(item, owner)
- end)
- end
-
- recipient_topics =
- recipient_lists
- |> Enum.map(fn %{id: id} -> "list:#{id}" end)
-
- Enum.each(recipient_topics || [], fn list_topic ->
- Logger.debug("Trying to push message to #{list_topic}\n\n")
- push_to_socket(topics, list_topic, item)
- end)
-
- {:noreply, topics}
- end
-
- def handle_cast(
- %{action: :stream, topic: topic, item: %Notification{} = item},
- topics
- )
- when topic in ["user", "user:notification"] do
- topics
- |> Map.get("#{topic}:#{item.user_id}", [])
- |> Enum.each(fn socket ->
- with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
- true <- should_send?(user, item) do
- send(
- socket.transport_pid,
- {:text, represent_notification(socket.assigns[:user], item)}
- )
- end
- end)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
- Logger.debug("Trying to push to users")
-
- recipient_topics =
- User.get_recipients_from_activity(item)
- |> Enum.map(fn %{id: id} -> "user:#{id}" end)
-
- Enum.each(recipient_topics, fn topic ->
- push_to_socket(topics, topic, item)
- end)
-
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
- Logger.debug("Trying to push to #{topic}")
- Logger.debug("Pushing item to #{topic}")
- push_to_socket(topics, topic, item)
- {:noreply, topics}
- end
-
- def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
- topic = internal_topic(topic, socket)
- sockets_for_topic = sockets[topic] || []
- sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
- sockets = Map.put(sockets, topic, sockets_for_topic)
- Logger.debug("Got new conn for #{topic}")
- {:noreply, sockets}
- end
-
- def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
- topic = internal_topic(topic, socket)
- sockets_for_topic = sockets[topic] || []
- sockets_for_topic = List.delete(sockets_for_topic, socket)
- sockets = Map.put(sockets, topic, sockets_for_topic)
- Logger.debug("Removed conn for #{topic}")
- {:noreply, sockets}
- end
-
- def handle_cast(m, state) do
- Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
- {:noreply, state}
- end
-
- defp represent_update(%Activity{} = activity, %User{} = user) do
- %{
- event: "update",
- payload:
- Pleroma.Web.MastodonAPI.StatusView.render(
- "status.json",
- activity: activity,
- for: user
- )
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- defp represent_update(%Activity{} = activity) do
- %{
- event: "update",
- payload:
- Pleroma.Web.MastodonAPI.StatusView.render(
- "status.json",
- activity: activity
- )
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- def represent_conversation(%Participation{} = participation) do
- %{
- event: "conversation",
- payload:
- Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
- participation: participation,
- for: participation.user
- })
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- @spec represent_notification(User.t(), Notification.t()) :: binary()
- defp represent_notification(%User{} = user, %Notification{} = notify) do
- %{
- event: "notification",
- payload:
- NotificationView.render(
- "show.json",
- %{notification: notify, for: user}
- )
- |> Jason.encode!()
- }
- |> Jason.encode!()
- end
-
- defp should_send?(%User{} = user, %Activity{} = item) do
- blocks = user.info.blocks || []
- mutes = user.info.mutes || []
- reblog_mutes = user.info.muted_reblogs || []
- domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
-
- with parent when not is_nil(parent) <- Object.normalize(item),
- true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
- true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
- %{host: item_host} <- URI.parse(item.actor),
- %{host: parent_host} <- URI.parse(parent.data["actor"]),
- false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
- false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
- true <- thread_containment(item, user),
- false <- CommonAPI.thread_muted?(user, item) do
- true
- else
- _ -> false
- end
- end
-
- defp should_send?(%User{} = user, %Notification{activity: activity}) do
- should_send?(user, activity)
- end
-
- def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
- Enum.each(topics[topic] || [], fn socket ->
- # Get the current user so we have up-to-date blocks etc.
- if socket.assigns[:user] do
- user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
-
- if should_send?(user, item) do
- send(socket.transport_pid, {:text, represent_update(item, user)})
- end
- else
- send(socket.transport_pid, {:text, represent_update(item)})
- end
- end)
- end
-
- def push_to_socket(topics, topic, %Participation{} = participation) do
- Enum.each(topics[topic] || [], fn socket ->
- send(socket.transport_pid, {:text, represent_conversation(participation)})
- end)
- end
-
- def push_to_socket(topics, topic, %Activity{
- data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
- }) do
- Enum.each(topics[topic] || [], fn socket ->
- send(
- socket.transport_pid,
- {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
- )
- end)
- end
-
- def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
-
- def push_to_socket(topics, topic, item) do
- Enum.each(topics[topic] || [], fn socket ->
- # Get the current user so we have up-to-date blocks etc.
- if socket.assigns[:user] do
- user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
- blocks = user.info.blocks || []
- mutes = user.info.mutes || []
-
- with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
- true <- thread_containment(item, user) do
- send(socket.transport_pid, {:text, represent_update(item, user)})
- end
- else
- send(socket.transport_pid, {:text, represent_update(item)})
- end
- end)
- end
-
- defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do
- "#{topic}:#{socket.assigns[:user].id}"
- end
-
- defp internal_topic(topic, _), do: topic
-
- @spec thread_containment(Activity.t(), User.t()) :: boolean()
- defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
-
- defp thread_containment(activity, user) do
- if Config.get([:instance, :skip_thread_containment]) do
- true
- else
- ActivityPub.contain_activity(activity, user)
- end
- end
-end
diff --git a/lib/pleroma/web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex
new file mode 100644
index 000000000..f77cbb95c
--- /dev/null
+++ b/lib/pleroma/web/streamer/ping.ex
@@ -0,0 +1,33 @@
+defmodule Pleroma.Web.Streamer.Ping do
+ use GenServer
+ require Logger
+
+ alias Pleroma.Web.Streamer.State
+ alias Pleroma.Web.Streamer.StreamerSocket
+
+ @keepalive_interval :timer.seconds(30)
+
+ def start_link(opts) do
+ ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval)
+ GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__)
+ end
+
+ def init(%{ping_interval: ping_interval} = args) do
+ Process.send_after(self(), :ping, ping_interval)
+ {:ok, args}
+ end
+
+ def handle_info(:ping, %{ping_interval: ping_interval} = state) do
+ State.get_sockets()
+ |> Map.values()
+ |> List.flatten()
+ |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} ->
+ Logger.debug("Sending keepalive ping")
+ send(transport_pid, {:text, ""})
+ end)
+
+ Process.send_after(self(), :ping, ping_interval)
+
+ {:noreply, state}
+ end
+end
diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex
new file mode 100644
index 000000000..7b5199068
--- /dev/null
+++ b/lib/pleroma/web/streamer/state.ex
@@ -0,0 +1,68 @@
+defmodule Pleroma.Web.Streamer.State do
+ use GenServer
+ require Logger
+
+ alias Pleroma.Web.Streamer.StreamerSocket
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
+ end
+
+ def add_socket(topic, socket) do
+ GenServer.call(__MODULE__, {:add, socket, topic})
+ end
+
+ def remove_socket(topic, socket) do
+ GenServer.call(__MODULE__, {:remove, socket, topic})
+ end
+
+ def get_sockets do
+ %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
+ stream_sockets
+ end
+
+ def init(init_arg) do
+ {:ok, init_arg}
+ end
+
+ def handle_call(:get_state, _from, state) do
+ {:reply, state, state}
+ end
+
+ def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do
+ internal_topic = internal_topic(topic, socket)
+ stream_socket = StreamerSocket.from_socket(socket)
+
+ sockets_for_topic =
+ sockets
+ |> Map.get(internal_topic, [])
+ |> List.insert_at(0, stream_socket)
+ |> Enum.uniq()
+
+ state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
+ Logger.debug("Got new conn for #{topic}")
+ {:reply, state, state}
+ end
+
+ def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do
+ internal_topic = internal_topic(topic, socket)
+ stream_socket = StreamerSocket.from_socket(socket)
+
+ sockets_for_topic =
+ sockets
+ |> Map.get(internal_topic, [])
+ |> List.delete(stream_socket)
+
+ state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic)
+ {:reply, state, state}
+ end
+
+ defp internal_topic(topic, socket)
+ when topic in ~w[user user:notification direct] do
+ "#{topic}:#{socket.assigns[:user].id}"
+ end
+
+ defp internal_topic(topic, _) do
+ topic
+ end
+end
diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex
new file mode 100644
index 000000000..8cf719277
--- /dev/null
+++ b/lib/pleroma/web/streamer/streamer.ex
@@ -0,0 +1,55 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.Streamer do
+ alias Pleroma.Web.Streamer.State
+ alias Pleroma.Web.Streamer.Worker
+
+ @timeout 60_000
+ @mix_env Mix.env()
+
+ def add_socket(topic, socket) do
+ State.add_socket(topic, socket)
+ end
+
+ def remove_socket(topic, socket) do
+ State.remove_socket(topic, socket)
+ end
+
+ def get_sockets do
+ State.get_sockets()
+ end
+
+ def stream(topics, items) do
+ if should_send?() do
+ Task.async(fn ->
+ :poolboy.transaction(
+ :streamer_worker,
+ &Worker.stream(&1, topics, items),
+ @timeout
+ )
+ end)
+ end
+ end
+
+ def supervisor, do: Pleroma.Web.Streamer.Supervisor
+
+ defp should_send? do
+ handle_should_send(@mix_env)
+ end
+
+ defp handle_should_send(:test) do
+ case Process.whereis(:streamer_worker) do
+ nil ->
+ false
+
+ pid ->
+ Process.alive?(pid)
+ end
+ end
+
+ defp handle_should_send(_) do
+ true
+ end
+end
diff --git a/lib/pleroma/web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex
new file mode 100644
index 000000000..f006c0306
--- /dev/null
+++ b/lib/pleroma/web/streamer/streamer_socket.ex
@@ -0,0 +1,31 @@
+defmodule Pleroma.Web.Streamer.StreamerSocket do
+ defstruct transport_pid: nil, user: nil
+
+ alias Pleroma.User
+ alias Pleroma.Web.Streamer.StreamerSocket
+
+ def from_socket(%{
+ transport_pid: transport_pid,
+ assigns: %{user: nil}
+ }) do
+ %StreamerSocket{
+ transport_pid: transport_pid
+ }
+ end
+
+ def from_socket(%{
+ transport_pid: transport_pid,
+ assigns: %{user: %User{} = user}
+ }) do
+ %StreamerSocket{
+ transport_pid: transport_pid,
+ user: user
+ }
+ end
+
+ def from_socket(%{transport_pid: transport_pid}) do
+ %StreamerSocket{
+ transport_pid: transport_pid
+ }
+ end
+end
diff --git a/lib/pleroma/web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex
new file mode 100644
index 000000000..6afe19323
--- /dev/null
+++ b/lib/pleroma/web/streamer/supervisor.ex
@@ -0,0 +1,33 @@
+defmodule Pleroma.Web.Streamer.Supervisor do
+ use Supervisor
+
+ def start_link(opts) do
+ Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
+ end
+
+ def init(args) do
+ children = [
+ {Pleroma.Web.Streamer.State, args},
+ {Pleroma.Web.Streamer.Ping, args},
+ :poolboy.child_spec(:streamer_worker, poolboy_config())
+ ]
+
+ opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor]
+ Supervisor.init(children, opts)
+ end
+
+ defp poolboy_config do
+ opts =
+ Pleroma.Config.get(:streamer,
+ workers: 3,
+ overflow_workers: 2
+ )
+
+ [
+ {:name, {:local, :streamer_worker}},
+ {:worker_module, Pleroma.Web.Streamer.Worker},
+ {:size, opts[:workers]},
+ {:max_overflow, opts[:overflow_workers]}
+ ]
+ end
+end
diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex
new file mode 100644
index 000000000..5804508eb
--- /dev/null
+++ b/lib/pleroma/web/streamer/worker.ex
@@ -0,0 +1,220 @@
+defmodule Pleroma.Web.Streamer.Worker do
+ use GenServer
+
+ require Logger
+
+ alias Pleroma.Activity
+ alias Pleroma.Config
+ alias Pleroma.Conversation.Participation
+ alias Pleroma.Notification
+ alias Pleroma.Object
+ alias Pleroma.User
+ alias Pleroma.Web.ActivityPub.ActivityPub
+ alias Pleroma.Web.ActivityPub.Visibility
+ alias Pleroma.Web.CommonAPI
+ alias Pleroma.Web.Streamer.State
+ alias Pleroma.Web.Streamer.StreamerSocket
+ alias Pleroma.Web.StreamerView
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, %{}, [])
+ end
+
+ def init(init_arg) do
+ {:ok, init_arg}
+ end
+
+ def stream(pid, topics, items) do
+ GenServer.call(pid, {:stream, topics, items})
+ end
+
+ def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
+ Enum.each(topics, fn t ->
+ do_stream(%{topic: t, item: item})
+ end)
+
+ {:reply, state, state}
+ end
+
+ def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
+ Enum.each(items, fn i ->
+ do_stream(%{topic: topic, item: i})
+ end)
+
+ {:reply, state, state}
+ end
+
+ def handle_call({:stream, topic, item}, _from, state) do
+ do_stream(%{topic: topic, item: item})
+
+ {:reply, state, state}
+ end
+
+ defp do_stream(%{topic: "direct", item: item}) do
+ recipient_topics =
+ User.get_recipients_from_activity(item)
+ |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
+
+ Enum.each(recipient_topics, fn user_topic ->
+ Logger.debug("Trying to push direct message to #{user_topic}\n\n")
+ push_to_socket(State.get_sockets(), user_topic, item)
+ end)
+ end
+
+ defp do_stream(%{topic: "participation", item: participation}) do
+ user_topic = "direct:#{participation.user_id}"
+ Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
+
+ push_to_socket(State.get_sockets(), user_topic, participation)
+ end
+
+ defp do_stream(%{topic: "list", item: item}) do
+ # filter the recipient list if the activity is not public, see #270.
+ recipient_lists =
+ case Visibility.is_public?(item) do
+ true ->
+ Pleroma.List.get_lists_from_activity(item)
+
+ _ ->
+ Pleroma.List.get_lists_from_activity(item)
+ |> Enum.filter(fn list ->
+ owner = User.get_cached_by_id(list.user_id)
+
+ Visibility.visible_for_user?(item, owner)
+ end)
+ end
+
+ recipient_topics =
+ recipient_lists
+ |> Enum.map(fn %{id: id} -> "list:#{id}" end)
+
+ Enum.each(recipient_topics, fn list_topic ->
+ Logger.debug("Trying to push message to #{list_topic}\n\n")
+ push_to_socket(State.get_sockets(), list_topic, item)
+ end)
+ end
+
+ defp do_stream(%{topic: topic, item: %Notification{} = item})
+ when topic in ["user", "user:notification"] do
+ State.get_sockets()
+ |> Map.get("#{topic}:#{item.user_id}", [])
+ |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
+ with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
+ true <- should_send?(user, item) do
+ send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
+ end
+ end)
+ end
+
+ defp do_stream(%{topic: "user", item: item}) do
+ Logger.debug("Trying to push to users")
+
+ recipient_topics =
+ User.get_recipients_from_activity(item)
+ |> Enum.map(fn %{id: id} -> "user:#{id}" end)
+
+ Enum.each(recipient_topics, fn topic ->
+ push_to_socket(State.get_sockets(), topic, item)
+ end)
+ end
+
+ defp do_stream(%{topic: topic, item: item}) do
+ Logger.debug("Trying to push to #{topic}")
+ Logger.debug("Pushing item to #{topic}")
+ push_to_socket(State.get_sockets(), topic, item)
+ end
+
+ defp should_send?(%User{} = user, %Activity{} = item) do
+ blocks = user.info.blocks || []
+ mutes = user.info.mutes || []
+ reblog_mutes = user.info.muted_reblogs || []
+ domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
+
+ with parent when not is_nil(parent) <- Object.normalize(item),
+ true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
+ true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
+ %{host: item_host} <- URI.parse(item.actor),
+ %{host: parent_host} <- URI.parse(parent.data["actor"]),
+ false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
+ false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
+ true <- thread_containment(item, user),
+ false <- CommonAPI.thread_muted?(user, item) do
+ true
+ else
+ _ -> false
+ end
+ end
+
+ defp should_send?(%User{} = user, %Notification{activity: activity}) do
+ should_send?(user, activity)
+ end
+
+ def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
+ Enum.each(topics[topic] || [], fn %StreamerSocket{
+ transport_pid: transport_pid,
+ user: socket_user
+ } ->
+ # Get the current user so we have up-to-date blocks etc.
+ if socket_user do
+ user = User.get_cached_by_ap_id(socket_user.ap_id)
+
+ if should_send?(user, item) do
+ send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
+ end
+ else
+ send(transport_pid, {:text, StreamerView.render("update.json", item)})
+ end
+ end)
+ end
+
+ def push_to_socket(topics, topic, %Participation{} = participation) do
+ Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
+ send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
+ end)
+ end
+
+ def push_to_socket(topics, topic, %Activity{
+ data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
+ }) do
+ Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
+ send(
+ transport_pid,
+ {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
+ )
+ end)
+ end
+
+ def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
+
+ def push_to_socket(topics, topic, item) do
+ Enum.each(topics[topic] || [], fn %StreamerSocket{
+ transport_pid: transport_pid,
+ user: socket_user
+ } ->
+ # Get the current user so we have up-to-date blocks etc.
+ if socket_user do
+ user = User.get_cached_by_ap_id(socket_user.ap_id)
+ blocks = user.info.blocks || []
+ mutes = user.info.mutes || []
+
+ with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
+ true <- thread_containment(item, user) do
+ send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
+ end
+ else
+ send(transport_pid, {:text, StreamerView.render("update.json", item)})
+ end
+ end)
+ end
+
+ @spec thread_containment(Activity.t(), User.t()) :: boolean()
+ defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
+
+ defp thread_containment(activity, user) do
+ if Config.get([:instance, :skip_thread_containment]) do
+ true
+ else
+ ActivityPub.contain_activity(activity, user)
+ end
+ end
+end
diff --git a/lib/pleroma/web/twitter_api/controllers/util_controller.ex b/lib/pleroma/web/twitter_api/controllers/util_controller.ex
index 3405bd3b7..d7745ae7a 100644
--- a/lib/pleroma/web/twitter_api/controllers/util_controller.ex
+++ b/lib/pleroma/web/twitter_api/controllers/util_controller.ex
@@ -265,12 +265,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
String.split(line, ",") |> List.first()
end)
|> List.delete("Account address") do
- PleromaJobQueue.enqueue(:background, User, [
- :follow_import,
- follower,
- followed_identifiers
- ])
-
+ User.follow_import(follower, followed_identifiers)
json(conn, "job started")
end
end
@@ -281,12 +276,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do
with blocked_identifiers <- String.split(list) do
- PleromaJobQueue.enqueue(:background, User, [
- :blocks_import,
- blocker,
- blocked_identifiers
- ])
-
+ User.blocks_import(blocker, blocked_identifiers)
json(conn, "job started")
end
end
@@ -314,6 +304,25 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
end
end
+ def change_email(%{assigns: %{user: user}} = conn, params) do
+ case CommonAPI.Utils.confirm_current_password(user, params["password"]) do
+ {:ok, user} ->
+ with {:ok, _user} <- User.change_email(user, params["email"]) do
+ json(conn, %{status: "success"})
+ else
+ {:error, changeset} ->
+ {_, {error, _}} = Enum.at(changeset.errors, 0)
+ json(conn, %{error: "Email #{error}."})
+
+ _ ->
+ json(conn, %{error: "Unable to change email."})
+ end
+
+ {:error, msg} ->
+ json(conn, %{error: msg})
+ end
+ end
+
def delete_account(%{assigns: %{user: user}} = conn, params) do
case CommonAPI.Utils.confirm_current_password(user, params["password"]) do
{:ok, user} ->
diff --git a/lib/pleroma/web/twitter_api/representers/base_representer.ex b/lib/pleroma/web/twitter_api/representers/base_representer.ex
deleted file mode 100644
index 3d31e6079..000000000
--- a/lib/pleroma/web/twitter_api/representers/base_representer.ex
+++ /dev/null
@@ -1,38 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.TwitterAPI.Representers.BaseRepresenter do
- defmacro __using__(_opts) do
- quote do
- def to_json(object) do
- to_json(object, %{})
- end
-
- def to_json(object, options) do
- object
- |> to_map(options)
- |> Jason.encode!()
- end
-
- def enum_to_list(enum, options) do
- mapping = fn el -> to_map(el, options) end
- Enum.map(enum, mapping)
- end
-
- def to_map(object) do
- to_map(object, %{})
- end
-
- def enum_to_json(enum) do
- enum_to_json(enum, %{})
- end
-
- def enum_to_json(enum, options) do
- enum
- |> enum_to_list(options)
- |> Jason.encode!()
- end
- end
- end
-end
diff --git a/lib/pleroma/web/twitter_api/representers/object_representer.ex b/lib/pleroma/web/twitter_api/representers/object_representer.ex
deleted file mode 100644
index 47130ba06..000000000
--- a/lib/pleroma/web/twitter_api/representers/object_representer.ex
+++ /dev/null
@@ -1,39 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.TwitterAPI.Representers.ObjectRepresenter do
- use Pleroma.Web.TwitterAPI.Representers.BaseRepresenter
- alias Pleroma.Object
-
- def to_map(%Object{data: %{"url" => [url | _]}} = object, _opts) do
- data = object.data
-
- %{
- url: url["href"] |> Pleroma.Web.MediaProxy.url(),
- mimetype: url["mediaType"] || url["mimeType"],
- id: data["uuid"],
- oembed: false,
- description: data["name"]
- }
- end
-
- def to_map(%Object{data: %{"url" => url} = data}, _opts) when is_binary(url) do
- %{
- url: url |> Pleroma.Web.MediaProxy.url(),
- mimetype: data["mediaType"] || data["mimeType"],
- id: data["uuid"],
- oembed: false,
- description: data["name"]
- }
- end
-
- def to_map(%Object{}, _opts) do
- %{}
- end
-
- # If we only get the naked data, wrap in an object
- def to_map(%{} = data, opts) do
- to_map(%Object{data: data}, opts)
- end
-end
diff --git a/lib/pleroma/web/twitter_api/twitter_api.ex b/lib/pleroma/web/twitter_api/twitter_api.ex
index 80082ea84..8eda762c7 100644
--- a/lib/pleroma/web/twitter_api/twitter_api.ex
+++ b/lib/pleroma/web/twitter_api/twitter_api.ex
@@ -3,133 +3,14 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.TwitterAPI.TwitterAPI do
- alias Pleroma.Activity
alias Pleroma.Emails.Mailer
alias Pleroma.Emails.UserEmail
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.UserInviteToken
- alias Pleroma.Web.ActivityPub.ActivityPub
- alias Pleroma.Web.CommonAPI
- alias Pleroma.Web.TwitterAPI.UserView
-
- import Ecto.Query
require Pleroma.Constants
- def create_status(%User{} = user, %{"status" => _} = data) do
- CommonAPI.post(user, data)
- end
-
- def delete(%User{} = user, id) do
- with %Activity{data: %{"type" => _type}} <- Activity.get_by_id(id),
- {:ok, activity} <- CommonAPI.delete(id, user) do
- {:ok, activity}
- end
- end
-
- def follow(%User{} = follower, params) do
- with {:ok, %User{} = followed} <- get_user(params) do
- CommonAPI.follow(follower, followed)
- end
- end
-
- def unfollow(%User{} = follower, params) do
- with {:ok, %User{} = unfollowed} <- get_user(params),
- {:ok, follower} <- CommonAPI.unfollow(follower, unfollowed) do
- {:ok, follower, unfollowed}
- end
- end
-
- def block(%User{} = blocker, params) do
- with {:ok, %User{} = blocked} <- get_user(params),
- {:ok, blocker} <- User.block(blocker, blocked),
- {:ok, _activity} <- ActivityPub.block(blocker, blocked) do
- {:ok, blocker, blocked}
- else
- err -> err
- end
- end
-
- def unblock(%User{} = blocker, params) do
- with {:ok, %User{} = blocked} <- get_user(params),
- {:ok, blocker} <- User.unblock(blocker, blocked),
- {:ok, _activity} <- ActivityPub.unblock(blocker, blocked) do
- {:ok, blocker, blocked}
- else
- err -> err
- end
- end
-
- def repeat(%User{} = user, ap_id_or_id) do
- with {:ok, _announce, %{data: %{"id" => id}}} <- CommonAPI.repeat(ap_id_or_id, user),
- %Activity{} = activity <- Activity.get_create_by_object_ap_id(id) do
- {:ok, activity}
- end
- end
-
- def unrepeat(%User{} = user, ap_id_or_id) do
- with {:ok, _unannounce, %{data: %{"id" => id}}} <- CommonAPI.unrepeat(ap_id_or_id, user),
- %Activity{} = activity <- Activity.get_create_by_object_ap_id(id) do
- {:ok, activity}
- end
- end
-
- def pin(%User{} = user, ap_id_or_id) do
- CommonAPI.pin(ap_id_or_id, user)
- end
-
- def unpin(%User{} = user, ap_id_or_id) do
- CommonAPI.unpin(ap_id_or_id, user)
- end
-
- def fav(%User{} = user, ap_id_or_id) do
- with {:ok, _fav, %{data: %{"id" => id}}} <- CommonAPI.favorite(ap_id_or_id, user),
- %Activity{} = activity <- Activity.get_create_by_object_ap_id(id) do
- {:ok, activity}
- end
- end
-
- def unfav(%User{} = user, ap_id_or_id) do
- with {:ok, _unfav, _fav, %{data: %{"id" => id}}} <- CommonAPI.unfavorite(ap_id_or_id, user),
- %Activity{} = activity <- Activity.get_create_by_object_ap_id(id) do
- {:ok, activity}
- end
- end
-
- def upload(%Plug.Upload{} = file, %User{} = user, format \\ "xml") do
- {:ok, object} = ActivityPub.upload(file, actor: User.ap_id(user))
-
- url = List.first(object.data["url"])
- href = url["href"]
- type = url["mediaType"]
-
- case format do
- "xml" ->
- # Fake this as good as possible...
- """
- <?xml version="1.0" encoding="UTF-8"?>
- <rsp stat="ok" xmlns:atom="http://www.w3.org/2005/Atom">
- <mediaid>#{object.id}</mediaid>
- <media_id>#{object.id}</media_id>
- <media_id_string>#{object.id}</media_id_string>
- <media_url>#{href}</media_url>
- <mediaurl>#{href}</mediaurl>
- <atom:link rel="enclosure" href="#{href}" type="#{type}"></atom:link>
- </rsp>
- """
-
- "json" ->
- %{
- media_id: object.id,
- media_id_string: "#{object.id}}",
- media_url: href,
- size: 0
- }
- |> Jason.encode!()
- end
- end
-
def register_user(params, opts \\ []) do
token = params["token"]
@@ -236,80 +117,4 @@ defmodule Pleroma.Web.TwitterAPI.TwitterAPI do
{:error, "unknown user"}
end
end
-
- def get_user(user \\ nil, params) do
- case params do
- %{"user_id" => user_id} ->
- case User.get_cached_by_nickname_or_id(user_id) do
- nil ->
- {:error, "No user with such user_id"}
-
- %User{info: %{deactivated: true}} ->
- {:error, "User has been disabled"}
-
- user ->
- {:ok, user}
- end
-
- %{"screen_name" => nickname} ->
- case User.get_cached_by_nickname(nickname) do
- nil -> {:error, "No user with such screen_name"}
- target -> {:ok, target}
- end
-
- _ ->
- if user do
- {:ok, user}
- else
- {:error, "You need to specify screen_name or user_id"}
- end
- end
- end
-
- defp parse_int(string, default)
-
- defp parse_int(string, default) when is_binary(string) do
- with {n, _} <- Integer.parse(string) do
- n
- else
- _e -> default
- end
- end
-
- defp parse_int(_, default), do: default
-
- # TODO: unify the search query with MastoAPI one and do only pagination here
- def search(_user, %{"q" => query} = params) do
- limit = parse_int(params["rpp"], 20)
- page = parse_int(params["page"], 1)
- offset = (page - 1) * limit
-
- q =
- from(
- [a, o] in Activity.with_preloaded_object(Activity),
- where: fragment("?->>'type' = 'Create'", a.data),
- where: ^Pleroma.Constants.as_public() in a.recipients,
- where:
- fragment(
- "to_tsvector('english', ?->>'content') @@ plainto_tsquery('english', ?)",
- o.data,
- ^query
- ),
- limit: ^limit,
- offset: ^offset,
- # this one isn't indexed so psql won't take the wrong index.
- order_by: [desc: :inserted_at]
- )
-
- _activities = Repo.all(q)
- end
-
- def get_external_profile(for_user, uri) do
- with {:ok, %User{} = user} <- User.get_or_fetch(uri) do
- {:ok, UserView.render("show.json", %{user: user, for: for_user})}
- else
- _e ->
- {:error, "Couldn't find user"}
- end
- end
end
diff --git a/lib/pleroma/web/twitter_api/twitter_api_controller.ex b/lib/pleroma/web/twitter_api/twitter_api_controller.ex
index 5dfab6a6c..42234ae09 100644
--- a/lib/pleroma/web/twitter_api/twitter_api_controller.ex
+++ b/lib/pleroma/web/twitter_api/twitter_api_controller.ex
@@ -5,448 +5,16 @@
defmodule Pleroma.Web.TwitterAPI.Controller do
use Pleroma.Web, :controller
- import Pleroma.Web.ControllerHelper, only: [json_response: 3]
-
alias Ecto.Changeset
- alias Pleroma.Activity
- alias Pleroma.Formatter
alias Pleroma.Notification
- alias Pleroma.Object
- alias Pleroma.Repo
alias Pleroma.User
- alias Pleroma.Web.ActivityPub.ActivityPub
- alias Pleroma.Web.ActivityPub.Visibility
- alias Pleroma.Web.CommonAPI
- alias Pleroma.Web.CommonAPI.Utils
alias Pleroma.Web.OAuth.Token
- alias Pleroma.Web.TwitterAPI.ActivityView
- alias Pleroma.Web.TwitterAPI.NotificationView
alias Pleroma.Web.TwitterAPI.TokenView
- alias Pleroma.Web.TwitterAPI.TwitterAPI
- alias Pleroma.Web.TwitterAPI.UserView
require Logger
- plug(Pleroma.Plugs.RateLimiter, :password_reset when action == :password_reset)
- plug(:only_if_public_instance when action in [:public_timeline, :public_and_external_timeline])
action_fallback(:errors)
- def verify_credentials(%{assigns: %{user: user}} = conn, _params) do
- token = Phoenix.Token.sign(conn, "user socket", user.id)
-
- conn
- |> put_view(UserView)
- |> render("show.json", %{user: user, token: token, for: user})
- end
-
- def status_update(%{assigns: %{user: user}} = conn, %{"status" => _} = status_data) do
- with media_ids <- extract_media_ids(status_data),
- {:ok, activity} <-
- TwitterAPI.create_status(user, Map.put(status_data, "media_ids", media_ids)) do
- conn
- |> json(ActivityView.render("activity.json", activity: activity, for: user))
- else
- _ -> empty_status_reply(conn)
- end
- end
-
- def status_update(conn, _status_data) do
- empty_status_reply(conn)
- end
-
- defp empty_status_reply(conn) do
- bad_request_reply(conn, "Client must provide a 'status' parameter with a value.")
- end
-
- defp extract_media_ids(status_data) do
- with media_ids when not is_nil(media_ids) <- status_data["media_ids"],
- split_ids <- String.split(media_ids, ","),
- clean_ids <- Enum.reject(split_ids, fn id -> String.length(id) == 0 end) do
- clean_ids
- else
- _e -> []
- end
- end
-
- def public_and_external_timeline(%{assigns: %{user: user}} = conn, params) do
- params =
- params
- |> Map.put("type", ["Create", "Announce"])
- |> Map.put("blocking_user", user)
-
- activities = ActivityPub.fetch_public_activities(params)
-
- conn
- |> put_view(ActivityView)
- |> render("index.json", %{activities: activities, for: user})
- end
-
- def public_timeline(%{assigns: %{user: user}} = conn, params) do
- params =
- params
- |> Map.put("type", ["Create", "Announce"])
- |> Map.put("local_only", true)
- |> Map.put("blocking_user", user)
-
- activities = ActivityPub.fetch_public_activities(params)
-
- conn
- |> put_view(ActivityView)
- |> render("index.json", %{activities: activities, for: user})
- end
-
- def friends_timeline(%{assigns: %{user: user}} = conn, params) do
- params =
- params
- |> Map.put("type", ["Create", "Announce", "Follow", "Like"])
- |> Map.put("blocking_user", user)
- |> Map.put("user", user)
-
- activities = ActivityPub.fetch_activities([user.ap_id | user.following], params)
-
- conn
- |> put_view(ActivityView)
- |> render("index.json", %{activities: activities, for: user})
- end
-
- def show_user(conn, params) do
- for_user = conn.assigns.user
-
- with {:ok, shown} <- TwitterAPI.get_user(params),
- true <-
- User.auth_active?(shown) ||
- (for_user && (for_user.id == shown.id || User.superuser?(for_user))) do
- params =
- if for_user do
- %{user: shown, for: for_user}
- else
- %{user: shown}
- end
-
- conn
- |> put_view(UserView)
- |> render("show.json", params)
- else
- {:error, msg} ->
- bad_request_reply(conn, msg)
-
- false ->
- conn
- |> put_status(404)
- |> json(%{error: "Unconfirmed user"})
- end
- end
-
- def user_timeline(%{assigns: %{user: user}} = conn, params) do
- case TwitterAPI.get_user(user, params) do
- {:ok, target_user} ->
- # Twitter and ActivityPub use a different name and sense for this parameter.
- {include_rts, params} = Map.pop(params, "include_rts")
-
- params =
- case include_rts do
- x when x == "false" or x == "0" -> Map.put(params, "exclude_reblogs", "true")
- _ -> params
- end
-
- activities = ActivityPub.fetch_user_activities(target_user, user, params)
-
- conn
- |> put_view(ActivityView)
- |> render("index.json", %{activities: activities, for: user})
-
- {:error, msg} ->
- bad_request_reply(conn, msg)
- end
- end
-
- def mentions_timeline(%{assigns: %{user: user}} = conn, params) do
- params =
- params
- |> Map.put("type", ["Create", "Announce", "Follow", "Like"])
- |> Map.put("blocking_user", user)
- |> Map.put(:visibility, ~w[unlisted public private])
-
- activities = ActivityPub.fetch_activities([user.ap_id], params)
-
- conn
- |> put_view(ActivityView)
- |> render("index.json", %{activities: activities, for: user})
- end
-
- def dm_timeline(%{assigns: %{user: user}} = conn, params) do
- params =
- params
- |> Map.put("type", "Create")
- |> Map.put("blocking_user", user)
- |> Map.put("user", user)
- |> Map.put(:visibility, "direct")
- |> Map.put(:order, :desc)
-
- activities =
- ActivityPub.fetch_activities_query([user.ap_id], params)
- |> Repo.all()
-
- conn
- |> put_view(ActivityView)
- |> render("index.json", %{activities: activities, for: user})
- end
-
- def notifications(%{assigns: %{user: user}} = conn, params) do
- params =
- if Map.has_key?(params, "with_muted") do
- Map.put(params, :with_muted, params["with_muted"] in [true, "True", "true", "1"])
- else
- params
- end
-
- notifications = Notification.for_user(user, params)
-
- conn
- |> put_view(NotificationView)
- |> render("notification.json", %{notifications: notifications, for: user})
- end
-
- def notifications_read(%{assigns: %{user: user}} = conn, %{"latest_id" => latest_id} = params) do
- Notification.set_read_up_to(user, latest_id)
-
- notifications = Notification.for_user(user, params)
-
- conn
- |> put_view(NotificationView)
- |> render("notification.json", %{notifications: notifications, for: user})
- end
-
- def notifications_read(%{assigns: %{user: _user}} = conn, _) do
- bad_request_reply(conn, "You need to specify latest_id")
- end
-
- def follow(%{assigns: %{user: user}} = conn, params) do
- case TwitterAPI.follow(user, params) do
- {:ok, user, followed, _activity} ->
- conn
- |> put_view(UserView)
- |> render("show.json", %{user: followed, for: user})
-
- {:error, msg} ->
- forbidden_json_reply(conn, msg)
- end
- end
-
- def block(%{assigns: %{user: user}} = conn, params) do
- case TwitterAPI.block(user, params) do
- {:ok, user, blocked} ->
- conn
- |> put_view(UserView)
- |> render("show.json", %{user: blocked, for: user})
-
- {:error, msg} ->
- forbidden_json_reply(conn, msg)
- end
- end
-
- def unblock(%{assigns: %{user: user}} = conn, params) do
- case TwitterAPI.unblock(user, params) do
- {:ok, user, blocked} ->
- conn
- |> put_view(UserView)
- |> render("show.json", %{user: blocked, for: user})
-
- {:error, msg} ->
- forbidden_json_reply(conn, msg)
- end
- end
-
- def delete_post(%{assigns: %{user: user}} = conn, %{"id" => id}) do
- with {:ok, activity} <- TwitterAPI.delete(user, id) do
- conn
- |> put_view(ActivityView)
- |> render("activity.json", %{activity: activity, for: user})
- end
- end
-
- def unfollow(%{assigns: %{user: user}} = conn, params) do
- case TwitterAPI.unfollow(user, params) do
- {:ok, user, unfollowed} ->
- conn
- |> put_view(UserView)
- |> render("show.json", %{user: unfollowed, for: user})
-
- {:error, msg} ->
- forbidden_json_reply(conn, msg)
- end
- end
-
- def fetch_status(%{assigns: %{user: user}} = conn, %{"id" => id}) do
- with %Activity{} = activity <- Activity.get_by_id(id),
- true <- Visibility.visible_for_user?(activity, user) do
- conn
- |> put_view(ActivityView)
- |> render("activity.json", %{activity: activity, for: user})
- end
- end
-
- def fetch_conversation(%{assigns: %{user: user}} = conn, %{"id" => id}) do
- with context when is_binary(context) <- Utils.conversation_id_to_context(id),
- activities <-
- ActivityPub.fetch_activities_for_context(context, %{
- "blocking_user" => user,
- "user" => user
- }) do
- conn
- |> put_view(ActivityView)
- |> render("index.json", %{activities: activities, for: user})
- end
- end
-
- @doc """
- Updates metadata of uploaded media object.
- Derived from [Twitter API endpoint](https://developer.twitter.com/en/docs/media/upload-media/api-reference/post-media-metadata-create).
- """
- def update_media(%{assigns: %{user: user}} = conn, %{"media_id" => id} = data) do
- object = Repo.get(Object, id)
- description = get_in(data, ["alt_text", "text"]) || data["name"] || data["description"]
-
- {conn, status, response_body} =
- cond do
- !object ->
- {halt(conn), :not_found, ""}
-
- !Object.authorize_mutation(object, user) ->
- {halt(conn), :forbidden, "You can only update your own uploads."}
-
- !is_binary(description) ->
- {conn, :not_modified, ""}
-
- true ->
- new_data = Map.put(object.data, "name", description)
-
- {:ok, _} =
- object
- |> Object.change(%{data: new_data})
- |> Repo.update()
-
- {conn, :no_content, ""}
- end
-
- conn
- |> put_status(status)
- |> json(response_body)
- end
-
- def upload(%{assigns: %{user: user}} = conn, %{"media" => media}) do
- response = TwitterAPI.upload(media, user)
-
- conn
- |> put_resp_content_type("application/atom+xml")
- |> send_resp(200, response)
- end
-
- def upload_json(%{assigns: %{user: user}} = conn, %{"media" => media}) do
- response = TwitterAPI.upload(media, user, "json")
-
- conn
- |> json_reply(200, response)
- end
-
- def get_by_id_or_ap_id(id) do
- activity = Activity.get_by_id(id) || Activity.get_create_by_object_ap_id(id)
-
- if activity.data["type"] == "Create" do
- activity
- else
- Activity.get_create_by_object_ap_id(activity.data["object"])
- end
- end
-
- def favorite(%{assigns: %{user: user}} = conn, %{"id" => id}) do
- with {:ok, activity} <- TwitterAPI.fav(user, id) do
- conn
- |> put_view(ActivityView)
- |> render("activity.json", %{activity: activity, for: user})
- else
- _ -> json_reply(conn, 400, Jason.encode!(%{}))
- end
- end
-
- def unfavorite(%{assigns: %{user: user}} = conn, %{"id" => id}) do
- with {:ok, activity} <- TwitterAPI.unfav(user, id) do
- conn
- |> put_view(ActivityView)
- |> render("activity.json", %{activity: activity, for: user})
- else
- _ -> json_reply(conn, 400, Jason.encode!(%{}))
- end
- end
-
- def retweet(%{assigns: %{user: user}} = conn, %{"id" => id}) do
- with {:ok, activity} <- TwitterAPI.repeat(user, id) do
- conn
- |> put_view(ActivityView)
- |> render("activity.json", %{activity: activity, for: user})
- else
- _ -> json_reply(conn, 400, Jason.encode!(%{}))
- end
- end
-
- def unretweet(%{assigns: %{user: user}} = conn, %{"id" => id}) do
- with {:ok, activity} <- TwitterAPI.unrepeat(user, id) do
- conn
- |> put_view(ActivityView)
- |> render("activity.json", %{activity: activity, for: user})
- else
- _ -> json_reply(conn, 400, Jason.encode!(%{}))
- end
- end
-
- def pin(%{assigns: %{user: user}} = conn, %{"id" => id}) do
- with {:ok, activity} <- TwitterAPI.pin(user, id) do
- conn
- |> put_view(ActivityView)
- |> render("activity.json", %{activity: activity, for: user})
- else
- {:error, message} -> bad_request_reply(conn, message)
- err -> err
- end
- end
-
- def unpin(%{assigns: %{user: user}} = conn, %{"id" => id}) do
- with {:ok, activity} <- TwitterAPI.unpin(user, id) do
- conn
- |> put_view(ActivityView)
- |> render("activity.json", %{activity: activity, for: user})
- else
- {:error, message} -> bad_request_reply(conn, message)
- err -> err
- end
- end
-
- def register(conn, params) do
- with {:ok, user} <- TwitterAPI.register_user(params) do
- conn
- |> put_view(UserView)
- |> render("show.json", %{user: user})
- else
- {:error, errors} ->
- conn
- |> json_reply(400, Jason.encode!(errors))
- end
- end
-
- def password_reset(conn, params) do
- nickname_or_email = params["email"] || params["nickname"]
-
- with {:ok, _} <- TwitterAPI.password_reset(nickname_or_email) do
- json_response(conn, :no_content, "")
- else
- {:error, "unknown user"} ->
- send_resp(conn, :not_found, "")
-
- {:error, _} ->
- send_resp(conn, :bad_request, "")
- end
- end
-
def confirm_email(conn, %{"user_id" => uid, "token" => token}) do
with %User{} = user <- User.get_cached_by_id(uid),
true <- user.local,
@@ -460,147 +28,6 @@ defmodule Pleroma.Web.TwitterAPI.Controller do
end
end
- def resend_confirmation_email(conn, params) do
- nickname_or_email = params["email"] || params["nickname"]
-
- with %User{} = user <- User.get_by_nickname_or_email(nickname_or_email),
- {:ok, _} <- User.try_send_confirmation_email(user) do
- conn
- |> json_response(:no_content, "")
- end
- end
-
- def update_avatar(%{assigns: %{user: user}} = conn, %{"img" => ""}) do
- change = Changeset.change(user, %{avatar: nil})
- {:ok, user} = User.update_and_set_cache(change)
- CommonAPI.update(user)
-
- conn
- |> put_view(UserView)
- |> render("show.json", %{user: user, for: user})
- end
-
- def update_avatar(%{assigns: %{user: user}} = conn, params) do
- {:ok, object} = ActivityPub.upload(params, type: :avatar)
- change = Changeset.change(user, %{avatar: object.data})
- {:ok, user} = User.update_and_set_cache(change)
- CommonAPI.update(user)
-
- conn
- |> put_view(UserView)
- |> render("show.json", %{user: user, for: user})
- end
-
- def update_banner(%{assigns: %{user: user}} = conn, %{"banner" => ""}) do
- with new_info <- %{"banner" => %{}},
- info_cng <- User.Info.profile_update(user.info, new_info),
- changeset <- Ecto.Changeset.change(user) |> Ecto.Changeset.put_embed(:info, info_cng),
- {:ok, user} <- User.update_and_set_cache(changeset) do
- CommonAPI.update(user)
- response = %{url: nil} |> Jason.encode!()
-
- conn
- |> json_reply(200, response)
- end
- end
-
- def update_banner(%{assigns: %{user: user}} = conn, params) do
- with {:ok, object} <- ActivityPub.upload(%{"img" => params["banner"]}, type: :banner),
- new_info <- %{"banner" => object.data},
- info_cng <- User.Info.profile_update(user.info, new_info),
- changeset <- Ecto.Changeset.change(user) |> Ecto.Changeset.put_embed(:info, info_cng),
- {:ok, user} <- User.update_and_set_cache(changeset) do
- CommonAPI.update(user)
- %{"url" => [%{"href" => href} | _]} = object.data
- response = %{url: href} |> Jason.encode!()
-
- conn
- |> json_reply(200, response)
- end
- end
-
- def update_background(%{assigns: %{user: user}} = conn, %{"img" => ""}) do
- with new_info <- %{"background" => %{}},
- info_cng <- User.Info.profile_update(user.info, new_info),
- changeset <- Ecto.Changeset.change(user) |> Ecto.Changeset.put_embed(:info, info_cng),
- {:ok, _user} <- User.update_and_set_cache(changeset) do
- response = %{url: nil} |> Jason.encode!()
-
- conn
- |> json_reply(200, response)
- end
- end
-
- def update_background(%{assigns: %{user: user}} = conn, params) do
- with {:ok, object} <- ActivityPub.upload(params, type: :background),
- new_info <- %{"background" => object.data},
- info_cng <- User.Info.profile_update(user.info, new_info),
- changeset <- Ecto.Changeset.change(user) |> Ecto.Changeset.put_embed(:info, info_cng),
- {:ok, _user} <- User.update_and_set_cache(changeset) do
- %{"url" => [%{"href" => href} | _]} = object.data
- response = %{url: href} |> Jason.encode!()
-
- conn
- |> json_reply(200, response)
- end
- end
-
- def external_profile(%{assigns: %{user: current_user}} = conn, %{"profileurl" => uri}) do
- with {:ok, user_map} <- TwitterAPI.get_external_profile(current_user, uri),
- response <- Jason.encode!(user_map) do
- conn
- |> json_reply(200, response)
- else
- _e ->
- conn
- |> put_status(404)
- |> json(%{error: "Can't find user"})
- end
- end
-
- def followers(%{assigns: %{user: for_user}} = conn, params) do
- {:ok, page} = Ecto.Type.cast(:integer, params["page"] || 1)
-
- with {:ok, user} <- TwitterAPI.get_user(for_user, params),
- {:ok, followers} <- User.get_followers(user, page) do
- followers =
- cond do
- for_user && user.id == for_user.id -> followers
- user.info.hide_followers -> []
- true -> followers
- end
-
- conn
- |> put_view(UserView)
- |> render("index.json", %{users: followers, for: conn.assigns[:user]})
- else
- _e -> bad_request_reply(conn, "Can't get followers")
- end
- end
-
- def friends(%{assigns: %{user: for_user}} = conn, params) do
- {:ok, page} = Ecto.Type.cast(:integer, params["page"] || 1)
- {:ok, export} = Ecto.Type.cast(:boolean, params["all"] || false)
-
- page = if export, do: nil, else: page
-
- with {:ok, user} <- TwitterAPI.get_user(conn.assigns[:user], params),
- {:ok, friends} <- User.get_friends(user, page) do
- friends =
- cond do
- for_user && user.id == for_user.id -> friends
- user.info.hide_follows -> []
- true -> friends
- end
-
- conn
- |> put_view(UserView)
- |> render("index.json", %{users: friends, for: conn.assigns[:user]})
- else
- _e -> bad_request_reply(conn, "Can't get friends")
- end
- end
-
def oauth_tokens(%{assigns: %{user: user}} = conn, _params) do
with oauth_tokens <- Token.get_user_tokens(user) do
conn
@@ -615,160 +42,16 @@ defmodule Pleroma.Web.TwitterAPI.Controller do
json_reply(conn, 201, "")
end
- def blocks(%{assigns: %{user: user}} = conn, _params) do
- with blocked_users <- User.blocked_users(user) do
- conn
- |> put_view(UserView)
- |> render("index.json", %{users: blocked_users, for: user})
- end
- end
-
- def friend_requests(conn, params) do
- with {:ok, user} <- TwitterAPI.get_user(conn.assigns[:user], params),
- {:ok, friend_requests} <- User.get_follow_requests(user) do
- conn
- |> put_view(UserView)
- |> render("index.json", %{users: friend_requests, for: conn.assigns[:user]})
- else
- _e -> bad_request_reply(conn, "Can't get friend requests")
- end
- end
-
- def approve_friend_request(conn, %{"user_id" => uid} = _params) do
- with followed <- conn.assigns[:user],
- %User{} = follower <- User.get_cached_by_id(uid),
- {:ok, follower} <- CommonAPI.accept_follow_request(follower, followed) do
- conn
- |> put_view(UserView)
- |> render("show.json", %{user: follower, for: followed})
- else
- e -> bad_request_reply(conn, "Can't approve user: #{inspect(e)}")
- end
- end
-
- def deny_friend_request(conn, %{"user_id" => uid} = _params) do
- with followed <- conn.assigns[:user],
- %User{} = follower <- User.get_cached_by_id(uid),
- {:ok, follower} <- CommonAPI.reject_follow_request(follower, followed) do
- conn
- |> put_view(UserView)
- |> render("show.json", %{user: follower, for: followed})
- else
- e -> bad_request_reply(conn, "Can't deny user: #{inspect(e)}")
- end
- end
-
- def friends_ids(%{assigns: %{user: user}} = conn, _params) do
- with {:ok, friends} <- User.get_friends(user) do
- ids =
- friends
- |> Enum.map(fn x -> x.id end)
- |> Jason.encode!()
-
- json(conn, ids)
- else
- _e -> bad_request_reply(conn, "Can't get friends")
- end
- end
-
- def empty_array(conn, _params) do
- json(conn, Jason.encode!([]))
- end
-
- def raw_empty_array(conn, _params) do
- json(conn, [])
- end
-
- defp build_info_cng(user, params) do
- info_params =
- [
- "no_rich_text",
- "locked",
- "hide_followers",
- "hide_follows",
- "hide_favorites",
- "show_role",
- "skip_thread_containment"
- ]
- |> Enum.reduce(%{}, fn key, res ->
- if value = params[key] do
- Map.put(res, key, value == "true")
- else
- res
- end
- end)
-
- info_params =
- if value = params["default_scope"] do
- Map.put(info_params, "default_scope", value)
- else
- info_params
- end
-
- User.Info.profile_update(user.info, info_params)
- end
-
- defp parse_profile_bio(user, params) do
- if bio = params["description"] do
- emojis_text = (params["description"] || "") <> " " <> (params["name"] || "")
-
- emojis =
- ((user.info.emoji || []) ++ Formatter.get_emoji_map(emojis_text))
- |> Enum.dedup()
-
- user_info =
- user.info
- |> Map.put(
- "emoji",
- emojis
- )
-
- params
- |> Map.put("bio", User.parse_bio(bio, user))
- |> Map.put("info", user_info)
- else
- params
- end
- end
-
- def update_profile(%{assigns: %{user: user}} = conn, params) do
- params = parse_profile_bio(user, params)
- info_cng = build_info_cng(user, params)
-
- with changeset <- User.update_changeset(user, params),
- changeset <- Ecto.Changeset.put_embed(changeset, :info, info_cng),
- {:ok, user} <- User.update_and_set_cache(changeset) do
- CommonAPI.update(user)
-
- conn
- |> put_view(UserView)
- |> render("user.json", %{user: user, for: user})
- else
- error ->
- Logger.debug("Can't update user: #{inspect(error)}")
- bad_request_reply(conn, "Can't update user")
- end
- end
-
- def search(%{assigns: %{user: user}} = conn, %{"q" => _query} = params) do
- activities = TwitterAPI.search(user, params)
-
+ def errors(conn, {:param_cast, _}) do
conn
- |> put_view(ActivityView)
- |> render("index.json", %{activities: activities, for: user})
+ |> put_status(400)
+ |> json("Invalid parameters")
end
- def search_user(%{assigns: %{user: user}} = conn, %{"query" => query}) do
- users = User.search(query, resolve: true, for_user: user)
-
+ def errors(conn, _) do
conn
- |> put_view(UserView)
- |> render("index.json", %{users: users, for: user})
- end
-
- defp bad_request_reply(conn, error_message) do
- json = error_json(conn, error_message)
- json_reply(conn, 400, json)
+ |> put_status(500)
+ |> json("Something went wrong")
end
defp json_reply(conn, status, json) do
@@ -777,36 +60,27 @@ defmodule Pleroma.Web.TwitterAPI.Controller do
|> send_resp(status, json)
end
- defp forbidden_json_reply(conn, error_message) do
- json = error_json(conn, error_message)
- json_reply(conn, 403, json)
- end
+ def notifications_read(%{assigns: %{user: user}} = conn, %{"latest_id" => latest_id} = params) do
+ Notification.set_read_up_to(user, latest_id)
- def only_if_public_instance(%{assigns: %{user: %User{}}} = conn, _), do: conn
+ notifications = Notification.for_user(user, params)
- def only_if_public_instance(conn, _) do
- if Pleroma.Config.get([:instance, :public]) do
- conn
- else
- conn
- |> forbidden_json_reply("Invalid credentials.")
- |> halt()
- end
+ conn
+ # XXX: This is a hack because pleroma-fe still uses that API.
+ |> put_view(Pleroma.Web.MastodonAPI.NotificationView)
+ |> render("index.json", %{notifications: notifications, for: user})
end
- defp error_json(conn, error_message) do
- %{"error" => error_message, "request" => conn.request_path} |> Jason.encode!()
+ def notifications_read(%{assigns: %{user: _user}} = conn, _) do
+ bad_request_reply(conn, "You need to specify latest_id")
end
- def errors(conn, {:param_cast, _}) do
- conn
- |> put_status(400)
- |> json("Invalid parameters")
+ defp bad_request_reply(conn, error_message) do
+ json = error_json(conn, error_message)
+ json_reply(conn, 400, json)
end
- def errors(conn, _) do
- conn
- |> put_status(500)
- |> json("Something went wrong")
+ defp error_json(conn, error_message) do
+ %{"error" => error_message, "request" => conn.request_path} |> Jason.encode!()
end
end
diff --git a/lib/pleroma/web/twitter_api/views/activity_view.ex b/lib/pleroma/web/twitter_api/views/activity_view.ex
deleted file mode 100644
index abae63877..000000000
--- a/lib/pleroma/web/twitter_api/views/activity_view.ex
+++ /dev/null
@@ -1,366 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.TwitterAPI.ActivityView do
- use Pleroma.Web, :view
- alias Pleroma.Activity
- alias Pleroma.Formatter
- alias Pleroma.HTML
- alias Pleroma.Object
- alias Pleroma.Repo
- alias Pleroma.User
- alias Pleroma.Web.CommonAPI
- alias Pleroma.Web.CommonAPI.Utils
- alias Pleroma.Web.MastodonAPI.StatusView
- alias Pleroma.Web.TwitterAPI.ActivityView
- alias Pleroma.Web.TwitterAPI.Representers.ObjectRepresenter
- alias Pleroma.Web.TwitterAPI.UserView
-
- import Ecto.Query
- require Logger
- require Pleroma.Constants
-
- defp query_context_ids([]), do: []
-
- defp query_context_ids(contexts) do
- query = from(o in Object, where: fragment("(?)->>'id' = ANY(?)", o.data, ^contexts))
-
- Repo.all(query)
- end
-
- defp query_users([]), do: []
-
- defp query_users(user_ids) do
- query = from(user in User, where: user.ap_id in ^user_ids)
-
- Repo.all(query)
- end
-
- defp collect_context_ids(activities) do
- _contexts =
- activities
- |> Enum.reject(& &1.data["context_id"])
- |> Enum.map(fn %{data: data} ->
- data["context"]
- end)
- |> Enum.filter(& &1)
- |> query_context_ids()
- |> Enum.reduce(%{}, fn %{data: %{"id" => ap_id}, id: id}, acc ->
- Map.put(acc, ap_id, id)
- end)
- end
-
- defp collect_users(activities) do
- activities
- |> Enum.map(fn activity ->
- case activity.data do
- data = %{"type" => "Follow"} ->
- [data["actor"], data["object"]]
-
- data ->
- [data["actor"]]
- end ++ activity.recipients
- end)
- |> List.flatten()
- |> Enum.uniq()
- |> query_users()
- |> Enum.reduce(%{}, fn user, acc ->
- Map.put(acc, user.ap_id, user)
- end)
- end
-
- defp get_context_id(%{data: %{"context_id" => context_id}}, _) when not is_nil(context_id),
- do: context_id
-
- defp get_context_id(%{data: %{"context" => nil}}, _), do: nil
-
- defp get_context_id(%{data: %{"context" => context}}, options) do
- cond do
- id = options[:context_ids][context] -> id
- true -> Utils.context_to_conversation_id(context)
- end
- end
-
- defp get_context_id(_, _), do: nil
-
- defp get_user(ap_id, opts) do
- cond do
- user = opts[:users][ap_id] ->
- user
-
- String.ends_with?(ap_id, "/followers") ->
- nil
-
- ap_id == Pleroma.Constants.as_public() ->
- nil
-
- user = User.get_cached_by_ap_id(ap_id) ->
- user
-
- user = User.get_by_guessed_nickname(ap_id) ->
- user
-
- true ->
- User.error_user(ap_id)
- end
- end
-
- def render("index.json", opts) do
- context_ids = collect_context_ids(opts.activities)
- users = collect_users(opts.activities)
-
- opts =
- opts
- |> Map.put(:context_ids, context_ids)
- |> Map.put(:users, users)
-
- safe_render_many(
- opts.activities,
- ActivityView,
- "activity.json",
- opts
- )
- end
-
- def render("activity.json", %{activity: %{data: %{"type" => "Delete"}} = activity} = opts) do
- user = get_user(activity.data["actor"], opts)
- created_at = activity.data["published"] |> Utils.date_to_asctime()
-
- %{
- "id" => activity.id,
- "uri" => activity.data["object"],
- "user" => UserView.render("show.json", %{user: user, for: opts[:for]}),
- "attentions" => [],
- "statusnet_html" => "deleted notice {{tag",
- "text" => "deleted notice {{tag",
- "is_local" => activity.local,
- "is_post_verb" => false,
- "created_at" => created_at,
- "in_reply_to_status_id" => nil,
- "external_url" => activity.data["id"],
- "activity_type" => "delete"
- }
- end
-
- def render("activity.json", %{activity: %{data: %{"type" => "Follow"}} = activity} = opts) do
- user = get_user(activity.data["actor"], opts)
- created_at = activity.data["published"] || DateTime.to_iso8601(activity.inserted_at)
- created_at = created_at |> Utils.date_to_asctime()
-
- followed = get_user(activity.data["object"], opts)
- text = "#{user.nickname} started following #{followed.nickname}"
-
- %{
- "id" => activity.id,
- "user" => UserView.render("show.json", %{user: user, for: opts[:for]}),
- "attentions" => [],
- "statusnet_html" => text,
- "text" => text,
- "is_local" => activity.local,
- "is_post_verb" => false,
- "created_at" => created_at,
- "in_reply_to_status_id" => nil,
- "external_url" => activity.data["id"],
- "activity_type" => "follow"
- }
- end
-
- def render("activity.json", %{activity: %{data: %{"type" => "Announce"}} = activity} = opts) do
- user = get_user(activity.data["actor"], opts)
- created_at = activity.data["published"] |> Utils.date_to_asctime()
- announced_activity = Activity.get_create_by_object_ap_id(activity.data["object"])
-
- text = "#{user.nickname} repeated a status."
-
- retweeted_status = render("activity.json", Map.merge(opts, %{activity: announced_activity}))
-
- %{
- "id" => activity.id,
- "user" => UserView.render("show.json", %{user: user, for: opts[:for]}),
- "statusnet_html" => text,
- "text" => text,
- "is_local" => activity.local,
- "is_post_verb" => false,
- "uri" => "tag:#{activity.data["id"]}:objectType=note",
- "created_at" => created_at,
- "retweeted_status" => retweeted_status,
- "statusnet_conversation_id" => get_context_id(announced_activity, opts),
- "external_url" => activity.data["id"],
- "activity_type" => "repeat"
- }
- end
-
- def render("activity.json", %{activity: %{data: %{"type" => "Like"}} = activity} = opts) do
- user = get_user(activity.data["actor"], opts)
- liked_activity = Activity.get_create_by_object_ap_id(activity.data["object"])
- liked_activity_id = if liked_activity, do: liked_activity.id, else: nil
-
- created_at =
- activity.data["published"]
- |> Utils.date_to_asctime()
-
- text = "#{user.nickname} favorited a status."
-
- favorited_status =
- if liked_activity,
- do: render("activity.json", Map.merge(opts, %{activity: liked_activity})),
- else: nil
-
- %{
- "id" => activity.id,
- "user" => UserView.render("show.json", %{user: user, for: opts[:for]}),
- "statusnet_html" => text,
- "text" => text,
- "is_local" => activity.local,
- "is_post_verb" => false,
- "uri" => "tag:#{activity.data["id"]}:objectType=Favourite",
- "created_at" => created_at,
- "favorited_status" => favorited_status,
- "in_reply_to_status_id" => liked_activity_id,
- "external_url" => activity.data["id"],
- "activity_type" => "like"
- }
- end
-
- def render(
- "activity.json",
- %{activity: %{data: %{"type" => "Create", "object" => object_id}} = activity} = opts
- ) do
- user = get_user(activity.data["actor"], opts)
-
- object = Object.normalize(object_id)
-
- created_at = object.data["published"] |> Utils.date_to_asctime()
- like_count = object.data["like_count"] || 0
- announcement_count = object.data["announcement_count"] || 0
- favorited = opts[:for] && opts[:for].ap_id in (object.data["likes"] || [])
- repeated = opts[:for] && opts[:for].ap_id in (object.data["announcements"] || [])
- pinned = activity.id in user.info.pinned_activities
-
- attentions =
- []
- |> Utils.maybe_notify_to_recipients(activity)
- |> Utils.maybe_notify_mentioned_recipients(activity)
- |> Enum.map(fn ap_id -> get_user(ap_id, opts) end)
- |> Enum.filter(& &1)
- |> Enum.map(fn user -> UserView.render("show.json", %{user: user, for: opts[:for]}) end)
-
- conversation_id = get_context_id(activity, opts)
-
- tags = object.data["tag"] || []
- possibly_sensitive = object.data["sensitive"] || Enum.member?(tags, "nsfw")
-
- tags = if possibly_sensitive, do: Enum.uniq(["nsfw" | tags]), else: tags
-
- {summary, content} = render_content(object.data)
-
- html =
- content
- |> HTML.get_cached_scrubbed_html_for_activity(
- User.html_filter_policy(opts[:for]),
- activity,
- "twitterapi:content"
- )
- |> Formatter.emojify(object.data["emoji"])
-
- text =
- if content do
- content
- |> String.replace(~r/<br\s?\/?>/, "\n")
- |> HTML.get_cached_stripped_html_for_activity(activity, "twitterapi:content")
- else
- ""
- end
-
- reply_parent = Activity.get_in_reply_to_activity(activity)
-
- reply_user = reply_parent && User.get_cached_by_ap_id(reply_parent.actor)
-
- summary = HTML.strip_tags(summary)
-
- card =
- StatusView.render(
- "card.json",
- Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
- )
-
- thread_muted? =
- case activity.thread_muted? do
- thread_muted? when is_boolean(thread_muted?) -> thread_muted?
- nil -> CommonAPI.thread_muted?(user, activity)
- end
-
- %{
- "id" => activity.id,
- "uri" => object.data["id"],
- "user" => UserView.render("show.json", %{user: user, for: opts[:for]}),
- "statusnet_html" => html,
- "text" => text,
- "is_local" => activity.local,
- "is_post_verb" => true,
- "created_at" => created_at,
- "in_reply_to_status_id" => reply_parent && reply_parent.id,
- "in_reply_to_screen_name" => reply_user && reply_user.nickname,
- "in_reply_to_profileurl" => User.profile_url(reply_user),
- "in_reply_to_ostatus_uri" => reply_user && reply_user.ap_id,
- "in_reply_to_user_id" => reply_user && reply_user.id,
- "statusnet_conversation_id" => conversation_id,
- "attachments" => (object.data["attachment"] || []) |> ObjectRepresenter.enum_to_list(opts),
- "attentions" => attentions,
- "fave_num" => like_count,
- "repeat_num" => announcement_count,
- "favorited" => !!favorited,
- "repeated" => !!repeated,
- "pinned" => pinned,
- "external_url" => object.data["external_url"] || object.data["id"],
- "tags" => tags,
- "activity_type" => "post",
- "possibly_sensitive" => possibly_sensitive,
- "visibility" => Pleroma.Web.ActivityPub.Visibility.get_visibility(object),
- "summary" => summary,
- "summary_html" => summary |> Formatter.emojify(object.data["emoji"]),
- "card" => card,
- "muted" => thread_muted? || User.mutes?(opts[:for], user)
- }
- end
-
- def render("activity.json", %{activity: unhandled_activity}) do
- Logger.warn("#{__MODULE__} unhandled activity: #{inspect(unhandled_activity)}")
- nil
- end
-
- def render_content(%{"type" => "Note"} = object) do
- summary = object["summary"]
-
- content =
- if !!summary and summary != "" do
- "<p>#{summary}</p>#{object["content"]}"
- else
- object["content"]
- end
-
- {summary, content}
- end
-
- def render_content(%{"type" => object_type} = object)
- when object_type in ["Article", "Page", "Video"] do
- summary = object["name"] || object["summary"]
-
- content =
- if !!summary and summary != "" and is_bitstring(object["url"]) do
- "<p><a href=\"#{object["url"]}\">#{summary}</a></p>#{object["content"]}"
- else
- object["content"]
- end
-
- {summary, content}
- end
-
- def render_content(object) do
- summary = object["summary"] || "Unhandled activity type: #{object["type"]}"
- content = "<p>#{summary}</p>#{object["content"]}"
-
- {summary, content}
- end
-end
diff --git a/lib/pleroma/web/twitter_api/views/notification_view.ex b/lib/pleroma/web/twitter_api/views/notification_view.ex
deleted file mode 100644
index 085cd5aa3..000000000
--- a/lib/pleroma/web/twitter_api/views/notification_view.ex
+++ /dev/null
@@ -1,71 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.TwitterAPI.NotificationView do
- use Pleroma.Web, :view
- alias Pleroma.Notification
- alias Pleroma.User
- alias Pleroma.Web.CommonAPI.Utils
- alias Pleroma.Web.TwitterAPI.ActivityView
- alias Pleroma.Web.TwitterAPI.UserView
-
- require Pleroma.Constants
-
- defp get_user(ap_id, opts) do
- cond do
- user = opts[:users][ap_id] ->
- user
-
- String.ends_with?(ap_id, "/followers") ->
- nil
-
- ap_id == Pleroma.Constants.as_public() ->
- nil
-
- true ->
- User.get_cached_by_ap_id(ap_id)
- end
- end
-
- def render("notification.json", %{notifications: notifications, for: user}) do
- render_many(
- notifications,
- Pleroma.Web.TwitterAPI.NotificationView,
- "notification.json",
- for: user
- )
- end
-
- def render(
- "notification.json",
- %{
- notification: %Notification{
- id: id,
- seen: seen,
- activity: activity,
- inserted_at: created_at
- },
- for: user
- } = opts
- ) do
- ntype =
- case activity.data["type"] do
- "Create" -> "mention"
- "Like" -> "like"
- "Announce" -> "repeat"
- "Follow" -> "follow"
- end
-
- from = get_user(activity.data["actor"], opts)
-
- %{
- "id" => id,
- "ntype" => ntype,
- "notice" => ActivityView.render("activity.json", %{activity: activity, for: user}),
- "from_profile" => UserView.render("show.json", %{user: from, for: user}),
- "is_seen" => if(seen, do: 1, else: 0),
- "created_at" => created_at |> Utils.format_naive_asctime()
- }
- end
-end
diff --git a/lib/pleroma/web/twitter_api/views/user_view.ex b/lib/pleroma/web/twitter_api/views/user_view.ex
deleted file mode 100644
index 8a7d2fc72..000000000
--- a/lib/pleroma/web/twitter_api/views/user_view.ex
+++ /dev/null
@@ -1,191 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.TwitterAPI.UserView do
- use Pleroma.Web, :view
- alias Pleroma.Formatter
- alias Pleroma.HTML
- alias Pleroma.User
- alias Pleroma.Web.CommonAPI.Utils
- alias Pleroma.Web.MediaProxy
-
- def render("show.json", %{user: user = %User{}} = assigns) do
- render_one(user, Pleroma.Web.TwitterAPI.UserView, "user.json", assigns)
- end
-
- def render("index.json", %{users: users, for: user}) do
- users
- |> render_many(Pleroma.Web.TwitterAPI.UserView, "user.json", for: user)
- |> Enum.filter(&Enum.any?/1)
- end
-
- def render("user.json", %{user: user = %User{}} = assigns) do
- if User.visible_for?(user, assigns[:for]),
- do: do_render("user.json", assigns),
- else: %{}
- end
-
- def render("short.json", %{
- user: %User{
- nickname: nickname,
- id: id,
- ap_id: ap_id,
- name: name
- }
- }) do
- %{
- "fullname" => name,
- "id" => id,
- "ostatus_uri" => ap_id,
- "profile_url" => ap_id,
- "screen_name" => nickname
- }
- end
-
- defp do_render("user.json", %{user: user = %User{}} = assigns) do
- for_user = assigns[:for]
- image = User.avatar_url(user) |> MediaProxy.url()
-
- {following, follows_you, statusnet_blocking} =
- if for_user do
- {
- User.following?(for_user, user),
- User.following?(user, for_user),
- User.blocks?(for_user, user)
- }
- else
- {false, false, false}
- end
-
- user_info = User.get_cached_user_info(user)
-
- emoji =
- (user.info.source_data["tag"] || [])
- |> Enum.filter(fn %{"type" => t} -> t == "Emoji" end)
- |> Enum.map(fn %{"icon" => %{"url" => url}, "name" => name} ->
- {String.trim(name, ":"), url}
- end)
-
- emoji = Enum.dedup(emoji ++ user.info.emoji)
-
- description_html =
- (user.bio || "")
- |> HTML.filter_tags(User.html_filter_policy(for_user))
- |> Formatter.emojify(emoji)
-
- fields =
- user.info
- |> User.Info.fields()
- |> Enum.map(fn %{"name" => name, "value" => value} ->
- %{
- "name" => Pleroma.HTML.strip_tags(name),
- "value" => Pleroma.HTML.filter_tags(value, Pleroma.HTML.Scrubber.LinksOnly)
- }
- end)
-
- data =
- %{
- "created_at" => user.inserted_at |> Utils.format_naive_asctime(),
- "description" => HTML.strip_tags((user.bio || "") |> String.replace("<br>", "\n")),
- "description_html" => description_html,
- "favourites_count" => 0,
- "followers_count" => user_info[:follower_count],
- "following" => following,
- "follows_you" => follows_you,
- "statusnet_blocking" => statusnet_blocking,
- "friends_count" => user_info[:following_count],
- "id" => user.id,
- "name" => user.name || user.nickname,
- "name_html" =>
- if(user.name,
- do: HTML.strip_tags(user.name) |> Formatter.emojify(emoji),
- else: user.nickname
- ),
- "profile_image_url" => image,
- "profile_image_url_https" => image,
- "profile_image_url_profile_size" => image,
- "profile_image_url_original" => image,
- "screen_name" => user.nickname,
- "statuses_count" => user_info[:note_count],
- "statusnet_profile_url" => user.ap_id,
- "cover_photo" => User.banner_url(user) |> MediaProxy.url(),
- "background_image" => image_url(user.info.background) |> MediaProxy.url(),
- "is_local" => user.local,
- "locked" => user.info.locked,
- "hide_followers" => user.info.hide_followers,
- "hide_follows" => user.info.hide_follows,
- "fields" => fields,
-
- # Pleroma extension
- "pleroma" =>
- %{
- "confirmation_pending" => user_info.confirmation_pending,
- "tags" => user.tags,
- "skip_thread_containment" => user.info.skip_thread_containment
- }
- |> maybe_with_activation_status(user, for_user)
- |> with_notification_settings(user, for_user)
- }
- |> maybe_with_user_settings(user, for_user)
- |> maybe_with_role(user, for_user)
-
- if assigns[:token] do
- Map.put(data, "token", token_string(assigns[:token]))
- else
- data
- end
- end
-
- defp with_notification_settings(data, %User{id: user_id} = user, %User{id: user_id}) do
- Map.put(data, "notification_settings", user.info.notification_settings)
- end
-
- defp with_notification_settings(data, _, _), do: data
-
- defp maybe_with_activation_status(data, user, %User{info: %{is_admin: true}}) do
- Map.put(data, "deactivated", user.info.deactivated)
- end
-
- defp maybe_with_activation_status(data, _, _), do: data
-
- defp maybe_with_role(data, %User{id: id} = user, %User{id: id}) do
- Map.merge(data, %{
- "role" => role(user),
- "show_role" => user.info.show_role,
- "rights" => %{
- "delete_others_notice" => !!user.info.is_moderator,
- "admin" => !!user.info.is_admin
- }
- })
- end
-
- defp maybe_with_role(data, %User{info: %{show_role: true}} = user, _user) do
- Map.merge(data, %{
- "role" => role(user),
- "rights" => %{
- "delete_others_notice" => !!user.info.is_moderator,
- "admin" => !!user.info.is_admin
- }
- })
- end
-
- defp maybe_with_role(data, _, _), do: data
-
- defp maybe_with_user_settings(data, %User{info: info, id: id} = _user, %User{id: id}) do
- data
- |> Kernel.put_in(["default_scope"], info.default_scope)
- |> Kernel.put_in(["no_rich_text"], info.no_rich_text)
- end
-
- defp maybe_with_user_settings(data, _, _), do: data
- defp role(%User{info: %{:is_admin => true}}), do: "admin"
- defp role(%User{info: %{:is_moderator => true}}), do: "moderator"
- defp role(_), do: "member"
-
- defp image_url(%{"url" => [%{"href" => href} | _]}), do: href
- defp image_url(_), do: nil
-
- defp token_string(%Pleroma.Web.OAuth.Token{token: token_str}), do: token_str
- defp token_string(token), do: token
-end
diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex
new file mode 100644
index 000000000..b13030fa0
--- /dev/null
+++ b/lib/pleroma/web/views/streamer_view.ex
@@ -0,0 +1,66 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.StreamerView do
+ use Pleroma.Web, :view
+
+ alias Pleroma.Activity
+ alias Pleroma.Conversation.Participation
+ alias Pleroma.Notification
+ alias Pleroma.User
+ alias Pleroma.Web.MastodonAPI.NotificationView
+
+ def render("update.json", %Activity{} = activity, %User{} = user) do
+ %{
+ event: "update",
+ payload:
+ Pleroma.Web.MastodonAPI.StatusView.render(
+ "status.json",
+ activity: activity,
+ for: user
+ )
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
+ def render("notification.json", %User{} = user, %Notification{} = notify) do
+ %{
+ event: "notification",
+ payload:
+ NotificationView.render(
+ "show.json",
+ %{notification: notify, for: user}
+ )
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
+ def render("update.json", %Activity{} = activity) do
+ %{
+ event: "update",
+ payload:
+ Pleroma.Web.MastodonAPI.StatusView.render(
+ "status.json",
+ activity: activity
+ )
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
+ def render("conversation.json", %Participation{} = participation) do
+ %{
+ event: "conversation",
+ payload:
+ Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
+ participation: participation,
+ for: participation.user
+ })
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+end
diff --git a/lib/pleroma/web/web.ex b/lib/pleroma/web/web.ex
index bfb6c7287..687346554 100644
--- a/lib/pleroma/web/web.ex
+++ b/lib/pleroma/web/web.ex
@@ -66,23 +66,9 @@ defmodule Pleroma.Web do
end
@doc """
- Same as `render_many/4` but wrapped in rescue block and parallelized (unless disabled by passing false as a fifth argument).
+ Same as `render_many/4` but wrapped in rescue block.
"""
- def safe_render_many(collection, view, template, assigns \\ %{}, parallel \\ true)
-
- def safe_render_many(collection, view, template, assigns, true) do
- Enum.map(collection, fn resource ->
- Task.async(fn ->
- as = Map.get(assigns, :as) || view.__resource__
- assigns = Map.put(assigns, as, resource)
- safe_render(view, template, assigns)
- end)
- end)
- |> Enum.map(&Task.await(&1, :infinity))
- |> Enum.filter(& &1)
- end
-
- def safe_render_many(collection, view, template, assigns, false) do
+ def safe_render_many(collection, view, template, assigns \\ %{}) do
Enum.map(collection, fn resource ->
as = Map.get(assigns, :as) || view.__resource__
assigns = Map.put(assigns, as, resource)
diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex
new file mode 100644
index 000000000..4e3e4195f
--- /dev/null
+++ b/lib/pleroma/workers/activity_expiration_worker.ex
@@ -0,0 +1,18 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ActivityExpirationWorker do
+ use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
+
+ @impl Oban.Worker
+ def perform(
+ %{
+ "op" => "activity_expiration",
+ "activity_expiration_id" => activity_expiration_id
+ },
+ _job
+ ) do
+ Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, activity_expiration_id)
+ end
+end
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
new file mode 100644
index 000000000..082f20ab7
--- /dev/null
+++ b/lib/pleroma/workers/background_worker.ex
@@ -0,0 +1,69 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.BackgroundWorker do
+ alias Pleroma.Activity
+ alias Pleroma.User
+ alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
+ alias Pleroma.Web.OAuth.Token.CleanWorker
+
+ use Pleroma.Workers.WorkerHelper, queue: "background"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
+ user = User.get_cached_by_id(user_id)
+ User.perform(:fetch_initial_posts, user)
+ end
+
+ def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do
+ user = User.get_cached_by_id(user_id)
+ User.perform(:deactivate_async, user, status)
+ end
+
+ def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do
+ user = User.get_cached_by_id(user_id)
+ User.perform(:delete, user)
+ end
+
+ def perform(
+ %{
+ "op" => "blocks_import",
+ "blocker_id" => blocker_id,
+ "blocked_identifiers" => blocked_identifiers
+ },
+ _job
+ ) do
+ blocker = User.get_cached_by_id(blocker_id)
+ User.perform(:blocks_import, blocker, blocked_identifiers)
+ end
+
+ def perform(
+ %{
+ "op" => "follow_import",
+ "follower_id" => follower_id,
+ "followed_identifiers" => followed_identifiers
+ },
+ _job
+ ) do
+ follower = User.get_cached_by_id(follower_id)
+ User.perform(:follow_import, follower, followed_identifiers)
+ end
+
+ def perform(%{"op" => "clean_expired_tokens"}, _job) do
+ CleanWorker.perform(:clean)
+ end
+
+ def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do
+ MediaProxyWarmingPolicy.perform(:preload, message)
+ end
+
+ def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do
+ MediaProxyWarmingPolicy.perform(:prefetch, url)
+ end
+
+ def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do
+ activity = Activity.get_by_id(activity_id)
+ Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
+ end
+end
diff --git a/lib/pleroma/workers/digest_emails_worker.ex b/lib/pleroma/workers/digest_emails_worker.ex
new file mode 100644
index 000000000..3e5a836d0
--- /dev/null
+++ b/lib/pleroma/workers/digest_emails_worker.ex
@@ -0,0 +1,16 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.DigestEmailsWorker do
+ alias Pleroma.User
+
+ use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
+ user_id
+ |> User.get_cached_by_id()
+ |> Pleroma.Daemons.DigestEmailDaemon.perform()
+ end
+end
diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex
new file mode 100644
index 000000000..1b7a0eb3e
--- /dev/null
+++ b/lib/pleroma/workers/mailer_worker.ex
@@ -0,0 +1,15 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.MailerWorker do
+ use Pleroma.Workers.WorkerHelper, queue: "mailer"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
+ encoded_email
+ |> Base.decode64!()
+ |> :erlang.binary_to_term()
+ |> Pleroma.Emails.Mailer.deliver(config)
+ end
+end
diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex
new file mode 100644
index 000000000..455f7fc7e
--- /dev/null
+++ b/lib/pleroma/workers/publisher_worker.ex
@@ -0,0 +1,25 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.PublisherWorker do
+ alias Pleroma.Activity
+ alias Pleroma.Web.Federator
+
+ use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
+ def backoff(attempt) when is_integer(attempt) do
+ Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
+ end
+
+ @impl Oban.Worker
+ def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do
+ activity = Activity.get_by_id(activity_id)
+ Federator.perform(:publish, activity)
+ end
+
+ def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do
+ params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end)
+ Federator.perform(:publish_one, String.to_atom(module_name), params)
+ end
+end
diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex
new file mode 100644
index 000000000..83d528a66
--- /dev/null
+++ b/lib/pleroma/workers/receiver_worker.ex
@@ -0,0 +1,18 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ReceiverWorker do
+ alias Pleroma.Web.Federator
+
+ use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do
+ Federator.perform(:incoming_doc, doc)
+ end
+
+ def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do
+ Federator.perform(:incoming_ap_doc, params)
+ end
+end
diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex
new file mode 100644
index 000000000..ca7d53af1
--- /dev/null
+++ b/lib/pleroma/workers/scheduled_activity_worker.ex
@@ -0,0 +1,12 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ScheduledActivityWorker do
+ use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
+ Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, activity_id)
+ end
+end
diff --git a/lib/pleroma/workers/subscriber_worker.ex b/lib/pleroma/workers/subscriber_worker.ex
new file mode 100644
index 000000000..fc490e300
--- /dev/null
+++ b/lib/pleroma/workers/subscriber_worker.ex
@@ -0,0 +1,26 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.SubscriberWorker do
+ alias Pleroma.Repo
+ alias Pleroma.Web.Federator
+ alias Pleroma.Web.Websub
+
+ use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "refresh_subscriptions"}, _job) do
+ Federator.perform(:refresh_subscriptions)
+ end
+
+ def perform(%{"op" => "request_subscription", "websub_id" => websub_id}, _job) do
+ websub = Repo.get(Websub.WebsubClientSubscription, websub_id)
+ Federator.perform(:request_subscription, websub)
+ end
+
+ def perform(%{"op" => "verify_websub", "websub_id" => websub_id}, _job) do
+ websub = Repo.get(Websub.WebsubServerSubscription, websub_id)
+ Federator.perform(:verify_websub, websub)
+ end
+end
diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex
new file mode 100644
index 000000000..b581a2f86
--- /dev/null
+++ b/lib/pleroma/workers/transmogrifier_worker.ex
@@ -0,0 +1,15 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.TransmogrifierWorker do
+ alias Pleroma.User
+
+ use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
+ user = User.get_cached_by_id(user_id)
+ Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
+ end
+end
diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex
new file mode 100644
index 000000000..bea2baffb
--- /dev/null
+++ b/lib/pleroma/workers/web_pusher_worker.ex
@@ -0,0 +1,16 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.WebPusherWorker do
+ alias Pleroma.Notification
+ alias Pleroma.Repo
+
+ use Pleroma.Workers.WorkerHelper, queue: "web_push"
+
+ @impl Oban.Worker
+ def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
+ notification = Repo.get(Notification, notification_id)
+ Pleroma.Web.Push.Impl.perform(notification)
+ end
+end
diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex
new file mode 100644
index 000000000..358efa14a
--- /dev/null
+++ b/lib/pleroma/workers/worker_helper.ex
@@ -0,0 +1,46 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.WorkerHelper do
+ alias Pleroma.Config
+ alias Pleroma.Workers.WorkerHelper
+
+ def worker_args(queue) do
+ case Config.get([:workers, :retries, queue]) do
+ nil -> []
+ max_attempts -> [max_attempts: max_attempts]
+ end
+ end
+
+ def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
+ backoff =
+ :math.pow(attempt, pow) +
+ base_backoff +
+ :rand.uniform(2 * base_backoff) * attempt
+
+ trunc(backoff)
+ end
+
+ defmacro __using__(opts) do
+ caller_module = __CALLER__.module
+ queue = Keyword.fetch!(opts, :queue)
+
+ quote do
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
+ use Oban.Worker,
+ queue: unquote(queue),
+ max_attempts: 1
+
+ def enqueue(op, params, worker_args \\ []) do
+ params = Map.merge(%{"op" => op}, params)
+ queue_atom = String.to_atom(unquote(queue))
+ worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
+
+ unquote(caller_module)
+ |> apply(:new, [params, worker_args])
+ |> Pleroma.Repo.insert()
+ end
+ end
+ end
+end