diff options
Diffstat (limited to 'lib')
50 files changed, 1182 insertions, 770 deletions
diff --git a/lib/pleroma/activity/ir/topics.ex b/lib/pleroma/activity/ir/topics.ex new file mode 100644 index 000000000..010897abc --- /dev/null +++ b/lib/pleroma/activity/ir/topics.ex @@ -0,0 +1,63 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Activity.Ir.Topics do + alias Pleroma.Object + alias Pleroma.Web.ActivityPub.Visibility + + def get_activity_topics(activity) do + activity + |> Object.normalize() + |> generate_topics(activity) + |> List.flatten() + end + + defp generate_topics(%{data: %{"type" => "Answer"}}, _) do + [] + end + + defp generate_topics(object, activity) do + ["user", "list"] ++ visibility_tags(object, activity) + end + + defp visibility_tags(object, activity) do + case Visibility.get_visibility(activity) do + "public" -> + if activity.local do + ["public", "public:local"] + else + ["public"] + end + |> item_creation_tags(object, activity) + + "direct" -> + ["direct"] + + _ -> + [] + end + end + + defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do + tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity) + end + + defp item_creation_tags(tags, _, _) do + tags + end + + defp hashtags_to_topics(%{data: %{"tag" => tags}}) do + tags + |> Enum.filter(&is_bitstring(&1)) + |> Enum.map(fn tag -> "hashtag:" <> tag end) + end + + defp hashtags_to_topics(_), do: [] + + defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: [] + + defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"] + + defp attachment_topics(_object, _act), do: ["public:media"] +end diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 1d46925f8..3b37ce630 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -31,18 +31,19 @@ defmodule Pleroma.Application do children = [ Pleroma.Repo, + Pleroma.Scheduler, Pleroma.Config.TransferTask, Pleroma.Emoji, Pleroma.Captcha, Pleroma.FlakeId, - Pleroma.ScheduledActivityWorker, - Pleroma.ActivityExpirationWorker + Pleroma.Daemons.ScheduledActivityDaemon, + Pleroma.Daemons.ActivityExpirationDaemon ] ++ cachex_children() ++ hackney_pool_children() ++ [ - Pleroma.Web.Federator.RetryQueue, Pleroma.Stats, + {Oban, Pleroma.Config.get(Oban)}, %{ id: :web_push_init, start: {Task, :start_link, [&Pleroma.Web.Push.init/0]}, @@ -70,9 +71,7 @@ defmodule Pleroma.Application do # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # for other strategies and supported options opts = [strategy: :one_for_one, name: Pleroma.Supervisor] - result = Supervisor.start_link(children, opts) - :ok = after_supervisor_start() - result + Supervisor.start_link(children, opts) end defp setup_instrumenters do @@ -142,7 +141,7 @@ defmodule Pleroma.Application do defp streamer_child(:test), do: [] defp streamer_child(_) do - [Pleroma.Web.Streamer] + [Pleroma.Web.Streamer.supervisor()] end defp oauth_cleanup_child(true), @@ -164,17 +163,4 @@ defmodule Pleroma.Application do :hackney_pool.child_spec(pool, options) end end - - defp after_supervisor_start do - with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest], - true <- digest_config[:active] do - PleromaJobQueue.schedule( - digest_config[:schedule], - :digest_emails, - Pleroma.DigestEmailWorker - ) - end - - :ok - end end diff --git a/lib/pleroma/activity_expiration_worker.ex b/lib/pleroma/daemons/activity_expiration_daemon.ex index 0f9e715f8..cab7628c4 100644 --- a/lib/pleroma/activity_expiration_worker.ex +++ b/lib/pleroma/daemons/activity_expiration_daemon.ex @@ -2,13 +2,14 @@ # Copyright © 2019 Pleroma Authors <https://pleroma.social/> # SPDX-License-Identifier: AGPL-3.0-only -defmodule Pleroma.ActivityExpirationWorker do +defmodule Pleroma.Daemons.ActivityExpirationDaemon do alias Pleroma.Activity alias Pleroma.ActivityExpiration alias Pleroma.Config alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.CommonAPI + require Logger use GenServer import Ecto.Query @@ -49,7 +50,10 @@ defmodule Pleroma.ActivityExpirationWorker do def handle_info(:perform, state) do ActivityExpiration.due_expirations(@schedule_interval) |> Enum.each(fn expiration -> - PleromaJobQueue.enqueue(:activity_expiration, __MODULE__, [:execute, expiration.id]) + Pleroma.Workers.ActivityExpirationWorker.enqueue( + "activity_expiration", + %{"activity_expiration_id" => expiration.id} + ) end) schedule_next() diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/daemons/digest_email_daemon.ex index 5644d6a67..462ad2c55 100644 --- a/lib/pleroma/digest_email_worker.ex +++ b/lib/pleroma/daemons/digest_email_daemon.ex @@ -2,10 +2,11 @@ # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # SPDX-License-Identifier: AGPL-3.0-only -defmodule Pleroma.DigestEmailWorker do - import Ecto.Query +defmodule Pleroma.Daemons.DigestEmailDaemon do + alias Pleroma.Repo + alias Pleroma.Workers.DigestEmailsWorker - @queue_name :digest_emails + import Ecto.Query def perform do config = Pleroma.Config.get([:email_notifications, :digest]) @@ -20,8 +21,10 @@ defmodule Pleroma.DigestEmailWorker do where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"), select: u ) - |> Pleroma.Repo.all() - |> Enum.each(&PleromaJobQueue.enqueue(@queue_name, __MODULE__, [&1])) + |> Repo.all() + |> Enum.each(fn user -> + DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id}) + end) end @doc """ diff --git a/lib/pleroma/scheduled_activity_worker.ex b/lib/pleroma/daemons/scheduled_activity_daemon.ex index 8578cab5e..aee5f723a 100644 --- a/lib/pleroma/scheduled_activity_worker.ex +++ b/lib/pleroma/daemons/scheduled_activity_daemon.ex @@ -2,7 +2,7 @@ # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # SPDX-License-Identifier: AGPL-3.0-only -defmodule Pleroma.ScheduledActivityWorker do +defmodule Pleroma.Daemons.ScheduledActivityDaemon do @moduledoc """ Sends scheduled activities to the job queue. """ @@ -11,6 +11,7 @@ defmodule Pleroma.ScheduledActivityWorker do alias Pleroma.ScheduledActivity alias Pleroma.User alias Pleroma.Web.CommonAPI + use GenServer require Logger @@ -45,7 +46,10 @@ defmodule Pleroma.ScheduledActivityWorker do def handle_info(:perform, state) do ScheduledActivity.due_activities(@schedule_interval) |> Enum.each(fn scheduled_activity -> - PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id]) + Pleroma.Workers.ScheduledActivityWorker.enqueue( + "execute", + %{"activity_id" => scheduled_activity.id} + ) end) schedule_next() diff --git a/lib/pleroma/delivery.ex b/lib/pleroma/delivery.ex new file mode 100644 index 000000000..29a1e5a77 --- /dev/null +++ b/lib/pleroma/delivery.ex @@ -0,0 +1,51 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Delivery do + use Ecto.Schema + + alias Pleroma.Delivery + alias Pleroma.FlakeId + alias Pleroma.Object + alias Pleroma.Repo + alias Pleroma.User + alias Pleroma.User + + import Ecto.Changeset + import Ecto.Query + + schema "deliveries" do + belongs_to(:user, User, type: FlakeId) + belongs_to(:object, Object) + end + + def changeset(delivery, params \\ %{}) do + delivery + |> cast(params, [:user_id, :object_id]) + |> validate_required([:user_id, :object_id]) + |> foreign_key_constraint(:object_id) + |> foreign_key_constraint(:user_id) + |> unique_constraint(:user_id, name: :deliveries_user_id_object_id_index) + end + + def create(object_id, user_id) do + %Delivery{} + |> changeset(%{user_id: user_id, object_id: object_id}) + |> Repo.insert(on_conflict: :nothing) + end + + def get(object_id, user_id) do + from(d in Delivery, where: d.user_id == ^user_id and d.object_id == ^object_id) + |> Repo.one() + end + + # A hack because user delete activities have a fake id for whatever reason + # TODO: Get rid of this + def delete_all_by_object_id("pleroma:fake_object_id"), do: {0, []} + + def delete_all_by_object_id(object_id) do + from(d in Delivery, where: d.object_id == ^object_id) + |> Repo.delete_all() + end +end diff --git a/lib/pleroma/docs/markdown.ex b/lib/pleroma/docs/markdown.ex index 24930cc9f..8386dc2fb 100644 --- a/lib/pleroma/docs/markdown.ex +++ b/lib/pleroma/docs/markdown.ex @@ -3,9 +3,9 @@ defmodule Pleroma.Docs.Markdown do @spec process(keyword()) :: {:ok, String.t()} def process(descriptions) do - config_path = "docs/config.md" + config_path = "docs/generated_config.md" {:ok, file} = File.open(config_path, [:utf8, :write]) - IO.write(file, "# Configuration\n") + IO.write(file, "# Generated configuration\n") IO.write(file, "Date of generation: #{Date.utc_today()}\n\n") IO.write( diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex index 2e4657b7c..eb96f2e8b 100644 --- a/lib/pleroma/emails/mailer.ex +++ b/lib/pleroma/emails/mailer.ex @@ -9,6 +9,7 @@ defmodule Pleroma.Emails.Mailer do The module contains functions to delivery email using Swoosh.Mailer. """ + alias Pleroma.Workers.MailerWorker alias Swoosh.DeliveryError @otp_app :pleroma @@ -19,7 +20,12 @@ defmodule Pleroma.Emails.Mailer do @doc "add email to queue" def deliver_async(email, config \\ []) do - PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config]) + encoded_email = + email + |> :erlang.term_to_binary() + |> Base.encode64() + + MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config}) end @doc "callback to perform send email from queue" diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex index 4d7ed4ca1..544c4b687 100644 --- a/lib/pleroma/instances/instance.ex +++ b/lib/pleroma/instances/instance.ex @@ -90,7 +90,7 @@ defmodule Pleroma.Instances.Instance do def set_unreachable(url_or_host, unreachable_since \\ nil) def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do - unreachable_since = unreachable_since || DateTime.utc_now() + unreachable_since = parse_datetime(unreachable_since) || NaiveDateTime.utc_now() host = host(url_or_host) existing_record = Repo.get_by(Instance, %{host: host}) @@ -114,4 +114,10 @@ defmodule Pleroma.Instances.Instance do end def set_unreachable(_, _), do: {:error, nil} + + defp parse_datetime(datetime) when is_binary(datetime) do + NaiveDateTime.from_iso8601(datetime) + end + + defp parse_datetime(datetime), do: datetime end diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex index 716d98733..931cabc3f 100644 --- a/lib/pleroma/notification.ex +++ b/lib/pleroma/notification.ex @@ -210,8 +210,10 @@ defmodule Pleroma.Notification do unless skip?(activity, user) do notification = %Notification{user_id: user.id, activity: activity} {:ok, notification} = Repo.insert(notification) - Streamer.stream("user", notification) - Streamer.stream("user:notification", notification) + + ["user", "user:notification"] + |> Streamer.stream(notification) + Push.send(notification) notification end diff --git a/lib/pleroma/plugs/cache.ex b/lib/pleroma/plugs/cache.ex index a81a861d0..50b534e7b 100644 --- a/lib/pleroma/plugs/cache.ex +++ b/lib/pleroma/plugs/cache.ex @@ -20,6 +20,7 @@ defmodule Pleroma.Plugs.Cache do - `ttl`: An expiration time (time-to-live). This value should be in milliseconds or `nil` to disable expiration. Defaults to `nil`. - `query_params`: Take URL query string into account (`true`), ignore it (`false`) or limit to specific params only (list). Defaults to `true`. + - `tracking_fun`: A function that is called on successfull responses, no matter if the request is cached or not. It should accept a conn as the first argument and the value assigned to `tracking_fun_data` as the second. Additionally, you can overwrite the TTL inside a controller action by assigning `cache_ttl` to the connection struct: @@ -56,6 +57,11 @@ defmodule Pleroma.Plugs.Cache do {:ok, nil} -> cache_resp(conn, opts) + {:ok, {content_type, body, tracking_fun_data}} -> + conn = opts.tracking_fun.(conn, tracking_fun_data) + + send_cached(conn, {content_type, body}) + {:ok, record} -> send_cached(conn, record) @@ -88,9 +94,17 @@ defmodule Pleroma.Plugs.Cache do ttl = Map.get(conn.assigns, :cache_ttl, opts.ttl) key = cache_key(conn, opts) content_type = content_type(conn) - record = {content_type, body} - Cachex.put(:web_resp_cache, key, record, ttl: ttl) + conn = + unless opts[:tracking_fun] do + Cachex.put(:web_resp_cache, key, {content_type, body}, ttl: ttl) + conn + else + tracking_fun_data = Map.get(conn.assigns, :tracking_fun_data, nil) + Cachex.put(:web_resp_cache, key, {content_type, body, tracking_fun_data}, ttl: ttl) + + opts.tracking_fun.(conn, tracking_fun_data) + end put_resp_header(conn, "x-cache", "MISS from Pleroma") diff --git a/lib/pleroma/plugs/http_signature.ex b/lib/pleroma/plugs/http_signature.ex index d87fa52fa..23d22a712 100644 --- a/lib/pleroma/plugs/http_signature.ex +++ b/lib/pleroma/plugs/http_signature.ex @@ -15,7 +15,8 @@ defmodule Pleroma.Web.Plugs.HTTPSignaturePlug do end def call(conn, _opts) do - [signature | _] = get_req_header(conn, "signature") + headers = get_req_header(conn, "signature") + signature = Enum.at(headers, 0) if signature do # set (request-target) header to the appropriate value diff --git a/lib/pleroma/scheduler.ex b/lib/pleroma/scheduler.ex new file mode 100644 index 000000000..d84cd99ad --- /dev/null +++ b/lib/pleroma/scheduler.ex @@ -0,0 +1,7 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Scheduler do + use Quantum.Scheduler, otp_app: :pleroma +end diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 5e10ac25f..dd2b1c8c4 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -11,6 +11,7 @@ defmodule Pleroma.User do alias Comeonin.Pbkdf2 alias Ecto.Multi alias Pleroma.Activity + alias Pleroma.Delivery alias Pleroma.Keys alias Pleroma.Notification alias Pleroma.Object @@ -27,6 +28,7 @@ defmodule Pleroma.User do alias Pleroma.Web.OStatus alias Pleroma.Web.RelMe alias Pleroma.Web.Websub + alias Pleroma.Workers.BackgroundWorker require Logger @@ -61,6 +63,7 @@ defmodule Pleroma.User do field(:last_digest_emailed_at, :naive_datetime) has_many(:notifications, Notification) has_many(:registrations, Registration) + has_many(:deliveries, Delivery) embeds_one(:info, User.Info) timestamps() @@ -174,11 +177,25 @@ defmodule Pleroma.User do |> Repo.aggregate(:count, :id) end + defp truncate_if_exists(params, key, max_length) do + if Map.has_key?(params, key) and is_binary(params[key]) do + {value, _chopped} = String.split_at(params[key], max_length) + Map.put(params, key, value) + else + params + end + end + def remote_user_creation(params) do bio_limit = Pleroma.Config.get([:instance, :user_bio_length], 5000) name_limit = Pleroma.Config.get([:instance, :user_name_length], 100) - params = Map.put(params, :info, params[:info] || %{}) + params = + params + |> Map.put(:info, params[:info] || %{}) + |> truncate_if_exists(:name, name_limit) + |> truncate_if_exists(:bio, bio_limit) + info_cng = User.Info.remote_user_creation(%User.Info{}, params[:info]) changes = @@ -633,8 +650,9 @@ defmodule Pleroma.User do end @doc "Fetch some posts when the user has just been federated with" - def fetch_initial_posts(user), - do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user]) + def fetch_initial_posts(user) do + BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id}) + end @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t() def get_followers_query(%User{} = user, nil) do @@ -1064,7 +1082,7 @@ defmodule Pleroma.User do end def deactivate_async(user, status \\ true) do - PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status]) + BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status}) end def deactivate(%User{} = user, status \\ true) do @@ -1092,9 +1110,9 @@ defmodule Pleroma.User do |> update_and_set_cache() end - @spec delete(User.t()) :: :ok - def delete(%User{} = user), - do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user]) + def delete(%User{} = user) do + BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id}) + end @spec perform(atom(), User.t()) :: {:ok, User.t()} def perform(:delete, %User{} = user) do @@ -1201,21 +1219,20 @@ defmodule Pleroma.User do Repo.all(query) end - def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers), - do: - PleromaJobQueue.enqueue(:background, __MODULE__, [ - :blocks_import, - blocker, - blocked_identifiers - ]) + def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do + BackgroundWorker.enqueue("blocks_import", %{ + "blocker_id" => blocker.id, + "blocked_identifiers" => blocked_identifiers + }) + end - def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers), - do: - PleromaJobQueue.enqueue(:background, __MODULE__, [ - :follow_import, - follower, - followed_identifiers - ]) + def follow_import(%User{} = follower, followed_identifiers) + when is_list(followed_identifiers) do + BackgroundWorker.enqueue("follow_import", %{ + "follower_id" => follower.id, + "followed_identifiers" => followed_identifiers + }) + end def delete_user_activities(%User{ap_id: ap_id} = user) do ap_id @@ -1625,6 +1642,18 @@ defmodule Pleroma.User do def is_internal_user?(%User{local: true, nickname: "internal." <> _}), do: true def is_internal_user?(_), do: false + # A hack because user delete activities have a fake id for whatever reason + # TODO: Get rid of this + def get_delivered_users_by_object_id("pleroma:fake_object_id"), do: [] + + def get_delivered_users_by_object_id(object_id) do + from(u in User, + inner_join: delivery in assoc(u, :deliveries), + where: delivery.object_id == ^object_id + ) + |> Repo.all() + end + def change_email(user, email) do user |> cast(%{email: email}, [:email]) diff --git a/lib/pleroma/user/info.ex b/lib/pleroma/user/info.ex index 779bfbc18..151e025de 100644 --- a/lib/pleroma/user/info.ex +++ b/lib/pleroma/user/info.ex @@ -242,6 +242,13 @@ defmodule Pleroma.User.Info do end def remote_user_creation(info, params) do + params = + if Map.has_key?(params, :fields) do + Map.put(params, :fields, Enum.map(params[:fields], &truncate_field/1)) + else + params + end + info |> cast(params, [ :ap_enabled, @@ -326,6 +333,16 @@ defmodule Pleroma.User.Info do defp valid_field?(_), do: false + defp truncate_field(%{"name" => name, "value" => value}) do + {name, _chopped} = + String.split_at(name, Pleroma.Config.get([:instance, :account_field_name_length], 255)) + + {value, _chopped} = + String.split_at(value, Pleroma.Config.get([:instance, :account_field_value_length], 255)) + + %{"name" => name, "value" => value} + end + @spec confirmation_changeset(Info.t(), keyword()) :: Changeset.t() def confirmation_changeset(info, opts) do need_confirmation? = Keyword.get(opts, :need_confirmation) diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index bc9a7a2d6..eac0b5a99 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -4,6 +4,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Activity + alias Pleroma.Activity.Ir.Topics alias Pleroma.Config alias Pleroma.Conversation alias Pleroma.Notification @@ -17,7 +18,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.User alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.Transmogrifier + alias Pleroma.Web.Streamer alias Pleroma.Web.WebFinger + alias Pleroma.Workers.BackgroundWorker import Ecto.Query import Pleroma.Web.ActivityPub.Utils @@ -146,7 +149,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do activity end - PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity]) + BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) Notification.create_notifications(activity) SubscriptionNotification.create_notifications(activity) @@ -188,9 +191,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do participations |> Repo.preload(:user) - Enum.each(participations, fn participation -> - Pleroma.Web.Streamer.stream("participation", participation) - end) + Streamer.stream("participation", participations) end def stream_out_participations(%Object{data: %{"context" => context}}, user) do @@ -209,41 +210,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do def stream_out_participations(_, _), do: :noop - def stream_out(activity) do - if activity.data["type"] in ["Create", "Announce", "Delete"] do - object = Object.normalize(activity) - # Do not stream out poll replies - unless object.data["type"] == "Answer" do - Pleroma.Web.Streamer.stream("user", activity) - Pleroma.Web.Streamer.stream("list", activity) - - if get_visibility(activity) == "public" do - Pleroma.Web.Streamer.stream("public", activity) - - if activity.local do - Pleroma.Web.Streamer.stream("public:local", activity) - end - - if activity.data["type"] in ["Create"] do - object.data - |> Map.get("tag", []) - |> Enum.filter(fn tag -> is_bitstring(tag) end) - |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end) - - if object.data["attachment"] != [] do - Pleroma.Web.Streamer.stream("public:media", activity) - - if activity.local do - Pleroma.Web.Streamer.stream("public:local:media", activity) - end - end - end - else - if get_visibility(activity) == "direct", - do: Pleroma.Web.Streamer.stream("direct", activity) - end - end - end + def stream_out(%Activity{data: %{"type" => data_type}} = activity) + when data_type in ["Create", "Announce", "Delete"] do + activity + |> Topics.get_activity_topics() + |> Streamer.stream(activity) + end + + def stream_out(_activity) do + :noop end def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex index 705dbc1c2..01b34fb1d 100644 --- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex +++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex @@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do use Pleroma.Web, :controller alias Pleroma.Activity + alias Pleroma.Delivery alias Pleroma.Object alias Pleroma.Object.Fetcher alias Pleroma.User @@ -23,7 +24,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do action_fallback(:errors) - plug(Pleroma.Plugs.Cache, [query_params: false] when action in [:activity, :object]) + plug( + Pleroma.Plugs.Cache, + [query_params: false, tracking_fun: &__MODULE__.track_object_fetch/2] + when action in [:activity, :object] + ) + plug(Pleroma.Web.FederatingPlug when action in [:inbox, :relay]) plug(:set_requester_reachable when action in [:inbox]) plug(:relay_active? when action in [:relay]) @@ -54,6 +60,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do %Object{} = object <- Object.get_cached_by_ap_id(ap_id), {_, true} <- {:public?, Visibility.is_public?(object)} do conn + |> assign(:tracking_fun_data, object.id) |> set_cache_ttl_for(object) |> put_resp_content_type("application/activity+json") |> put_view(ObjectView) @@ -64,6 +71,16 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do end end + def track_object_fetch(conn, nil), do: conn + + def track_object_fetch(conn, object_id) do + with %{assigns: %{user: %User{id: user_id}}} <- conn do + Delivery.create(object_id, user_id) + end + + conn + end + def object_likes(conn, %{"uuid" => uuid, "page" => page}) do with ap_id <- o_status_url(conn, :object, uuid), %Object{} = object <- Object.get_cached_by_ap_id(ap_id), @@ -99,6 +116,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do %Activity{} = activity <- Activity.normalize(ap_id), {_, true} <- {:public?, Visibility.is_public?(activity)} do conn + |> maybe_set_tracking_data(activity) |> set_cache_ttl_for(activity) |> put_resp_content_type("application/activity+json") |> put_view(ObjectView) @@ -109,6 +127,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do end end + defp maybe_set_tracking_data(conn, %Activity{data: %{"type" => "Create"}} = activity) do + object_id = Object.normalize(activity).id + assign(conn, :tracking_fun_data, object_id) + end + + defp maybe_set_tracking_data(conn, _activity), do: conn + defp set_cache_ttl_for(conn, %Activity{object: object}) do set_cache_ttl_for(conn, object) end diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex index a179dd54d..26b8539fe 100644 --- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex @@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do alias Pleroma.HTTP alias Pleroma.Web.MediaProxy + alias Pleroma.Workers.BackgroundWorker require Logger @@ -30,7 +31,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do url |> Enum.each(fn %{"href" => href} -> - PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href]) + BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href}) x -> Logger.debug("Unhandled attachment URL object #{inspect(x)}") @@ -46,7 +47,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message ) when is_list(attachments) and length(attachments) > 0 do - PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message]) + BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message}) {:ok, message} end diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index c97405690..114251b24 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -5,8 +5,10 @@ defmodule Pleroma.Web.ActivityPub.Publisher do alias Pleroma.Activity alias Pleroma.Config + alias Pleroma.Delivery alias Pleroma.HTTP alias Pleroma.Instances + alias Pleroma.Object alias Pleroma.User alias Pleroma.Web.ActivityPub.Relay alias Pleroma.Web.ActivityPub.Transmogrifier @@ -84,6 +86,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do end end + def publish_one(%{actor_id: actor_id} = params) do + actor = User.get_cached_by_id(actor_id) + + params + |> Map.delete(:actor_id) + |> Map.put(:actor, actor) + |> publish_one() + end + defp should_federate?(inbox, public) do if public do true @@ -107,7 +118,18 @@ defmodule Pleroma.Web.ActivityPub.Publisher do {:ok, []} end - Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers + fetchers = + with %Activity{data: %{"type" => "Delete"}} <- activity, + %Object{id: object_id} <- Object.normalize(activity), + fetchers <- User.get_delivered_users_by_object_id(object_id), + _ <- Delivery.delete_all_by_object_id(object_id) do + fetchers + else + _ -> + [] + end + + Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers ++ fetchers end defp get_cc_ap_ids(ap_id, recipients) do @@ -159,7 +181,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do Publishes an activity with BCC to all relevant peers. """ - def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do + def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) + when is_list(bcc) and bcc != [] do public = is_public?(activity) {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) @@ -186,7 +209,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{ inbox: inbox, json: json, - actor: actor, + actor_id: actor.id, id: activity.data["id"], unreachable_since: unreachable_since }) @@ -221,7 +244,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do %{ inbox: inbox, json: json, - actor: actor, + actor_id: actor.id, id: activity.data["id"], unreachable_since: unreachable_since } diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index 350b83abb..acb3087d0 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -15,6 +15,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.Federator + alias Pleroma.Workers.TransmogrifierWorker import Ecto.Query @@ -1051,7 +1052,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do already_ap <- User.ap_enabled?(user), {:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do unless already_ap do - PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user]) + TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id}) end {:ok, user} diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index 47917f5d3..258e56066 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -167,14 +167,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do @spec maybe_federate(any()) :: :ok def maybe_federate(%Activity{local: true} = activity) do if Pleroma.Config.get!([:instance, :federating]) do - priority = - case activity.data["type"] do - "Delete" -> 10 - "Create" -> 1 - _ -> 5 - end - - Pleroma.Web.Federator.publish(activity, priority) + Pleroma.Web.Federator.publish(activity) end :ok diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index f4f9e83e0..1a2da014a 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -10,16 +10,17 @@ defmodule Pleroma.Web.Federator do alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Federator.Publisher - alias Pleroma.Web.Federator.RetryQueue alias Pleroma.Web.OStatus alias Pleroma.Web.Websub + alias Pleroma.Workers.PublisherWorker + alias Pleroma.Workers.ReceiverWorker + alias Pleroma.Workers.SubscriberWorker require Logger def init do - # 1 minute - Process.sleep(1000 * 60) - refresh_subscriptions() + # To do: consider removing this call in favor of scheduled execution (`quantum`-based) + refresh_subscriptions(schedule_in: 60) end @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" @@ -37,50 +38,38 @@ defmodule Pleroma.Web.Federator do # Client API def incoming_doc(doc) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) + ReceiverWorker.enqueue("incoming_doc", %{"body" => doc}) end def incoming_ap_doc(params) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) + ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) end - def publish(activity, priority \\ 1) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) + def publish(%{id: "pleroma:fakeid"} = activity) do + perform(:publish, activity) end - def verify_websub(websub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) + def publish(activity) do + PublisherWorker.enqueue("publish", %{"activity_id" => activity.id}) end - def request_subscription(sub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) + def verify_websub(websub) do + SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id}) end - def refresh_subscriptions do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) + def request_subscription(websub) do + SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id}) end - # Job Worker Callbacks - - def perform(:refresh_subscriptions) do - Logger.debug("Federator running refresh subscriptions") - Websub.refresh_subscriptions() - - spawn(fn -> - # 6 hours - Process.sleep(1000 * 60 * 60 * 6) - refresh_subscriptions() - end) + def refresh_subscriptions(worker_args \\ []) do + SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1]) end - def perform(:request_subscription, websub) do - Logger.debug("Refreshing #{websub.topic}") + # Job Worker Callbacks - with {:ok, websub} <- Websub.request_subscription(websub) do - Logger.debug("Successfully refreshed #{websub.topic}") - else - _e -> Logger.debug("Couldn't refresh #{websub.topic}") - end + @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} + def perform(:publish_one, module, params) do + apply(module, :publish_one, [params]) end def perform(:publish, activity) do @@ -92,14 +81,6 @@ defmodule Pleroma.Web.Federator do end end - def perform(:verify_websub, websub) do - Logger.debug(fn -> - "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" - end) - - Websub.verify(websub) - end - def perform(:incoming_doc, doc) do Logger.info("Got document, trying to parse") OStatus.handle_incoming(doc) @@ -130,22 +111,27 @@ defmodule Pleroma.Web.Federator do end end - def perform( - :publish_single_websub, - %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params - ) do - case Websub.publish_one(params) do - {:ok, _} -> - :ok + def perform(:request_subscription, websub) do + Logger.debug("Refreshing #{websub.topic}") - {:error, _} -> - RetryQueue.enqueue(params, Websub) + with {:ok, websub} <- Websub.request_subscription(websub) do + Logger.debug("Successfully refreshed #{websub.topic}") + else + _e -> Logger.debug("Couldn't refresh #{websub.topic}") end end - def perform(type, _) do - Logger.debug(fn -> "Unknown task: #{type}" end) - {:error, "Don't know what to do with this"} + def perform(:verify_websub, websub) do + Logger.debug(fn -> + "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" + end) + + Websub.verify(websub) + end + + def perform(:refresh_subscriptions) do + Logger.debug("Federator running refresh subscriptions") + Websub.refresh_subscriptions() end def ap_enabled_actor(id) do diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex index 70f870244..937064638 100644 --- a/lib/pleroma/web/federator/publisher.ex +++ b/lib/pleroma/web/federator/publisher.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do alias Pleroma.Activity alias Pleroma.Config alias Pleroma.User - alias Pleroma.Web.Federator.RetryQueue + alias Pleroma.Workers.PublisherWorker require Logger @@ -30,23 +30,11 @@ defmodule Pleroma.Web.Federator.Publisher do Enqueue publishing a single activity. """ @spec enqueue_one(module(), Map.t()) :: :ok - def enqueue_one(module, %{} = params), - do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params]) - - @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} - def perform(:publish_one, module, params) do - case apply(module, :publish_one, [params]) do - {:ok, _} -> - :ok - - {:error, _e} -> - RetryQueue.enqueue(params, module) - end - end - - def perform(type, _, _) do - Logger.debug("Unknown task: #{type}") - {:error, "Don't know what to do with this"} + def enqueue_one(module, %{} = params) do + PublisherWorker.enqueue( + "publish_one", + %{"module" => to_string(module), "params" => params} + ) end @doc """ diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex deleted file mode 100644 index 9eab8c218..000000000 --- a/lib/pleroma/web/federator/retry_queue.ex +++ /dev/null @@ -1,239 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Federator.RetryQueue do - use GenServer - - require Logger - - def init(args) do - queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected]) - - {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}} - end - - def start_link(_) do - enabled = - if Pleroma.Config.get(:env) == :test, - do: true, - else: Pleroma.Config.get([__MODULE__, :enabled], false) - - if enabled do - Logger.info("Starting retry queue") - - linkres = - GenServer.start_link( - __MODULE__, - %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil}, - name: __MODULE__ - ) - - maybe_kickoff_timer() - linkres - else - Logger.info("Retry queue disabled") - :ignore - end - end - - def enqueue(data, transport, retries \\ 0) do - GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1}) - end - - def get_stats do - GenServer.call(__MODULE__, :get_stats) - end - - def reset_stats do - GenServer.call(__MODULE__, :reset_stats) - end - - def get_retry_params(retries) do - if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do - {:drop, "Max retries reached"} - else - {:retry, growth_function(retries)} - end - end - - def get_retry_timer_interval do - Pleroma.Config.get([:retry_queue, :interval], 1000) - end - - defp ets_count_expires(table, current_time) do - :ets.select_count( - table, - [ - { - {:"$1", :"$2"}, - [{:"=<", :"$1", {:const, current_time}}], - [true] - } - ] - ) - end - - defp ets_pop_n_expired(table, current_time, desired) do - {popped, _continuation} = - :ets.select( - table, - [ - { - {:"$1", :"$2"}, - [{:"=<", :"$1", {:const, current_time}}], - [:"$_"] - } - ], - desired - ) - - popped - |> Enum.each(fn e -> - :ets.delete_object(table, e) - end) - - popped - end - - def maybe_start_job(running_jobs, queue_table) do - # we don't want to hit the ets or the DateTime more times than we have to - # could optimize slightly further by not using the count, and instead grabbing - # up to N objects early... - current_time = DateTime.to_unix(DateTime.utc_now()) - n_running_jobs = :sets.size(running_jobs) - - if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do - n_ready_jobs = ets_count_expires(queue_table, current_time) - - if n_ready_jobs > 0 do - # figure out how many we could start - available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs - start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) - else - running_jobs - end - else - running_jobs - end - end - - defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do - running_jobs - end - - defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) - when available_job_slots > 0 do - candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots) - - candidates - |> List.foldl(running_jobs, fn {_, e}, rj -> - {:ok, pid} = Task.start(fn -> worker(e) end) - mref = Process.monitor(pid) - :sets.add_element(mref, rj) - end) - end - - def worker({:send, data, transport, retries}) do - case transport.publish_one(data) do - {:ok, _} -> - GenServer.cast(__MODULE__, :inc_delivered) - :delivered - - {:error, _reason} -> - enqueue(data, transport, retries) - :retry - end - end - - def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do - {:reply, %{delivered: delivery_count, dropped: drop_count}, state} - end - - def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do - {:reply, %{delivered: delivery_count, dropped: drop_count}, - %{state | delivered: 0, dropped: 0}} - end - - def handle_cast(:reset_stats, state) do - {:noreply, %{state | delivered: 0, dropped: 0}} - end - - def handle_cast( - {:maybe_enqueue, data, transport, retries}, - %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state - ) do - case get_retry_params(retries) do - {:retry, timeout} -> - :ets.insert(queue_table, {timeout, {:send, data, transport, retries}}) - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - - {:drop, message} -> - Logger.debug(message) - {:noreply, %{state | dropped: drop_count + 1}} - end - end - - def handle_cast(:kickoff_timer, state) do - retry_interval = get_retry_timer_interval() - Process.send_after(__MODULE__, :retry_timer_run, retry_interval) - {:noreply, state} - end - - def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do - {:noreply, %{state | delivered: delivery_count + 1}} - end - - def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do - {:noreply, %{state | dropped: drop_count + 1}} - end - - def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do - case transport.publish_one(data) do - {:ok, _} -> - {:noreply, %{state | delivered: delivery_count + 1}} - - {:error, _reason} -> - enqueue(data, transport, retries) - {:noreply, state} - end - end - - def handle_info( - :retry_timer_run, - %{queue_table: queue_table, running_jobs: running_jobs} = state - ) do - maybe_kickoff_timer() - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - end - - def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do - %{running_jobs: running_jobs, queue_table: queue_table} = state - running_jobs = :sets.del_element(ref, running_jobs) - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - end - - def handle_info(unknown, state) do - Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring") - {:noreply, state} - end - - if Pleroma.Config.get(:env) == :test do - defp growth_function(_retries) do - _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout]) - DateTime.to_unix(DateTime.utc_now()) - 1 - end - else - defp growth_function(retries) do - round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) + - DateTime.to_unix(DateTime.utc_now()) - end - end - - defp maybe_kickoff_timer do - GenServer.cast(__MODULE__, :kickoff_timer) - end -end diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index dbd3542ea..3c26eb406 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -8,6 +8,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.OAuth.Token + alias Pleroma.Web.Streamer @behaviour :cowboy_websocket @@ -24,7 +25,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do ] @anonymous_streams ["public", "public:local", "hashtag"] - # Handled by periodic keepalive in Pleroma.Web.Streamer. + # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping. @timeout :infinity def init(%{qs: qs} = req, state) do @@ -65,7 +66,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do }, topic #{state.topic}" ) - Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state)) + Streamer.add_socket(state.topic, streamer_socket(state)) {:ok, state} end @@ -80,7 +81,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do }, topic #{state.topic || "?"}: #{inspect(reason)}" ) - Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state)) + Streamer.remove_socket(state.topic, streamer_socket(state)) :ok end diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex index f50098302..eb94bf86f 100644 --- a/lib/pleroma/web/oauth/token/clean_worker.ex +++ b/lib/pleroma/web/oauth/token/clean_worker.ex @@ -17,6 +17,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do ) alias Pleroma.Web.OAuth.Token + alias Pleroma.Workers.BackgroundWorker def start_link(_), do: GenServer.start_link(__MODULE__, %{}) @@ -27,9 +28,11 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do @doc false def handle_info(:perform, state) do - Token.delete_expired_tokens() + BackgroundWorker.enqueue("clean_expired_tokens", %{}) Process.send_after(self(), :perform, @interval) {:noreply, state} end + + def perform(:clean), do: Token.delete_expired_tokens() end diff --git a/lib/pleroma/web/push/push.ex b/lib/pleroma/web/push/push.ex index 729dad02a..7ef1532ac 100644 --- a/lib/pleroma/web/push/push.ex +++ b/lib/pleroma/web/push/push.ex @@ -3,7 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Push do - alias Pleroma.Web.Push.Impl + alias Pleroma.Workers.WebPusherWorker require Logger @@ -31,6 +31,7 @@ defmodule Pleroma.Web.Push do end end - def send(notification), - do: PleromaJobQueue.enqueue(:web_push, Impl, [notification]) + def send(notification) do + WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id}) + end end diff --git a/lib/pleroma/web/rich_media/parser.ex b/lib/pleroma/web/rich_media/parser.ex index f5f9e358c..c06b0a0f2 100644 --- a/lib/pleroma/web/rich_media/parser.ex +++ b/lib/pleroma/web/rich_media/parser.ex @@ -81,6 +81,7 @@ defmodule Pleroma.Web.RichMedia.Parser do {:ok, %Tesla.Env{body: html}} = Pleroma.HTTP.get(url, [], adapter: @hackney_options) html + |> parse_html |> maybe_parse() |> Map.put(:url, url) |> clean_parsed_data() @@ -91,6 +92,8 @@ defmodule Pleroma.Web.RichMedia.Parser do end end + defp parse_html(html), do: Floki.parse(html) + defp maybe_parse(html) do Enum.reduce_while(parsers(), %{}, fn parser, acc -> case parser.parse(html, acc) do @@ -100,7 +103,8 @@ defmodule Pleroma.Web.RichMedia.Parser do end) end - defp check_parsed_data(%{title: title} = data) when is_binary(title) and byte_size(title) > 0 do + defp check_parsed_data(%{title: title} = data) + when is_binary(title) and byte_size(title) > 0 do {:ok, data} end diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex index dbd0deecd..409fc9eca 100644 --- a/lib/pleroma/web/router.ex +++ b/lib/pleroma/web/router.ex @@ -135,6 +135,7 @@ defmodule Pleroma.Web.Router do pipeline :http_signature do plug(Pleroma.Web.Plugs.HTTPSignaturePlug) + plug(Pleroma.Web.Plugs.MappedSignatureToIdentityPlug) end scope "/api/pleroma", Pleroma.Web.TwitterAPI do @@ -542,6 +543,7 @@ defmodule Pleroma.Web.Router do scope "/", Pleroma.Web do pipe_through(:ostatus) + pipe_through(:http_signature) get("/objects/:uuid", OStatus.OStatusController, :object) get("/activities/:uuid", OStatus.OStatusController, :activity) diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index 9b01ebcc6..8ba7380c0 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do end end + def publish_one(%{recipient_id: recipient_id} = params) do + recipient = User.get_cached_by_id(recipient_id) + + params + |> Map.delete(:recipient_id) + |> Map.put(:recipient, recipient) + |> publish_one() + end + def publish_one(_), do: :noop @supported_activities [ @@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) Publisher.enqueue_one(__MODULE__, %{ - recipient: remote_user, + recipient_id: remote_user.id, feed: feed, unreachable_since: reachable_urls_metadata[remote_user.info.salmon] }) diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex deleted file mode 100644 index 42d95e33a..000000000 --- a/lib/pleroma/web/streamer.ex +++ /dev/null @@ -1,326 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer do - use GenServer - require Logger - alias Pleroma.Activity - alias Pleroma.Config - alias Pleroma.Conversation.Participation - alias Pleroma.Notification - alias Pleroma.Object - alias Pleroma.SubscriptionNotification - alias Pleroma.User - alias Pleroma.Web.ActivityPub.ActivityPub - alias Pleroma.Web.ActivityPub.Visibility - alias Pleroma.Web.CommonAPI - alias Pleroma.Web.MastodonAPI.NotificationView - - @keepalive_interval :timer.seconds(30) - - def start_link(_) do - GenServer.start_link(__MODULE__, %{}, name: __MODULE__) - end - - def add_socket(topic, socket) do - GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic}) - end - - def remove_socket(topic, socket) do - GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic}) - end - - def stream(topic, item) do - GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) - end - - def init(args) do - Process.send_after(self(), %{action: :ping}, @keepalive_interval) - - {:ok, args} - end - - def handle_info(%{action: :ping}, topics) do - topics - |> Map.values() - |> List.flatten() - |> Enum.each(fn socket -> - Logger.debug("Sending keepalive ping") - send(socket.transport_pid, {:text, ""}) - end) - - Process.send_after(self(), %{action: :ping}, @keepalive_interval) - - {:noreply, topics} - end - - def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do - recipient_topics = - User.get_recipients_from_activity(item) - |> Enum.map(fn %{id: id} -> "direct:#{id}" end) - - Enum.each(recipient_topics || [], fn user_topic -> - Logger.debug("Trying to push direct message to #{user_topic}\n\n") - push_to_socket(topics, user_topic, item) - end) - - {:noreply, topics} - end - - def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do - user_topic = "direct:#{participation.user_id}" - Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") - - push_to_socket(topics, user_topic, participation) - - {:noreply, topics} - end - - def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do - # filter the recipient list if the activity is not public, see #270. - recipient_lists = - case Visibility.is_public?(item) do - true -> - Pleroma.List.get_lists_from_activity(item) - - _ -> - Pleroma.List.get_lists_from_activity(item) - |> Enum.filter(fn list -> - owner = User.get_cached_by_id(list.user_id) - - Visibility.visible_for_user?(item, owner) - end) - end - - recipient_topics = - recipient_lists - |> Enum.map(fn %{id: id} -> "list:#{id}" end) - - Enum.each(recipient_topics || [], fn list_topic -> - Logger.debug("Trying to push message to #{list_topic}\n\n") - push_to_socket(topics, list_topic, item) - end) - - {:noreply, topics} - end - - def handle_cast( - %{action: :stream, topic: topic, item: %Notification{} = item}, - topics - ) - when topic in ["user", "user:notification"] do - topics - |> Map.get("#{topic}:#{item.user_id}", []) - |> Enum.each(fn socket -> - with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id), - true <- should_send?(user, item) do - send( - socket.transport_pid, - {:text, represent_notification(socket.assigns[:user], item)} - ) - end - end) - - {:noreply, topics} - end - - def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do - Logger.debug("Trying to push to users") - - recipient_topics = - User.get_recipients_from_activity(item) - |> Enum.map(fn %{id: id} -> "user:#{id}" end) - - Enum.each(recipient_topics, fn topic -> - push_to_socket(topics, topic, item) - end) - - {:noreply, topics} - end - - def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do - Logger.debug("Trying to push to #{topic}") - Logger.debug("Pushing item to #{topic}") - push_to_socket(topics, topic, item) - {:noreply, topics} - end - - def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do - topic = internal_topic(topic, socket) - sockets_for_topic = sockets[topic] || [] - sockets_for_topic = Enum.uniq([socket | sockets_for_topic]) - sockets = Map.put(sockets, topic, sockets_for_topic) - Logger.debug("Got new conn for #{topic}") - {:noreply, sockets} - end - - def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do - topic = internal_topic(topic, socket) - sockets_for_topic = sockets[topic] || [] - sockets_for_topic = List.delete(sockets_for_topic, socket) - sockets = Map.put(sockets, topic, sockets_for_topic) - Logger.debug("Removed conn for #{topic}") - {:noreply, sockets} - end - - def handle_cast(m, state) do - Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}") - {:noreply, state} - end - - defp represent_update(%Activity{} = activity, %User{} = user) do - %{ - event: "update", - payload: - Pleroma.Web.MastodonAPI.StatusView.render( - "status.json", - activity: activity, - for: user - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - defp represent_update(%Activity{} = activity) do - %{ - event: "update", - payload: - Pleroma.Web.MastodonAPI.StatusView.render( - "status.json", - activity: activity - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - def represent_conversation(%Participation{} = participation) do - %{ - event: "conversation", - payload: - Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ - participation: participation, - for: participation.user - }) - |> Jason.encode!() - } - |> Jason.encode!() - end - - @spec represent_notification(User.t(), Notification.t() | %SubscriptionNotification{}) :: - binary() - defp represent_notification(%User{} = user, notify) do - event = - case notify do - %Notification{} -> "notification" - %SubscriptionNotification{} -> "subscription_norification" - end - - %{ - event: event, - payload: - NotificationView.render( - "show.json", - %{notification: notify, for: user} - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - defp should_send?(%User{} = user, %Activity{} = item) do - blocks = user.info.blocks || [] - mutes = user.info.mutes || [] - reblog_mutes = user.info.muted_reblogs || [] - domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) - - with parent when not is_nil(parent) <- Object.normalize(item), - true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), - true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), - %{host: item_host} <- URI.parse(item.actor), - %{host: parent_host} <- URI.parse(parent.data["actor"]), - false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), - false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), - true <- thread_containment(item, user), - false <- CommonAPI.thread_muted?(user, item) do - true - else - _ -> false - end - end - - defp should_send?(%User{} = user, %Notification{activity: activity}) do - should_send?(user, activity) - end - - def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do - Enum.each(topics[topic] || [], fn socket -> - # Get the current user so we have up-to-date blocks etc. - if socket.assigns[:user] do - user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) - - if should_send?(user, item) do - send(socket.transport_pid, {:text, represent_update(item, user)}) - end - else - send(socket.transport_pid, {:text, represent_update(item)}) - end - end) - end - - def push_to_socket(topics, topic, %Participation{} = participation) do - Enum.each(topics[topic] || [], fn socket -> - send(socket.transport_pid, {:text, represent_conversation(participation)}) - end) - end - - def push_to_socket(topics, topic, %Activity{ - data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} - }) do - Enum.each(topics[topic] || [], fn socket -> - send( - socket.transport_pid, - {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} - ) - end) - end - - def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop - - def push_to_socket(topics, topic, item) do - Enum.each(topics[topic] || [], fn socket -> - # Get the current user so we have up-to-date blocks etc. - if socket.assigns[:user] do - user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) - blocks = user.info.blocks || [] - mutes = user.info.mutes || [] - - with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), - true <- thread_containment(item, user) do - send(socket.transport_pid, {:text, represent_update(item, user)}) - end - else - send(socket.transport_pid, {:text, represent_update(item)}) - end - end) - end - - defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do - "#{topic}:#{socket.assigns[:user].id}" - end - - defp internal_topic(topic, _), do: topic - - @spec thread_containment(Activity.t(), User.t()) :: boolean() - defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true - - defp thread_containment(activity, user) do - if Config.get([:instance, :skip_thread_containment]) do - true - else - ActivityPub.contain_activity(activity, user) - end - end -end diff --git a/lib/pleroma/web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex new file mode 100644 index 000000000..f77cbb95c --- /dev/null +++ b/lib/pleroma/web/streamer/ping.ex @@ -0,0 +1,33 @@ +defmodule Pleroma.Web.Streamer.Ping do + use GenServer + require Logger + + alias Pleroma.Web.Streamer.State + alias Pleroma.Web.Streamer.StreamerSocket + + @keepalive_interval :timer.seconds(30) + + def start_link(opts) do + ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval) + GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__) + end + + def init(%{ping_interval: ping_interval} = args) do + Process.send_after(self(), :ping, ping_interval) + {:ok, args} + end + + def handle_info(:ping, %{ping_interval: ping_interval} = state) do + State.get_sockets() + |> Map.values() + |> List.flatten() + |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} -> + Logger.debug("Sending keepalive ping") + send(transport_pid, {:text, ""}) + end) + + Process.send_after(self(), :ping, ping_interval) + + {:noreply, state} + end +end diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex new file mode 100644 index 000000000..7b5199068 --- /dev/null +++ b/lib/pleroma/web/streamer/state.ex @@ -0,0 +1,68 @@ +defmodule Pleroma.Web.Streamer.State do + use GenServer + require Logger + + alias Pleroma.Web.Streamer.StreamerSocket + + def start_link(_) do + GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__) + end + + def add_socket(topic, socket) do + GenServer.call(__MODULE__, {:add, socket, topic}) + end + + def remove_socket(topic, socket) do + GenServer.call(__MODULE__, {:remove, socket, topic}) + end + + def get_sockets do + %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state) + stream_sockets + end + + def init(init_arg) do + {:ok, init_arg} + end + + def handle_call(:get_state, _from, state) do + {:reply, state, state} + end + + def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do + internal_topic = internal_topic(topic, socket) + stream_socket = StreamerSocket.from_socket(socket) + + sockets_for_topic = + sockets + |> Map.get(internal_topic, []) + |> List.insert_at(0, stream_socket) + |> Enum.uniq() + + state = put_in(state, [:sockets, internal_topic], sockets_for_topic) + Logger.debug("Got new conn for #{topic}") + {:reply, state, state} + end + + def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do + internal_topic = internal_topic(topic, socket) + stream_socket = StreamerSocket.from_socket(socket) + + sockets_for_topic = + sockets + |> Map.get(internal_topic, []) + |> List.delete(stream_socket) + + state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic) + {:reply, state, state} + end + + defp internal_topic(topic, socket) + when topic in ~w[user user:notification direct] do + "#{topic}:#{socket.assigns[:user].id}" + end + + defp internal_topic(topic, _) do + topic + end +end diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex new file mode 100644 index 000000000..8cf719277 --- /dev/null +++ b/lib/pleroma/web/streamer/streamer.ex @@ -0,0 +1,55 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.Streamer do + alias Pleroma.Web.Streamer.State + alias Pleroma.Web.Streamer.Worker + + @timeout 60_000 + @mix_env Mix.env() + + def add_socket(topic, socket) do + State.add_socket(topic, socket) + end + + def remove_socket(topic, socket) do + State.remove_socket(topic, socket) + end + + def get_sockets do + State.get_sockets() + end + + def stream(topics, items) do + if should_send?() do + Task.async(fn -> + :poolboy.transaction( + :streamer_worker, + &Worker.stream(&1, topics, items), + @timeout + ) + end) + end + end + + def supervisor, do: Pleroma.Web.Streamer.Supervisor + + defp should_send? do + handle_should_send(@mix_env) + end + + defp handle_should_send(:test) do + case Process.whereis(:streamer_worker) do + nil -> + false + + pid -> + Process.alive?(pid) + end + end + + defp handle_should_send(_) do + true + end +end diff --git a/lib/pleroma/web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex new file mode 100644 index 000000000..f006c0306 --- /dev/null +++ b/lib/pleroma/web/streamer/streamer_socket.ex @@ -0,0 +1,31 @@ +defmodule Pleroma.Web.Streamer.StreamerSocket do + defstruct transport_pid: nil, user: nil + + alias Pleroma.User + alias Pleroma.Web.Streamer.StreamerSocket + + def from_socket(%{ + transport_pid: transport_pid, + assigns: %{user: nil} + }) do + %StreamerSocket{ + transport_pid: transport_pid + } + end + + def from_socket(%{ + transport_pid: transport_pid, + assigns: %{user: %User{} = user} + }) do + %StreamerSocket{ + transport_pid: transport_pid, + user: user + } + end + + def from_socket(%{transport_pid: transport_pid}) do + %StreamerSocket{ + transport_pid: transport_pid + } + end +end diff --git a/lib/pleroma/web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex new file mode 100644 index 000000000..6afe19323 --- /dev/null +++ b/lib/pleroma/web/streamer/supervisor.ex @@ -0,0 +1,33 @@ +defmodule Pleroma.Web.Streamer.Supervisor do + use Supervisor + + def start_link(opts) do + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) + end + + def init(args) do + children = [ + {Pleroma.Web.Streamer.State, args}, + {Pleroma.Web.Streamer.Ping, args}, + :poolboy.child_spec(:streamer_worker, poolboy_config()) + ] + + opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor] + Supervisor.init(children, opts) + end + + defp poolboy_config do + opts = + Pleroma.Config.get(:streamer, + workers: 3, + overflow_workers: 2 + ) + + [ + {:name, {:local, :streamer_worker}}, + {:worker_module, Pleroma.Web.Streamer.Worker}, + {:size, opts[:workers]}, + {:max_overflow, opts[:overflow_workers]} + ] + end +end diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex new file mode 100644 index 000000000..5804508eb --- /dev/null +++ b/lib/pleroma/web/streamer/worker.ex @@ -0,0 +1,220 @@ +defmodule Pleroma.Web.Streamer.Worker do + use GenServer + + require Logger + + alias Pleroma.Activity + alias Pleroma.Config + alias Pleroma.Conversation.Participation + alias Pleroma.Notification + alias Pleroma.Object + alias Pleroma.User + alias Pleroma.Web.ActivityPub.ActivityPub + alias Pleroma.Web.ActivityPub.Visibility + alias Pleroma.Web.CommonAPI + alias Pleroma.Web.Streamer.State + alias Pleroma.Web.Streamer.StreamerSocket + alias Pleroma.Web.StreamerView + + def start_link(_) do + GenServer.start_link(__MODULE__, %{}, []) + end + + def init(init_arg) do + {:ok, init_arg} + end + + def stream(pid, topics, items) do + GenServer.call(pid, {:stream, topics, items}) + end + + def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do + Enum.each(topics, fn t -> + do_stream(%{topic: t, item: item}) + end) + + {:reply, state, state} + end + + def handle_call({:stream, topic, items}, _from, state) when is_list(items) do + Enum.each(items, fn i -> + do_stream(%{topic: topic, item: i}) + end) + + {:reply, state, state} + end + + def handle_call({:stream, topic, item}, _from, state) do + do_stream(%{topic: topic, item: item}) + + {:reply, state, state} + end + + defp do_stream(%{topic: "direct", item: item}) do + recipient_topics = + User.get_recipients_from_activity(item) + |> Enum.map(fn %{id: id} -> "direct:#{id}" end) + + Enum.each(recipient_topics, fn user_topic -> + Logger.debug("Trying to push direct message to #{user_topic}\n\n") + push_to_socket(State.get_sockets(), user_topic, item) + end) + end + + defp do_stream(%{topic: "participation", item: participation}) do + user_topic = "direct:#{participation.user_id}" + Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") + + push_to_socket(State.get_sockets(), user_topic, participation) + end + + defp do_stream(%{topic: "list", item: item}) do + # filter the recipient list if the activity is not public, see #270. + recipient_lists = + case Visibility.is_public?(item) do + true -> + Pleroma.List.get_lists_from_activity(item) + + _ -> + Pleroma.List.get_lists_from_activity(item) + |> Enum.filter(fn list -> + owner = User.get_cached_by_id(list.user_id) + + Visibility.visible_for_user?(item, owner) + end) + end + + recipient_topics = + recipient_lists + |> Enum.map(fn %{id: id} -> "list:#{id}" end) + + Enum.each(recipient_topics, fn list_topic -> + Logger.debug("Trying to push message to #{list_topic}\n\n") + push_to_socket(State.get_sockets(), list_topic, item) + end) + end + + defp do_stream(%{topic: topic, item: %Notification{} = item}) + when topic in ["user", "user:notification"] do + State.get_sockets() + |> Map.get("#{topic}:#{item.user_id}", []) + |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> + with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id), + true <- should_send?(user, item) do + send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)}) + end + end) + end + + defp do_stream(%{topic: "user", item: item}) do + Logger.debug("Trying to push to users") + + recipient_topics = + User.get_recipients_from_activity(item) + |> Enum.map(fn %{id: id} -> "user:#{id}" end) + + Enum.each(recipient_topics, fn topic -> + push_to_socket(State.get_sockets(), topic, item) + end) + end + + defp do_stream(%{topic: topic, item: item}) do + Logger.debug("Trying to push to #{topic}") + Logger.debug("Pushing item to #{topic}") + push_to_socket(State.get_sockets(), topic, item) + end + + defp should_send?(%User{} = user, %Activity{} = item) do + blocks = user.info.blocks || [] + mutes = user.info.mutes || [] + reblog_mutes = user.info.muted_reblogs || [] + domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) + + with parent when not is_nil(parent) <- Object.normalize(item), + true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), + true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), + %{host: item_host} <- URI.parse(item.actor), + %{host: parent_host} <- URI.parse(parent.data["actor"]), + false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), + false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), + true <- thread_containment(item, user), + false <- CommonAPI.thread_muted?(user, item) do + true + else + _ -> false + end + end + + defp should_send?(%User{} = user, %Notification{activity: activity}) do + should_send?(user, activity) + end + + def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do + Enum.each(topics[topic] || [], fn %StreamerSocket{ + transport_pid: transport_pid, + user: socket_user + } -> + # Get the current user so we have up-to-date blocks etc. + if socket_user do + user = User.get_cached_by_ap_id(socket_user.ap_id) + + if should_send?(user, item) do + send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) + end + else + send(transport_pid, {:text, StreamerView.render("update.json", item)}) + end + end) + end + + def push_to_socket(topics, topic, %Participation{} = participation) do + Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> + send(transport_pid, {:text, StreamerView.render("conversation.json", participation)}) + end) + end + + def push_to_socket(topics, topic, %Activity{ + data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} + }) do + Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> + send( + transport_pid, + {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} + ) + end) + end + + def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop + + def push_to_socket(topics, topic, item) do + Enum.each(topics[topic] || [], fn %StreamerSocket{ + transport_pid: transport_pid, + user: socket_user + } -> + # Get the current user so we have up-to-date blocks etc. + if socket_user do + user = User.get_cached_by_ap_id(socket_user.ap_id) + blocks = user.info.blocks || [] + mutes = user.info.mutes || [] + + with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), + true <- thread_containment(item, user) do + send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) + end + else + send(transport_pid, {:text, StreamerView.render("update.json", item)}) + end + end) + end + + @spec thread_containment(Activity.t(), User.t()) :: boolean() + defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true + + defp thread_containment(activity, user) do + if Config.get([:instance, :skip_thread_containment]) do + true + else + ActivityPub.contain_activity(activity, user) + end + end +end diff --git a/lib/pleroma/web/twitter_api/controllers/util_controller.ex b/lib/pleroma/web/twitter_api/controllers/util_controller.ex index 867787c57..d7745ae7a 100644 --- a/lib/pleroma/web/twitter_api/controllers/util_controller.ex +++ b/lib/pleroma/web/twitter_api/controllers/util_controller.ex @@ -265,12 +265,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do String.split(line, ",") |> List.first() end) |> List.delete("Account address") do - PleromaJobQueue.enqueue(:background, User, [ - :follow_import, - follower, - followed_identifiers - ]) - + User.follow_import(follower, followed_identifiers) json(conn, "job started") end end @@ -281,12 +276,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do with blocked_identifiers <- String.split(list) do - PleromaJobQueue.enqueue(:background, User, [ - :blocks_import, - blocker, - blocked_identifiers - ]) - + User.blocks_import(blocker, blocked_identifiers) json(conn, "job started") end end diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex new file mode 100644 index 000000000..b13030fa0 --- /dev/null +++ b/lib/pleroma/web/views/streamer_view.ex @@ -0,0 +1,66 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.StreamerView do + use Pleroma.Web, :view + + alias Pleroma.Activity + alias Pleroma.Conversation.Participation + alias Pleroma.Notification + alias Pleroma.User + alias Pleroma.Web.MastodonAPI.NotificationView + + def render("update.json", %Activity{} = activity, %User{} = user) do + %{ + event: "update", + payload: + Pleroma.Web.MastodonAPI.StatusView.render( + "status.json", + activity: activity, + for: user + ) + |> Jason.encode!() + } + |> Jason.encode!() + end + + def render("notification.json", %User{} = user, %Notification{} = notify) do + %{ + event: "notification", + payload: + NotificationView.render( + "show.json", + %{notification: notify, for: user} + ) + |> Jason.encode!() + } + |> Jason.encode!() + end + + def render("update.json", %Activity{} = activity) do + %{ + event: "update", + payload: + Pleroma.Web.MastodonAPI.StatusView.render( + "status.json", + activity: activity + ) + |> Jason.encode!() + } + |> Jason.encode!() + end + + def render("conversation.json", %Participation{} = participation) do + %{ + event: "conversation", + payload: + Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ + participation: participation, + for: participation.user + }) + |> Jason.encode!() + } + |> Jason.encode!() + end +end diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex new file mode 100644 index 000000000..4e3e4195f --- /dev/null +++ b/lib/pleroma/workers/activity_expiration_worker.ex @@ -0,0 +1,18 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ActivityExpirationWorker do + use Pleroma.Workers.WorkerHelper, queue: "activity_expiration" + + @impl Oban.Worker + def perform( + %{ + "op" => "activity_expiration", + "activity_expiration_id" => activity_expiration_id + }, + _job + ) do + Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, activity_expiration_id) + end +end diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex new file mode 100644 index 000000000..082f20ab7 --- /dev/null +++ b/lib/pleroma/workers/background_worker.ex @@ -0,0 +1,69 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.BackgroundWorker do + alias Pleroma.Activity + alias Pleroma.User + alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy + alias Pleroma.Web.OAuth.Token.CleanWorker + + use Pleroma.Workers.WorkerHelper, queue: "background" + + @impl Oban.Worker + def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do + user = User.get_cached_by_id(user_id) + User.perform(:fetch_initial_posts, user) + end + + def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do + user = User.get_cached_by_id(user_id) + User.perform(:deactivate_async, user, status) + end + + def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do + user = User.get_cached_by_id(user_id) + User.perform(:delete, user) + end + + def perform( + %{ + "op" => "blocks_import", + "blocker_id" => blocker_id, + "blocked_identifiers" => blocked_identifiers + }, + _job + ) do + blocker = User.get_cached_by_id(blocker_id) + User.perform(:blocks_import, blocker, blocked_identifiers) + end + + def perform( + %{ + "op" => "follow_import", + "follower_id" => follower_id, + "followed_identifiers" => followed_identifiers + }, + _job + ) do + follower = User.get_cached_by_id(follower_id) + User.perform(:follow_import, follower, followed_identifiers) + end + + def perform(%{"op" => "clean_expired_tokens"}, _job) do + CleanWorker.perform(:clean) + end + + def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do + MediaProxyWarmingPolicy.perform(:preload, message) + end + + def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do + MediaProxyWarmingPolicy.perform(:prefetch, url) + end + + def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do + activity = Activity.get_by_id(activity_id) + Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity) + end +end diff --git a/lib/pleroma/workers/digest_emails_worker.ex b/lib/pleroma/workers/digest_emails_worker.ex new file mode 100644 index 000000000..3e5a836d0 --- /dev/null +++ b/lib/pleroma/workers/digest_emails_worker.ex @@ -0,0 +1,16 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.DigestEmailsWorker do + alias Pleroma.User + + use Pleroma.Workers.WorkerHelper, queue: "digest_emails" + + @impl Oban.Worker + def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do + user_id + |> User.get_cached_by_id() + |> Pleroma.Daemons.DigestEmailDaemon.perform() + end +end diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex new file mode 100644 index 000000000..1b7a0eb3e --- /dev/null +++ b/lib/pleroma/workers/mailer_worker.ex @@ -0,0 +1,15 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.MailerWorker do + use Pleroma.Workers.WorkerHelper, queue: "mailer" + + @impl Oban.Worker + def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do + encoded_email + |> Base.decode64!() + |> :erlang.binary_to_term() + |> Pleroma.Emails.Mailer.deliver(config) + end +end diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex new file mode 100644 index 000000000..455f7fc7e --- /dev/null +++ b/lib/pleroma/workers/publisher_worker.ex @@ -0,0 +1,25 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.PublisherWorker do + alias Pleroma.Activity + alias Pleroma.Web.Federator + + use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" + + def backoff(attempt) when is_integer(attempt) do + Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5) + end + + @impl Oban.Worker + def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do + activity = Activity.get_by_id(activity_id) + Federator.perform(:publish, activity) + end + + def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do + params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end) + Federator.perform(:publish_one, String.to_atom(module_name), params) + end +end diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex new file mode 100644 index 000000000..83d528a66 --- /dev/null +++ b/lib/pleroma/workers/receiver_worker.ex @@ -0,0 +1,18 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ReceiverWorker do + alias Pleroma.Web.Federator + + use Pleroma.Workers.WorkerHelper, queue: "federator_incoming" + + @impl Oban.Worker + def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do + Federator.perform(:incoming_doc, doc) + end + + def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do + Federator.perform(:incoming_ap_doc, params) + end +end diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex new file mode 100644 index 000000000..ca7d53af1 --- /dev/null +++ b/lib/pleroma/workers/scheduled_activity_worker.ex @@ -0,0 +1,12 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ScheduledActivityWorker do + use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities" + + @impl Oban.Worker + def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do + Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, activity_id) + end +end diff --git a/lib/pleroma/workers/subscriber_worker.ex b/lib/pleroma/workers/subscriber_worker.ex new file mode 100644 index 000000000..fc490e300 --- /dev/null +++ b/lib/pleroma/workers/subscriber_worker.ex @@ -0,0 +1,26 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.SubscriberWorker do + alias Pleroma.Repo + alias Pleroma.Web.Federator + alias Pleroma.Web.Websub + + use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" + + @impl Oban.Worker + def perform(%{"op" => "refresh_subscriptions"}, _job) do + Federator.perform(:refresh_subscriptions) + end + + def perform(%{"op" => "request_subscription", "websub_id" => websub_id}, _job) do + websub = Repo.get(Websub.WebsubClientSubscription, websub_id) + Federator.perform(:request_subscription, websub) + end + + def perform(%{"op" => "verify_websub", "websub_id" => websub_id}, _job) do + websub = Repo.get(Websub.WebsubServerSubscription, websub_id) + Federator.perform(:verify_websub, websub) + end +end diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex new file mode 100644 index 000000000..b581a2f86 --- /dev/null +++ b/lib/pleroma/workers/transmogrifier_worker.ex @@ -0,0 +1,15 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.TransmogrifierWorker do + alias Pleroma.User + + use Pleroma.Workers.WorkerHelper, queue: "transmogrifier" + + @impl Oban.Worker + def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do + user = User.get_cached_by_id(user_id) + Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user) + end +end diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex new file mode 100644 index 000000000..bea2baffb --- /dev/null +++ b/lib/pleroma/workers/web_pusher_worker.ex @@ -0,0 +1,16 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.WebPusherWorker do + alias Pleroma.Notification + alias Pleroma.Repo + + use Pleroma.Workers.WorkerHelper, queue: "web_push" + + @impl Oban.Worker + def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do + notification = Repo.get(Notification, notification_id) + Pleroma.Web.Push.Impl.perform(notification) + end +end diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex new file mode 100644 index 000000000..358efa14a --- /dev/null +++ b/lib/pleroma/workers/worker_helper.ex @@ -0,0 +1,46 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.WorkerHelper do + alias Pleroma.Config + alias Pleroma.Workers.WorkerHelper + + def worker_args(queue) do + case Config.get([:workers, :retries, queue]) do + nil -> [] + max_attempts -> [max_attempts: max_attempts] + end + end + + def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do + backoff = + :math.pow(attempt, pow) + + base_backoff + + :rand.uniform(2 * base_backoff) * attempt + + trunc(backoff) + end + + defmacro __using__(opts) do + caller_module = __CALLER__.module + queue = Keyword.fetch!(opts, :queue) + + quote do + # Note: `max_attempts` is intended to be overridden in `new/2` call + use Oban.Worker, + queue: unquote(queue), + max_attempts: 1 + + def enqueue(op, params, worker_args \\ []) do + params = Map.merge(%{"op" => op}, params) + queue_atom = String.to_atom(unquote(queue)) + worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom) + + unquote(caller_module) + |> apply(:new, [params, worker_args]) + |> Pleroma.Repo.insert() + end + end + end +end |