diff options
author | Maksim Pechnikov <parallel588@gmail.com> | 2019-09-25 12:24:12 +0300 |
---|---|---|
committer | Maksim Pechnikov <parallel588@gmail.com> | 2019-09-25 12:24:12 +0300 |
commit | 1a858134edabfe9a85e07fb801b9ed41649ba08a (patch) | |
tree | 9acff5da3ff7a09f5b6f1e33ee9ac225471b4da6 /lib/pleroma/web | |
parent | 8c6cdff3cc48101711d0f09852866311780d97db (diff) | |
parent | 29dd8ab9c0ef28f9649fe0a5b29a0bbcfb4c0965 (diff) | |
download | pleroma-1a858134edabfe9a85e07fb801b9ed41649ba08a.tar.gz |
Merge branch 'develop' into issue/1218
Diffstat (limited to 'lib/pleroma/web')
43 files changed, 1925 insertions, 1374 deletions
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index d23ec66ac..1cf8b6151 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -4,6 +4,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Activity + alias Pleroma.Activity.Ir.Topics alias Pleroma.Config alias Pleroma.Conversation alias Pleroma.Notification @@ -16,7 +17,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.User alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.Transmogrifier + alias Pleroma.Web.Streamer alias Pleroma.Web.WebFinger + alias Pleroma.Workers.BackgroundWorker import Ecto.Query import Pleroma.Web.ActivityPub.Utils @@ -145,7 +148,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do activity end - PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity]) + BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) Notification.create_notifications(activity) @@ -186,9 +189,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do participations |> Repo.preload(:user) - Enum.each(participations, fn participation -> - Pleroma.Web.Streamer.stream("participation", participation) - end) + Streamer.stream("participation", participations) end def stream_out_participations(%Object{data: %{"context" => context}}, user) do @@ -207,41 +208,15 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do def stream_out_participations(_, _), do: :noop - def stream_out(activity) do - if activity.data["type"] in ["Create", "Announce", "Delete"] do - object = Object.normalize(activity) - # Do not stream out poll replies - unless object.data["type"] == "Answer" do - Pleroma.Web.Streamer.stream("user", activity) - Pleroma.Web.Streamer.stream("list", activity) - - if get_visibility(activity) == "public" do - Pleroma.Web.Streamer.stream("public", activity) - - if activity.local do - Pleroma.Web.Streamer.stream("public:local", activity) - end - - if activity.data["type"] in ["Create"] do - object.data - |> Map.get("tag", []) - |> Enum.filter(fn tag -> is_bitstring(tag) end) - |> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end) - - if object.data["attachment"] != [] do - Pleroma.Web.Streamer.stream("public:media", activity) - - if activity.local do - Pleroma.Web.Streamer.stream("public:local:media", activity) - end - end - end - else - if get_visibility(activity) == "direct", - do: Pleroma.Web.Streamer.stream("direct", activity) - end - end - end + def stream_out(%Activity{data: %{"type" => data_type}} = activity) + when data_type in ["Create", "Announce", "Delete"] do + activity + |> Topics.get_activity_topics() + |> Streamer.stream(activity) + end + + def stream_out(_activity) do + :noop end def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do @@ -435,6 +410,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end end + @spec block(User.t(), User.t(), String.t() | nil, boolean) :: {:ok, Activity.t() | nil} def block(blocker, blocked, activity_id \\ nil, local \\ true) do outgoing_blocks = Config.get([:activitypub, :outgoing_blocks]) unfollow_blocked = Config.get([:activitypub, :unfollow_blocked]) @@ -463,10 +439,11 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end end + @spec flag(map()) :: {:ok, Activity.t()} | any def flag( %{ actor: actor, - context: context, + context: _context, account: account, statuses: statuses, content: content @@ -478,14 +455,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do additional = params[:additional] || %{} - params = %{ - actor: actor, - context: context, - account: account, - statuses: statuses, - content: content - } - additional = if forward do Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]}) @@ -551,9 +520,10 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end def fetch_public_activities(opts \\ %{}) do - q = fetch_activities_query([Pleroma.Constants.as_public()], opts) + opts = Map.drop(opts, ["user"]) - q + [Pleroma.Constants.as_public()] + |> fetch_activities_query(opts) |> restrict_unlisted() |> Pagination.fetch_paginated(opts) |> Enum.reverse() diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex index 08bf1c752..9eb86106f 100644 --- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex +++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex @@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do use Pleroma.Web, :controller alias Pleroma.Activity + alias Pleroma.Delivery alias Pleroma.Object alias Pleroma.Object.Fetcher alias Pleroma.User @@ -23,6 +24,12 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do action_fallback(:errors) + plug( + Pleroma.Plugs.Cache, + [query_params: false, tracking_fun: &__MODULE__.track_object_fetch/2] + when action in [:activity, :object] + ) + plug(Pleroma.Web.FederatingPlug when action in [:inbox, :relay]) plug(:set_requester_reachable when action in [:inbox]) plug(:relay_active? when action in [:relay]) @@ -42,7 +49,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do {:ok, user} <- User.ensure_keys_present(user) do conn |> put_resp_content_type("application/activity+json") - |> json(UserView.render("user.json", %{user: user})) + |> put_view(UserView) + |> render("user.json", %{user: user}) else nil -> {:error, :not_found} end @@ -53,14 +61,27 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do %Object{} = object <- Object.get_cached_by_ap_id(ap_id), {_, true} <- {:public?, Visibility.is_public?(object)} do conn + |> assign(:tracking_fun_data, object.id) + |> set_cache_ttl_for(object) |> put_resp_content_type("application/activity+json") - |> json(ObjectView.render("object.json", %{object: object})) + |> put_view(ObjectView) + |> render("object.json", object: object) else {:public?, false} -> {:error, :not_found} end end + def track_object_fetch(conn, nil), do: conn + + def track_object_fetch(conn, object_id) do + with %{assigns: %{user: %User{id: user_id}}} <- conn do + Delivery.create(object_id, user_id) + end + + conn + end + def object_likes(conn, %{"uuid" => uuid, "page" => page}) do with ap_id <- o_status_url(conn, :object, uuid), %Object{} = object <- Object.get_cached_by_ap_id(ap_id), @@ -70,7 +91,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do conn |> put_resp_content_type("application/activity+json") - |> json(ObjectView.render("likes.json", ap_id, likes, page)) + |> put_view(ObjectView) + |> render("likes.json", %{ap_id: ap_id, likes: likes, page: page}) else {:public?, false} -> {:error, :not_found} @@ -84,7 +106,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do likes <- Utils.get_object_likes(object) do conn |> put_resp_content_type("application/activity+json") - |> json(ObjectView.render("likes.json", ap_id, likes)) + |> put_view(ObjectView) + |> render("likes.json", %{ap_id: ap_id, likes: likes}) else {:public?, false} -> {:error, :not_found} @@ -96,19 +119,50 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do %Activity{} = activity <- Activity.normalize(ap_id), {_, true} <- {:public?, Visibility.is_public?(activity)} do conn + |> maybe_set_tracking_data(activity) + |> set_cache_ttl_for(activity) |> put_resp_content_type("application/activity+json") - |> json(ObjectView.render("object.json", %{object: activity})) + |> put_view(ObjectView) + |> render("object.json", object: activity) else - {:public?, false} -> - {:error, :not_found} + {:public?, false} -> {:error, :not_found} + nil -> {:error, :not_found} end end + defp maybe_set_tracking_data(conn, %Activity{data: %{"type" => "Create"}} = activity) do + object_id = Object.normalize(activity).id + assign(conn, :tracking_fun_data, object_id) + end + + defp maybe_set_tracking_data(conn, _activity), do: conn + + defp set_cache_ttl_for(conn, %Activity{object: object}) do + set_cache_ttl_for(conn, object) + end + + defp set_cache_ttl_for(conn, entity) do + ttl = + case entity do + %Object{data: %{"type" => "Question"}} -> + Pleroma.Config.get([:web_cache_ttl, :activity_pub_question]) + + %Object{} -> + Pleroma.Config.get([:web_cache_ttl, :activity_pub]) + + _ -> + nil + end + + assign(conn, :cache_ttl, ttl) + end + # GET /relay/following def following(%{assigns: %{relay: true}} = conn, _params) do conn |> put_resp_content_type("application/activity+json") - |> json(UserView.render("following.json", %{user: Relay.get_actor()})) + |> put_view(UserView) + |> render("following.json", %{user: Relay.get_actor()}) end def following(%{assigns: %{user: for_user}} = conn, %{"nickname" => nickname, "page" => page}) do @@ -120,7 +174,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do conn |> put_resp_content_type("application/activity+json") - |> json(UserView.render("following.json", %{user: user, page: page, for: for_user})) + |> put_view(UserView) + |> render("following.json", %{user: user, page: page, for: for_user}) else {:show_follows, _} -> conn @@ -134,7 +189,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do {user, for_user} <- ensure_user_keys_present_and_maybe_refresh_for_user(user, for_user) do conn |> put_resp_content_type("application/activity+json") - |> json(UserView.render("following.json", %{user: user, for: for_user})) + |> put_view(UserView) + |> render("following.json", %{user: user, for: for_user}) end end @@ -142,7 +198,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do def followers(%{assigns: %{relay: true}} = conn, _params) do conn |> put_resp_content_type("application/activity+json") - |> json(UserView.render("followers.json", %{user: Relay.get_actor()})) + |> put_view(UserView) + |> render("followers.json", %{user: Relay.get_actor()}) end def followers(%{assigns: %{user: for_user}} = conn, %{"nickname" => nickname, "page" => page}) do @@ -154,7 +211,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do conn |> put_resp_content_type("application/activity+json") - |> json(UserView.render("followers.json", %{user: user, page: page, for: for_user})) + |> put_view(UserView) + |> render("followers.json", %{user: user, page: page, for: for_user}) else {:show_followers, _} -> conn @@ -168,7 +226,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do {user, for_user} <- ensure_user_keys_present_and_maybe_refresh_for_user(user, for_user) do conn |> put_resp_content_type("application/activity+json") - |> json(UserView.render("followers.json", %{user: user, for: for_user})) + |> put_view(UserView) + |> render("followers.json", %{user: user, for: for_user}) end end @@ -177,7 +236,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do {:ok, user} <- User.ensure_keys_present(user) do conn |> put_resp_content_type("application/activity+json") - |> json(UserView.render("outbox.json", %{user: user, max_id: params["max_id"]})) + |> put_view(UserView) + |> render("outbox.json", %{user: user, max_id: params["max_id"]}) end end @@ -225,7 +285,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do with {:ok, user} <- User.ensure_keys_present(user) do conn |> put_resp_content_type("application/activity+json") - |> json(UserView.render("user.json", %{user: user})) + |> put_view(UserView) + |> render("user.json", %{user: user}) else nil -> {:error, :not_found} end @@ -246,27 +307,42 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do def whoami(%{assigns: %{user: %User{} = user}} = conn, _params) do conn |> put_resp_content_type("application/activity+json") - |> json(UserView.render("user.json", %{user: user})) + |> put_view(UserView) + |> render("user.json", %{user: user}) end def whoami(_conn, _params), do: {:error, :not_found} - def read_inbox(%{assigns: %{user: user}} = conn, %{"nickname" => nickname} = params) do - if nickname == user.nickname do - conn - |> put_resp_content_type("application/activity+json") - |> json(UserView.render("inbox.json", %{user: user, max_id: params["max_id"]})) - else - err = - dgettext("errors", "can't read inbox of %{nickname} as %{as_nickname}", - nickname: nickname, - as_nickname: user.nickname - ) + def read_inbox( + %{assigns: %{user: %{nickname: nickname} = user}} = conn, + %{"nickname" => nickname} = params + ) do + conn + |> put_resp_content_type("application/activity+json") + |> put_view(UserView) + |> render("inbox.json", user: user, max_id: params["max_id"]) + end - conn - |> put_status(:forbidden) - |> json(err) - end + def read_inbox(%{assigns: %{user: nil}} = conn, %{"nickname" => nickname}) do + err = dgettext("errors", "can't read inbox of %{nickname}", nickname: nickname) + + conn + |> put_status(:forbidden) + |> json(err) + end + + def read_inbox(%{assigns: %{user: %{nickname: as_nickname}}} = conn, %{ + "nickname" => nickname + }) do + err = + dgettext("errors", "can't read inbox of %{nickname} as %{as_nickname}", + nickname: nickname, + as_nickname: as_nickname + ) + + conn + |> put_status(:forbidden) + |> json(err) end def handle_user_activity(user, %{"type" => "Create"} = params) do diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex index a179dd54d..26b8539fe 100644 --- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex @@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do alias Pleroma.HTTP alias Pleroma.Web.MediaProxy + alias Pleroma.Workers.BackgroundWorker require Logger @@ -30,7 +31,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do url |> Enum.each(fn %{"href" => href} -> - PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href]) + BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href}) x -> Logger.debug("Unhandled attachment URL object #{inspect(x)}") @@ -46,7 +47,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message ) when is_list(attachments) and length(attachments) > 0 do - PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message]) + BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message}) {:ok, message} end diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index c97405690..114251b24 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -5,8 +5,10 @@ defmodule Pleroma.Web.ActivityPub.Publisher do alias Pleroma.Activity alias Pleroma.Config + alias Pleroma.Delivery alias Pleroma.HTTP alias Pleroma.Instances + alias Pleroma.Object alias Pleroma.User alias Pleroma.Web.ActivityPub.Relay alias Pleroma.Web.ActivityPub.Transmogrifier @@ -84,6 +86,15 @@ defmodule Pleroma.Web.ActivityPub.Publisher do end end + def publish_one(%{actor_id: actor_id} = params) do + actor = User.get_cached_by_id(actor_id) + + params + |> Map.delete(:actor_id) + |> Map.put(:actor, actor) + |> publish_one() + end + defp should_federate?(inbox, public) do if public do true @@ -107,7 +118,18 @@ defmodule Pleroma.Web.ActivityPub.Publisher do {:ok, []} end - Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers + fetchers = + with %Activity{data: %{"type" => "Delete"}} <- activity, + %Object{id: object_id} <- Object.normalize(activity), + fetchers <- User.get_delivered_users_by_object_id(object_id), + _ <- Delivery.delete_all_by_object_id(object_id) do + fetchers + else + _ -> + [] + end + + Pleroma.Web.Salmon.remote_users(actor, activity) ++ followers ++ fetchers end defp get_cc_ap_ids(ap_id, recipients) do @@ -159,7 +181,8 @@ defmodule Pleroma.Web.ActivityPub.Publisher do Publishes an activity with BCC to all relevant peers. """ - def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do + def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) + when is_list(bcc) and bcc != [] do public = is_public?(activity) {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) @@ -186,7 +209,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{ inbox: inbox, json: json, - actor: actor, + actor_id: actor.id, id: activity.data["id"], unreachable_since: unreachable_since }) @@ -221,7 +244,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do %{ inbox: inbox, json: json, - actor: actor, + actor_id: actor.id, id: activity.data["id"], unreachable_since: unreachable_since } diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index 468961bd0..dad2fead8 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -15,6 +15,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.Federator + alias Pleroma.Workers.TransmogrifierWorker import Ecto.Query @@ -41,8 +42,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end def fix_summary(%{"summary" => nil} = object) do - object - |> Map.put("summary", "") + Map.put(object, "summary", "") end def fix_summary(%{"summary" => _} = object) do @@ -50,10 +50,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do object end - def fix_summary(object) do - object - |> Map.put("summary", "") - end + def fix_summary(object), do: Map.put(object, "summary", "") def fix_addressing_list(map, field) do cond do @@ -73,13 +70,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do explicit_mentions, follower_collection ) do - explicit_to = - to - |> Enum.filter(fn x -> x in explicit_mentions end) + explicit_to = Enum.filter(to, fn x -> x in explicit_mentions end) - explicit_cc = - to - |> Enum.filter(fn x -> x not in explicit_mentions end) + explicit_cc = Enum.filter(to, fn x -> x not in explicit_mentions end) final_cc = (cc ++ explicit_cc) @@ -97,13 +90,19 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do def fix_explicit_addressing(%{"directMessage" => true} = object), do: object def fix_explicit_addressing(object) do - explicit_mentions = - object - |> Utils.determine_explicit_mentions() + explicit_mentions = Utils.determine_explicit_mentions(object) - follower_collection = User.get_cached_by_ap_id(Containment.get_actor(object)).follower_address + %User{follower_address: follower_collection} = + object + |> Containment.get_actor() + |> User.get_cached_by_ap_id() - explicit_mentions = explicit_mentions ++ [Pleroma.Constants.as_public(), follower_collection] + explicit_mentions = + explicit_mentions ++ + [ + Pleroma.Constants.as_public(), + follower_collection + ] fix_explicit_addressing(object, explicit_mentions, follower_collection) end @@ -147,50 +146,27 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end def fix_actor(%{"attributedTo" => actor} = object) do - object - |> Map.put("actor", Containment.get_actor(%{"actor" => actor})) + Map.put(object, "actor", Containment.get_actor(%{"actor" => actor})) end def fix_in_reply_to(object, options \\ []) def fix_in_reply_to(%{"inReplyTo" => in_reply_to} = object, options) when not is_nil(in_reply_to) do - in_reply_to_id = - cond do - is_bitstring(in_reply_to) -> - in_reply_to - - is_map(in_reply_to) && is_bitstring(in_reply_to["id"]) -> - in_reply_to["id"] - - is_list(in_reply_to) && is_bitstring(Enum.at(in_reply_to, 0)) -> - Enum.at(in_reply_to, 0) - - # Maybe I should output an error too? - true -> - "" - end - + in_reply_to_id = prepare_in_reply_to(in_reply_to) object = Map.put(object, "inReplyToAtomUri", in_reply_to_id) if Federator.allowed_incoming_reply_depth?(options[:depth]) do - case get_obj_helper(in_reply_to_id, options) do - {:ok, replied_object} -> - with %Activity{} = _activity <- - Activity.get_create_by_object_ap_id(replied_object.data["id"]) do - object - |> Map.put("inReplyTo", replied_object.data["id"]) - |> Map.put("inReplyToAtomUri", object["inReplyToAtomUri"] || in_reply_to_id) - |> Map.put("conversation", replied_object.data["context"] || object["conversation"]) - |> Map.put("context", replied_object.data["context"] || object["conversation"]) - else - e -> - Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}") - object - end - + with {:ok, replied_object} <- get_obj_helper(in_reply_to_id, options), + %Activity{} = _ <- Activity.get_create_by_object_ap_id(replied_object.data["id"]) do + object + |> Map.put("inReplyTo", replied_object.data["id"]) + |> Map.put("inReplyToAtomUri", object["inReplyToAtomUri"] || in_reply_to_id) + |> Map.put("conversation", replied_object.data["context"] || object["conversation"]) + |> Map.put("context", replied_object.data["context"] || object["conversation"]) + else e -> - Logger.error("Couldn't fetch \"#{inspect(in_reply_to_id)}\", error: #{inspect(e)}") + Logger.error("Couldn't fetch #{inspect(in_reply_to_id)}, error: #{inspect(e)}") object end else @@ -200,6 +176,22 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do def fix_in_reply_to(object, _options), do: object + defp prepare_in_reply_to(in_reply_to) do + cond do + is_bitstring(in_reply_to) -> + in_reply_to + + is_map(in_reply_to) && is_bitstring(in_reply_to["id"]) -> + in_reply_to["id"] + + is_list(in_reply_to) && is_bitstring(Enum.at(in_reply_to, 0)) -> + Enum.at(in_reply_to, 0) + + true -> + "" + end + end + def fix_context(object) do context = object["context"] || object["conversation"] || Utils.generate_context_id() @@ -210,11 +202,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do def fix_attachments(%{"attachment" => attachment} = object) when is_list(attachment) do attachments = - attachment - |> Enum.map(fn data -> + Enum.map(attachment, fn data -> media_type = data["mediaType"] || data["mimeType"] href = data["url"] || data["href"] - url = [%{"type" => "Link", "mediaType" => media_type, "href" => href}] data @@ -222,30 +212,25 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do |> Map.put("url", url) end) - object - |> Map.put("attachment", attachments) + Map.put(object, "attachment", attachments) end def fix_attachments(%{"attachment" => attachment} = object) when is_map(attachment) do - Map.put(object, "attachment", [attachment]) + object + |> Map.put("attachment", [attachment]) |> fix_attachments() end def fix_attachments(object), do: object def fix_url(%{"url" => url} = object) when is_map(url) do - object - |> Map.put("url", url["href"]) + Map.put(object, "url", url["href"]) end def fix_url(%{"type" => "Video", "url" => url} = object) when is_list(url) do first_element = Enum.at(url, 0) - link_element = - url - |> Enum.filter(fn x -> is_map(x) end) - |> Enum.filter(fn x -> x["mimeType"] == "text/html" end) - |> Enum.at(0) + link_element = Enum.find(url, fn x -> is_map(x) and x["mimeType"] == "text/html" end) object |> Map.put("attachment", [first_element]) @@ -263,36 +248,32 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do true -> "" end - object - |> Map.put("url", url_string) + Map.put(object, "url", url_string) end def fix_url(object), do: object def fix_emoji(%{"tag" => tags} = object) when is_list(tags) do - emoji = tags |> Enum.filter(fn data -> data["type"] == "Emoji" and data["icon"] end) - emoji = - emoji + tags + |> Enum.filter(fn data -> data["type"] == "Emoji" and data["icon"] end) |> Enum.reduce(%{}, fn data, mapping -> name = String.trim(data["name"], ":") - mapping |> Map.put(name, data["icon"]["url"]) + Map.put(mapping, name, data["icon"]["url"]) end) # we merge mastodon and pleroma emoji into a single mapping, to allow for both wire formats emoji = Map.merge(object["emoji"] || %{}, emoji) - object - |> Map.put("emoji", emoji) + Map.put(object, "emoji", emoji) end def fix_emoji(%{"tag" => %{"type" => "Emoji"} = tag} = object) do name = String.trim(tag["name"], ":") emoji = %{name => tag["icon"]["url"]} - object - |> Map.put("emoji", emoji) + Map.put(object, "emoji", emoji) end def fix_emoji(object), do: object @@ -303,17 +284,13 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do |> Enum.filter(fn data -> data["type"] == "Hashtag" and data["name"] end) |> Enum.map(fn data -> String.slice(data["name"], 1..-1) end) - combined = tag ++ tags - - object - |> Map.put("tag", combined) + Map.put(object, "tag", tag ++ tags) end def fix_tag(%{"tag" => %{"type" => "Hashtag", "name" => hashtag} = tag} = object) do combined = [tag, String.slice(hashtag, 1..-1)] - object - |> Map.put("tag", combined) + Map.put(object, "tag", combined) end def fix_tag(%{"tag" => %{} = tag} = object), do: Map.put(object, "tag", [tag]) @@ -325,8 +302,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do content_groups = Map.to_list(content_map) {_, content} = Enum.at(content_groups, 0) - object - |> Map.put("content", content) + Map.put(object, "content", content) end def fix_content_map(object), do: object @@ -335,16 +311,11 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do def fix_type(%{"inReplyTo" => reply_id, "name" => _} = object, options) when is_binary(reply_id) do - reply = - with true <- Federator.allowed_incoming_reply_depth?(options[:depth]), - {:ok, object} <- get_obj_helper(reply_id, options) do - object - end - - if reply && reply.data["type"] == "Question" do + with true <- Federator.allowed_incoming_reply_depth?(options[:depth]), + {:ok, %{data: %{"type" => "Question"} = _} = _} <- get_obj_helper(reply_id, options) do Map.put(object, "type", "Answer") else - object + _ -> object end end @@ -376,6 +347,17 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end end + # Reduce the object list to find the reported user. + defp get_reported(objects) do + Enum.reduce_while(objects, nil, fn ap_id, _ -> + with %User{} = user <- User.get_cached_by_ap_id(ap_id) do + {:halt, user} + else + _ -> {:cont, nil} + end + end) + end + def handle_incoming(data, options \\ []) # Flag objects are placed ahead of the ID check because Mastodon 2.8 and earlier send them @@ -384,31 +366,19 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do with context <- data["context"] || Utils.generate_context_id(), content <- data["content"] || "", %User{} = actor <- User.get_cached_by_ap_id(actor), - # Reduce the object list to find the reported user. - %User{} = account <- - Enum.reduce_while(objects, nil, fn ap_id, _ -> - with %User{} = user <- User.get_cached_by_ap_id(ap_id) do - {:halt, user} - else - _ -> {:cont, nil} - end - end), - + %User{} = account <- get_reported(objects), # Remove the reported user from the object list. statuses <- Enum.filter(objects, fn ap_id -> ap_id != account.ap_id end) do - params = %{ + %{ actor: actor, context: context, account: account, statuses: statuses, content: content, - additional: %{ - "cc" => [account.ap_id] - } + additional: %{"cc" => [account.ap_id]} } - - ActivityPub.flag(params) + |> ActivityPub.flag() end end @@ -755,8 +725,12 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do def handle_incoming(_, _), do: :error + @spec get_obj_helper(String.t(), Keyword.t()) :: {:ok, Object.t()} | nil def get_obj_helper(id, options \\ []) do - if object = Object.normalize(id, true, options), do: {:ok, object}, else: nil + case Object.normalize(id, true, options) do + %Object{} = object -> {:ok, object} + _ -> nil + end end def set_reply_to_uri(%{"inReplyTo" => in_reply_to} = object) when is_binary(in_reply_to) do @@ -855,27 +829,24 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do {:ok, data} end - def maybe_fix_object_url(data) do - if is_binary(data["object"]) and not String.starts_with?(data["object"], "http") do - case get_obj_helper(data["object"]) do - {:ok, relative_object} -> - if relative_object.data["external_url"] do - _data = - data - |> Map.put("object", relative_object.data["external_url"]) - else - data - end - - e -> - Logger.error("Couldn't fetch #{data["object"]} #{inspect(e)}") - data - end + def maybe_fix_object_url(%{"object" => object} = data) when is_binary(object) do + with false <- String.starts_with?(object, "http"), + {:fetch, {:ok, relative_object}} <- {:fetch, get_obj_helper(object)}, + %{data: %{"external_url" => external_url}} when not is_nil(external_url) <- + relative_object do + Map.put(data, "object", external_url) else - data + {:fetch, e} -> + Logger.error("Couldn't fetch #{object} #{inspect(e)}") + data + + _ -> + data end end + def maybe_fix_object_url(data), do: data + def add_hashtags(object) do tags = (object["tag"] || []) @@ -893,53 +864,49 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do tag end) - object - |> Map.put("tag", tags) + Map.put(object, "tag", tags) end def add_mention_tags(object) do mentions = object |> Utils.get_notified_from_object() - |> Enum.map(fn user -> - %{"type" => "Mention", "href" => user.ap_id, "name" => "@#{user.nickname}"} - end) + |> Enum.map(&build_mention_tag/1) tags = object["tag"] || [] - object - |> Map.put("tag", tags ++ mentions) + Map.put(object, "tag", tags ++ mentions) end - def add_emoji_tags(%User{info: %{"emoji" => _emoji} = user_info} = object) do - user_info = add_emoji_tags(user_info) + defp build_mention_tag(%{ap_id: ap_id, nickname: nickname} = _) do + %{"type" => "Mention", "href" => ap_id, "name" => "@#{nickname}"} + end - object - |> Map.put(:info, user_info) + def take_emoji_tags(%User{info: %{emoji: emoji} = _user_info} = _user) do + emoji + |> Enum.flat_map(&Map.to_list/1) + |> Enum.map(&build_emoji_tag/1) end # TODO: we should probably send mtime instead of unix epoch time for updated def add_emoji_tags(%{"emoji" => emoji} = object) do tags = object["tag"] || [] - out = - emoji - |> Enum.map(fn {name, url} -> - %{ - "icon" => %{"url" => url, "type" => "Image"}, - "name" => ":" <> name <> ":", - "type" => "Emoji", - "updated" => "1970-01-01T00:00:00Z", - "id" => url - } - end) + out = Enum.map(emoji, &build_emoji_tag/1) - object - |> Map.put("tag", tags ++ out) + Map.put(object, "tag", tags ++ out) end - def add_emoji_tags(object) do - object + def add_emoji_tags(object), do: object + + defp build_emoji_tag({name, url}) do + %{ + "icon" => %{"url" => url, "type" => "Image"}, + "name" => ":" <> name <> ":", + "type" => "Emoji", + "updated" => "1970-01-01T00:00:00Z", + "id" => url + } end def set_conversation(object) do @@ -959,9 +926,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do def add_attributed_to(object) do attributed_to = object["attributedTo"] || object["actor"] - - object - |> Map.put("attributedTo", attributed_to) + Map.put(object, "attributedTo", attributed_to) end def prepare_attachments(object) do @@ -972,30 +937,18 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do %{"url" => href, "mediaType" => media_type, "name" => data["name"], "type" => "Document"} end) - object - |> Map.put("attachment", attachments) + Map.put(object, "attachment", attachments) end defp strip_internal_fields(object) do object - |> Map.drop([ - "likes", - "like_count", - "announcements", - "announcement_count", - "emoji", - "context_id", - "deleted_activity_id" - ]) + |> Map.drop(Pleroma.Constants.object_internal_fields()) end defp strip_internal_tags(%{"tag" => tags} = object) do - tags = - tags - |> Enum.filter(fn x -> is_map(x) end) + tags = Enum.filter(tags, fn x -> is_map(x) end) - object - |> Map.put("tag", tags) + Map.put(object, "tag", tags) end defp strip_internal_tags(object), do: object @@ -1049,9 +1002,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do with %User{local: false} = user <- User.get_cached_by_ap_id(ap_id), {:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id), already_ap <- User.ap_enabled?(user), - {:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do - unless already_ap do - PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user]) + {:ok, user} <- upgrade_user(user, data) do + if not already_ap do + TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id}) end {:ok, user} @@ -1061,6 +1014,12 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end end + defp upgrade_user(user, data) do + user + |> User.upgrade_changeset(data, true) + |> User.update_and_set_cache() + end + def maybe_retire_websub(ap_id) do # some sanity checks if is_binary(ap_id) && String.length(ap_id) > 8 do @@ -1074,16 +1033,11 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do end end - def maybe_fix_user_url(data) do - if is_map(data["url"]) do - Map.put(data, "url", data["url"]["href"]) - else - data - end + def maybe_fix_user_url(%{"url" => url} = data) when is_map(url) do + Map.put(data, "url", url["href"]) end - def maybe_fix_user_object(data) do - data - |> maybe_fix_user_url - end + def maybe_fix_user_url(data), do: data + + def maybe_fix_user_object(data), do: maybe_fix_user_url(data) end diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index c9c0c3763..30628a793 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -33,50 +33,40 @@ defmodule Pleroma.Web.ActivityPub.Utils do Map.put(params, "actor", get_ap_id(params["actor"])) end - def determine_explicit_mentions(%{"tag" => tag} = _object) when is_list(tag) do - tag - |> Enum.filter(fn x -> is_map(x) end) - |> Enum.filter(fn x -> x["type"] == "Mention" end) - |> Enum.map(fn x -> x["href"] end) + @spec determine_explicit_mentions(map()) :: map() + def determine_explicit_mentions(%{"tag" => tag} = _) when is_list(tag) do + Enum.flat_map(tag, fn + %{"type" => "Mention", "href" => href} -> [href] + _ -> [] + end) end def determine_explicit_mentions(%{"tag" => tag} = object) when is_map(tag) do - Map.put(object, "tag", [tag]) + object + |> Map.put("tag", [tag]) |> determine_explicit_mentions() end def determine_explicit_mentions(_), do: [] + @spec recipient_in_collection(any(), any()) :: boolean() defp recipient_in_collection(ap_id, coll) when is_binary(coll), do: ap_id == coll defp recipient_in_collection(ap_id, coll) when is_list(coll), do: ap_id in coll defp recipient_in_collection(_, _), do: false + @spec recipient_in_message(User.t(), User.t(), map()) :: boolean() def recipient_in_message(%User{ap_id: ap_id} = recipient, %User{} = actor, params) do - cond do - recipient_in_collection(ap_id, params["to"]) -> - true - - recipient_in_collection(ap_id, params["cc"]) -> - true - - recipient_in_collection(ap_id, params["bto"]) -> - true - - recipient_in_collection(ap_id, params["bcc"]) -> - true + addresses = [params["to"], params["cc"], params["bto"], params["bcc"]] + cond do + Enum.any?(addresses, &recipient_in_collection(ap_id, &1)) -> true # if the message is unaddressed at all, then assume it is directly addressed # to the recipient - !params["to"] && !params["cc"] && !params["bto"] && !params["bcc"] -> - true - + Enum.all?(addresses, &is_nil(&1)) -> true # if the message is sent from somebody the user is following, then assume it # is addressed to the recipient - User.following?(recipient, actor) -> - true - - true -> - false + User.following?(recipient, actor) -> true + true -> false end end @@ -85,15 +75,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do defp extract_list(_), do: [] def maybe_splice_recipient(ap_id, params) do - need_splice = + need_splice? = !recipient_in_collection(ap_id, params["to"]) && !recipient_in_collection(ap_id, params["cc"]) - cc_list = extract_list(params["cc"]) - - if need_splice do - params - |> Map.put("cc", [ap_id | cc_list]) + if need_splice? do + cc_list = extract_list(params["cc"]) + Map.put(params, "cc", [ap_id | cc_list]) else params end @@ -139,7 +127,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do "object" => object } - Notification.get_notified_from_activity(%Activity{data: fake_create_activity}, false) + get_notified_from_object(fake_create_activity) end def get_notified_from_object(object) do @@ -169,14 +157,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do @spec maybe_federate(any()) :: :ok def maybe_federate(%Activity{local: true} = activity) do if Pleroma.Config.get!([:instance, :federating]) do - priority = - case activity.data["type"] do - "Delete" -> 10 - "Create" -> 1 - _ -> 5 - end - - Pleroma.Web.Federator.publish(activity, priority) + Pleroma.Web.Federator.publish(activity) end :ok @@ -188,63 +169,66 @@ defmodule Pleroma.Web.ActivityPub.Utils do Adds an id and a published data if they aren't there, also adds it to an included object """ - def lazy_put_activity_defaults(map, fake \\ false) do - map = - unless fake do - %{data: %{"id" => context}, id: context_id} = create_context(map["context"]) - - map - |> Map.put_new_lazy("id", &generate_activity_id/0) - |> Map.put_new_lazy("published", &make_date/0) - |> Map.put_new("context", context) - |> Map.put_new("context_id", context_id) - else - map - |> Map.put_new("id", "pleroma:fakeid") - |> Map.put_new_lazy("published", &make_date/0) - |> Map.put_new("context", "pleroma:fakecontext") - |> Map.put_new("context_id", -1) - end + @spec lazy_put_activity_defaults(map(), boolean) :: map() + def lazy_put_activity_defaults(map, fake? \\ false) - if is_map(map["object"]) do - object = lazy_put_object_defaults(map["object"], map, fake) - %{map | "object" => object} - else - map - end + def lazy_put_activity_defaults(map, true) do + map + |> Map.put_new("id", "pleroma:fakeid") + |> Map.put_new_lazy("published", &make_date/0) + |> Map.put_new("context", "pleroma:fakecontext") + |> Map.put_new("context_id", -1) + |> lazy_put_object_defaults(true) end - @doc """ - Adds an id and published date if they aren't there. - """ - def lazy_put_object_defaults(map, activity \\ %{}, fake) + def lazy_put_activity_defaults(map, _fake?) do + %{data: %{"id" => context}, id: context_id} = create_context(map["context"]) - def lazy_put_object_defaults(map, activity, true = _fake) do map + |> Map.put_new_lazy("id", &generate_activity_id/0) |> Map.put_new_lazy("published", &make_date/0) - |> Map.put_new("id", "pleroma:fake_object_id") - |> Map.put_new("context", activity["context"]) - |> Map.put_new("fake", true) - |> Map.put_new("context_id", activity["context_id"]) + |> Map.put_new("context", context) + |> Map.put_new("context_id", context_id) + |> lazy_put_object_defaults(false) end - def lazy_put_object_defaults(map, activity, _fake) do - map - |> Map.put_new_lazy("id", &generate_object_id/0) - |> Map.put_new_lazy("published", &make_date/0) - |> Map.put_new("context", activity["context"]) - |> Map.put_new("context_id", activity["context_id"]) + # Adds an id and published date if they aren't there. + # + @spec lazy_put_object_defaults(map(), boolean()) :: map() + defp lazy_put_object_defaults(%{"object" => map} = activity, true) + when is_map(map) do + object = + map + |> Map.put_new("id", "pleroma:fake_object_id") + |> Map.put_new_lazy("published", &make_date/0) + |> Map.put_new("context", activity["context"]) + |> Map.put_new("context_id", activity["context_id"]) + |> Map.put_new("fake", true) + + %{activity | "object" => object} end + defp lazy_put_object_defaults(%{"object" => map} = activity, _) + when is_map(map) do + object = + map + |> Map.put_new_lazy("id", &generate_object_id/0) + |> Map.put_new_lazy("published", &make_date/0) + |> Map.put_new("context", activity["context"]) + |> Map.put_new("context_id", activity["context_id"]) + + %{activity | "object" => object} + end + + defp lazy_put_object_defaults(activity, _), do: activity + @doc """ Inserts a full object if it is contained in an activity. """ def insert_full_object(%{"object" => %{"type" => type} = object_data} = map) when is_map(object_data) and type in @supported_object_types do with {:ok, object} <- Object.create(object_data) do - map = - map - |> Map.put("object", object.data["id"]) + map = Map.put(map, "object", object.data["id"]) {:ok, map, object} end @@ -263,7 +247,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do |> Activity.Queries.by_actor() |> Activity.Queries.by_object_id(id) |> Activity.Queries.by_type("Like") - |> Activity.Queries.limit(1) + |> limit(1) |> Repo.one() end @@ -356,36 +340,35 @@ defmodule Pleroma.Web.ActivityPub.Utils do @doc """ Updates a follow activity's state (for locked accounts). """ + @spec update_follow_state_for_all(Activity.t(), String.t()) :: {:ok, Activity} | {:error, any()} def update_follow_state_for_all( %Activity{data: %{"actor" => actor, "object" => object}} = activity, state ) do - try do - Ecto.Adapters.SQL.query!( - Repo, - "UPDATE activities SET data = jsonb_set(data, '{state}', $1) WHERE data->>'type' = 'Follow' AND data->>'actor' = $2 AND data->>'object' = $3 AND data->>'state' = 'pending'", - [state, actor, object] - ) + "Follow" + |> Activity.Queries.by_type() + |> Activity.Queries.by_actor(actor) + |> Activity.Queries.by_object_id(object) + |> where(fragment("data->>'state' = 'pending'")) + |> update(set: [data: fragment("jsonb_set(data, '{state}', ?)", ^state)]) + |> Repo.update_all([]) - User.set_follow_state_cache(actor, object, state) - activity = Activity.get_by_id(activity.id) - {:ok, activity} - rescue - e -> - {:error, e} - end + User.set_follow_state_cache(actor, object, state) + + activity = Activity.get_by_id(activity.id) + + {:ok, activity} end def update_follow_state( %Activity{data: %{"actor" => actor, "object" => object}} = activity, state ) do - with new_data <- - activity.data - |> Map.put("state", state), - changeset <- Changeset.change(activity, data: new_data), - {:ok, activity} <- Repo.update(changeset), - _ <- User.set_follow_state_cache(actor, object, state) do + new_data = Map.put(activity.data, "state", state) + changeset = Changeset.change(activity, data: new_data) + + with {:ok, activity} <- Repo.update(changeset) do + User.set_follow_state_cache(actor, object, state) {:ok, activity} end end @@ -410,28 +393,14 @@ defmodule Pleroma.Web.ActivityPub.Utils do end def fetch_latest_follow(%User{ap_id: follower_id}, %User{ap_id: followed_id}) do - query = - from( - activity in Activity, - where: - fragment( - "? ->> 'type' = 'Follow'", - activity.data - ), - where: activity.actor == ^follower_id, - # this is to use the index - where: - fragment( - "coalesce((?)->'object'->>'id', (?)->>'object') = ?", - activity.data, - activity.data, - ^followed_id - ), - order_by: [fragment("? desc nulls last", activity.id)], - limit: 1 - ) - - Repo.one(query) + "Follow" + |> Activity.Queries.by_type() + |> where(actor: ^follower_id) + # this is to use the index + |> Activity.Queries.by_object_id(followed_id) + |> order_by([activity], fragment("? desc nulls last", activity.id)) + |> limit(1) + |> Repo.one() end #### Announce-related helpers @@ -439,23 +408,14 @@ defmodule Pleroma.Web.ActivityPub.Utils do @doc """ Retruns an existing announce activity if the notice has already been announced """ - def get_existing_announce(actor, %{data: %{"id" => id}}) do - query = - from( - activity in Activity, - where: activity.actor == ^actor, - # this is to use the index - where: - fragment( - "coalesce((?)->'object'->>'id', (?)->>'object') = ?", - activity.data, - activity.data, - ^id - ), - where: fragment("(?)->>'type' = 'Announce'", activity.data) - ) - - Repo.one(query) + @spec get_existing_announce(String.t(), map()) :: Activity.t() | nil + def get_existing_announce(actor, %{data: %{"id" => ap_id}}) do + "Announce" + |> Activity.Queries.by_type() + |> where(actor: ^actor) + # this is to use the index + |> Activity.Queries.by_object_id(ap_id) + |> Repo.one() end @doc """ @@ -531,31 +491,35 @@ defmodule Pleroma.Web.ActivityPub.Utils do |> maybe_put("id", activity_id) end + @spec add_announce_to_object(Activity.t(), Object.t()) :: + {:ok, Object.t()} | {:error, Ecto.Changeset.t()} def add_announce_to_object( - %Activity{ - data: %{"actor" => actor, "cc" => [Pleroma.Constants.as_public()]} - }, + %Activity{data: %{"actor" => actor, "cc" => [Pleroma.Constants.as_public()]}}, object ) do - announcements = - if is_list(object.data["announcements"]), do: object.data["announcements"], else: [] + announcements = take_announcements(object) - with announcements <- [actor | announcements] |> Enum.uniq() do + with announcements <- Enum.uniq([actor | announcements]) do update_element_in_object("announcement", announcements, object) end end def add_announce_to_object(_, object), do: {:ok, object} + @spec remove_announce_from_object(Activity.t(), Object.t()) :: + {:ok, Object.t()} | {:error, Ecto.Changeset.t()} def remove_announce_from_object(%Activity{data: %{"actor" => actor}}, object) do - announcements = - if is_list(object.data["announcements"]), do: object.data["announcements"], else: [] - - with announcements <- announcements |> List.delete(actor) do + with announcements <- List.delete(take_announcements(object), actor) do update_element_in_object("announcement", announcements, object) end end + defp take_announcements(%{data: %{"announcements" => announcements}} = _) + when is_list(announcements), + do: announcements + + defp take_announcements(_), do: [] + #### Unfollow-related helpers def make_unfollow_data(follower, followed, follow_activity, activity_id) do @@ -569,29 +533,16 @@ defmodule Pleroma.Web.ActivityPub.Utils do end #### Block-related helpers + @spec fetch_latest_block(User.t(), User.t()) :: Activity.t() | nil def fetch_latest_block(%User{ap_id: blocker_id}, %User{ap_id: blocked_id}) do - query = - from( - activity in Activity, - where: - fragment( - "? ->> 'type' = 'Block'", - activity.data - ), - where: activity.actor == ^blocker_id, - # this is to use the index - where: - fragment( - "coalesce((?)->'object'->>'id', (?)->>'object') = ?", - activity.data, - activity.data, - ^blocked_id - ), - order_by: [fragment("? desc nulls last", activity.id)], - limit: 1 - ) - - Repo.one(query) + "Block" + |> Activity.Queries.by_type() + |> where(actor: ^blocker_id) + # this is to use the index + |> Activity.Queries.by_object_id(blocked_id) + |> order_by([activity], fragment("? desc nulls last", activity.id)) + |> limit(1) + |> Repo.one() end def make_block_data(blocker, blocked, activity_id) do @@ -631,28 +582,32 @@ defmodule Pleroma.Web.ActivityPub.Utils do end #### Flag-related helpers - - def make_flag_data(params, additional) do - status_ap_ids = - Enum.map(params.statuses || [], fn - %Activity{} = act -> act.data["id"] - act when is_map(act) -> act["id"] - act when is_binary(act) -> act - end) - - object = [params.account.ap_id] ++ status_ap_ids - + @spec make_flag_data(map(), map()) :: map() + def make_flag_data(%{actor: actor, context: context, content: content} = params, additional) do %{ "type" => "Flag", - "actor" => params.actor.ap_id, - "content" => params.content, - "object" => object, - "context" => params.context, + "actor" => actor.ap_id, + "content" => content, + "object" => build_flag_object(params), + "context" => context, "state" => "open" } |> Map.merge(additional) end + def make_flag_data(_, _), do: %{} + + defp build_flag_object(%{account: account, statuses: statuses} = _) do + [account.ap_id] ++ + Enum.map(statuses || [], fn + %Activity{} = act -> act.data["id"] + act when is_map(act) -> act["id"] + act when is_binary(act) -> act + end) + end + + defp build_flag_object(_), do: [] + @doc """ Fetches the OrderedCollection/OrderedCollectionPage from `from`, limiting the amount of pages fetched after the first one to `pages_left` pages. @@ -695,11 +650,11 @@ defmodule Pleroma.Web.ActivityPub.Utils do #### Report-related helpers def update_report_state(%Activity{} = activity, state) when state in @supported_report_states do - with new_data <- Map.put(activity.data, "state", state), - changeset <- Changeset.change(activity, data: new_data), - {:ok, activity} <- Repo.update(changeset) do - {:ok, activity} - end + new_data = Map.put(activity.data, "state", state) + + activity + |> Changeset.change(data: new_data) + |> Repo.update() end def update_report_state(_, _), do: {:error, "Unsupported state"} @@ -766,21 +721,13 @@ defmodule Pleroma.Web.ActivityPub.Utils do end def get_existing_votes(actor, %{data: %{"id" => id}}) do - query = - from( - [activity, object: object] in Activity.with_preloaded_object(Activity), - where: fragment("(?)->>'type' = 'Create'", activity.data), - where: fragment("(?)->>'actor' = ?", activity.data, ^actor), - where: - fragment( - "(?)->>'inReplyTo' = ?", - object.data, - ^to_string(id) - ), - where: fragment("(?)->>'type' = 'Answer'", object.data) - ) - - Repo.all(query) + actor + |> Activity.Queries.by_actor() + |> Activity.Queries.by_type("Create") + |> Activity.with_preloaded_object() + |> where([a, object: o], fragment("(?)->>'inReplyTo' = ?", o.data, ^to_string(id))) + |> where([a, object: o], fragment("(?)->>'type' = 'Answer'", o.data)) + |> Repo.all() end defp maybe_put(map, _key, nil), do: map diff --git a/lib/pleroma/web/activity_pub/views/object_view.ex b/lib/pleroma/web/activity_pub/views/object_view.ex index 94d05f49b..0d63f0707 100644 --- a/lib/pleroma/web/activity_pub/views/object_view.ex +++ b/lib/pleroma/web/activity_pub/views/object_view.ex @@ -37,12 +37,12 @@ defmodule Pleroma.Web.ActivityPub.ObjectView do Map.merge(base, additional) end - def render("likes.json", ap_id, likes, page) do + def render("likes.json", %{ap_id: ap_id, likes: likes, page: page}) do collection(likes, "#{ap_id}/likes", page) |> Map.merge(Pleroma.Web.ActivityPub.Utils.make_json_ld_header()) end - def render("likes.json", ap_id, likes) do + def render("likes.json", %{ap_id: ap_id, likes: likes}) do %{ "id" => "#{ap_id}/likes", "type" => "OrderedCollection", diff --git a/lib/pleroma/web/activity_pub/views/user_view.ex b/lib/pleroma/web/activity_pub/views/user_view.ex index 7be734b26..352d856fa 100644 --- a/lib/pleroma/web/activity_pub/views/user_view.ex +++ b/lib/pleroma/web/activity_pub/views/user_view.ex @@ -75,10 +75,7 @@ defmodule Pleroma.Web.ActivityPub.UserView do endpoints = render("endpoints.json", %{user: user}) - user_tags = - user - |> Transmogrifier.add_emoji_tags() - |> Map.get("tag", []) + emoji_tags = Transmogrifier.take_emoji_tags(user) fields = user.info @@ -110,7 +107,7 @@ defmodule Pleroma.Web.ActivityPub.UserView do }, "endpoints" => endpoints, "attachment" => fields, - "tag" => (user.info.source_data["tag"] || []) ++ user_tags + "tag" => (user.info.source_data["tag"] || []) ++ emoji_tags } |> Map.merge(maybe_make_image(&User.avatar_url/2, "icon", user)) |> Map.merge(maybe_make_image(&User.banner_url/2, "image", user)) @@ -118,30 +115,34 @@ defmodule Pleroma.Web.ActivityPub.UserView do end def render("following.json", %{user: user, page: page} = opts) do - showing = (opts[:for] && opts[:for] == user) || !user.info.hide_follows + showing_items = (opts[:for] && opts[:for] == user) || !user.info.hide_follows + showing_count = showing_items || !user.info.hide_follows_count + query = User.get_friends_query(user) query = from(user in query, select: [:ap_id]) following = Repo.all(query) total = - if showing do + if showing_count do length(following) else 0 end - collection(following, "#{user.ap_id}/following", page, showing, total) + collection(following, "#{user.ap_id}/following", page, showing_items, total) |> Map.merge(Utils.make_json_ld_header()) end def render("following.json", %{user: user} = opts) do - showing = (opts[:for] && opts[:for] == user) || !user.info.hide_follows + showing_items = (opts[:for] && opts[:for] == user) || !user.info.hide_follows + showing_count = showing_items || !user.info.hide_follows_count + query = User.get_friends_query(user) query = from(user in query, select: [:ap_id]) following = Repo.all(query) total = - if showing do + if showing_count do length(following) else 0 @@ -152,7 +153,7 @@ defmodule Pleroma.Web.ActivityPub.UserView do "type" => "OrderedCollection", "totalItems" => total, "first" => - if showing do + if showing_items do collection(following, "#{user.ap_id}/following", 1, !user.info.hide_follows) else "#{user.ap_id}/following?page=1" @@ -162,32 +163,34 @@ defmodule Pleroma.Web.ActivityPub.UserView do end def render("followers.json", %{user: user, page: page} = opts) do - showing = (opts[:for] && opts[:for] == user) || !user.info.hide_followers + showing_items = (opts[:for] && opts[:for] == user) || !user.info.hide_followers + showing_count = showing_items || !user.info.hide_followers_count query = User.get_followers_query(user) query = from(user in query, select: [:ap_id]) followers = Repo.all(query) total = - if showing do + if showing_count do length(followers) else 0 end - collection(followers, "#{user.ap_id}/followers", page, showing, total) + collection(followers, "#{user.ap_id}/followers", page, showing_items, total) |> Map.merge(Utils.make_json_ld_header()) end def render("followers.json", %{user: user} = opts) do - showing = (opts[:for] && opts[:for] == user) || !user.info.hide_followers + showing_items = (opts[:for] && opts[:for] == user) || !user.info.hide_followers + showing_count = showing_items || !user.info.hide_followers_count query = User.get_followers_query(user) query = from(user in query, select: [:ap_id]) followers = Repo.all(query) total = - if showing do + if showing_count do length(followers) else 0 @@ -198,8 +201,8 @@ defmodule Pleroma.Web.ActivityPub.UserView do "type" => "OrderedCollection", "totalItems" => total, "first" => - if showing do - collection(followers, "#{user.ap_id}/followers", 1, showing, total) + if showing_items do + collection(followers, "#{user.ap_id}/followers", 1, showing_items, total) else "#{user.ap_id}/followers?page=1" end @@ -221,11 +224,12 @@ defmodule Pleroma.Web.ActivityPub.UserView do activities = ActivityPub.fetch_user_activities(user, nil, params) + # this is sorted chronologically, so first activity is the newest (max) {max_id, min_id, collection} = if length(activities) > 0 do { - Enum.at(Enum.reverse(activities), 0).id, Enum.at(activities, 0).id, + Enum.at(Enum.reverse(activities), 0).id, Enum.map(activities, fn act -> {:ok, data} = Transmogrifier.prepare_outgoing(act.data) data diff --git a/lib/pleroma/web/admin_api/admin_api_controller.ex b/lib/pleroma/web/admin_api/admin_api_controller.ex index 544b9d7d8..0d1db8fa0 100644 --- a/lib/pleroma/web/admin_api/admin_api_controller.ex +++ b/lib/pleroma/web/admin_api/admin_api_controller.ex @@ -14,6 +14,7 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do alias Pleroma.Web.AdminAPI.Config alias Pleroma.Web.AdminAPI.ConfigView alias Pleroma.Web.AdminAPI.ModerationLogView + alias Pleroma.Web.AdminAPI.Report alias Pleroma.Web.AdminAPI.ReportView alias Pleroma.Web.AdminAPI.Search alias Pleroma.Web.CommonAPI @@ -139,7 +140,8 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do def user_show(conn, %{"nickname" => nickname}) do with %User{} = user <- User.get_cached_by_nickname_or_id(nickname) do conn - |> json(AccountView.render("show.json", %{user: user})) + |> put_view(AccountView) + |> render("show.json", %{user: user}) else _ -> {:error, :not_found} end @@ -158,7 +160,8 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do }) conn - |> json(StatusView.render("index.json", %{activities: activities, as: :activity})) + |> put_view(StatusView) + |> render("index.json", %{activities: activities, as: :activity}) else _ -> {:error, :not_found} end @@ -178,7 +181,8 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do }) conn - |> json(AccountView.render("show.json", %{user: updated_user})) + |> put_view(AccountView) + |> render("show.json", %{user: updated_user}) end def tag_users(%{assigns: %{user: admin}} = conn, %{"nicknames" => nicknames, "tags" => tags}) do @@ -400,13 +404,23 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do end end - @doc "Get a account registeration invite token (base64 string)" - def get_invite_token(conn, params) do - options = params["invite"] || %{} - {:ok, invite} = UserInviteToken.create_invite(options) + @doc "Create an account registration invite token" + def create_invite_token(conn, params) do + opts = %{} - conn - |> json(invite.token) + opts = + if params["max_use"], + do: Map.put(opts, :max_use, params["max_use"]), + else: opts + + opts = + if params["expires_at"], + do: Map.put(opts, :expires_at, params["expires_at"]), + else: opts + + {:ok, invite} = UserInviteToken.create_invite(opts) + + json(conn, AccountView.render("invite.json", %{invite: invite})) end @doc "Get list of created invites" @@ -414,7 +428,8 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do invites = UserInviteToken.list_invites() conn - |> json(AccountView.render("invites.json", %{invites: invites})) + |> put_view(AccountView) + |> render("invites.json", %{invites: invites}) end @doc "Revokes invite by token" @@ -422,7 +437,8 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do with {:ok, invite} <- UserInviteToken.find_by_token(token), {:ok, updated_invite} = UserInviteToken.update_invite(invite, %{used: true}) do conn - |> json(AccountView.render("invite.json", %{invite: updated_invite})) + |> put_view(AccountView) + |> render("invite.json", %{invite: updated_invite}) else nil -> {:error, :not_found} end @@ -437,16 +453,23 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do |> json(token.token) end + @doc "Force password reset for a given user" + def force_password_reset(conn, %{"nickname" => nickname}) do + (%User{local: true} = user) = User.get_cached_by_nickname(nickname) + + User.force_password_reset_async(user) + + json_response(conn, :no_content, "") + end + def list_reports(conn, params) do params = params |> Map.put("type", "Flag") |> Map.put("skip_preload", true) + |> Map.put("total", true) - reports = - [] - |> ActivityPub.fetch_activities(params) - |> Enum.reverse() + reports = ActivityPub.fetch_activities([], params) conn |> put_view(ReportView) @@ -457,7 +480,7 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do with %Activity{} = report <- Activity.get_by_id(id) do conn |> put_view(ReportView) - |> render("show.json", %{report: report}) + |> render("show.json", Report.extract_report_info(report)) else _ -> {:error, :not_found} end @@ -473,7 +496,7 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do conn |> put_view(ReportView) - |> render("show.json", %{report: report}) + |> render("show.json", Report.extract_report_info(report)) end end @@ -591,6 +614,12 @@ defmodule Pleroma.Web.AdminAPI.AdminAPIController do |> render("index.json", %{configs: updated}) end + def reload_emoji(conn, _params) do + Pleroma.Emoji.reload() + + conn |> json("ok") + end + def errors(conn, {:error, :not_found}) do conn |> put_status(:not_found) diff --git a/lib/pleroma/web/admin_api/config.ex b/lib/pleroma/web/admin_api/config.ex index a10cc779b..1917a5580 100644 --- a/lib/pleroma/web/admin_api/config.ex +++ b/lib/pleroma/web/admin_api/config.ex @@ -90,6 +90,8 @@ defmodule Pleroma.Web.AdminAPI.Config do for v <- entity, into: [], do: do_convert(v) end + defp do_convert(%Regex{} = entity), do: inspect(entity) + defp do_convert(entity) when is_map(entity) do for {k, v} <- entity, into: %{}, do: {do_convert(k), do_convert(v)} end @@ -122,7 +124,7 @@ defmodule Pleroma.Web.AdminAPI.Config do def transform(entity), do: :erlang.term_to_binary(entity) - defp do_transform(%Regex{} = entity) when is_map(entity), do: entity + defp do_transform(%Regex{} = entity), do: entity defp do_transform(%{"tuple" => [":dispatch", [entity]]}) do {dispatch_settings, []} = do_eval(entity) @@ -154,8 +156,15 @@ defmodule Pleroma.Web.AdminAPI.Config do defp do_transform(entity), do: entity defp do_transform_string("~r/" <> pattern) do - pattern = String.trim_trailing(pattern, "/") - ~r/#{pattern}/ + modificator = String.split(pattern, "/") |> List.last() + pattern = String.trim_trailing(pattern, "/" <> modificator) + + case modificator do + "" -> ~r/#{pattern}/ + "i" -> ~r/#{pattern}/i + "u" -> ~r/#{pattern}/u + "s" -> ~r/#{pattern}/s + end end defp do_transform_string(":" <> atom), do: String.to_atom(atom) diff --git a/lib/pleroma/web/admin_api/report.ex b/lib/pleroma/web/admin_api/report.ex new file mode 100644 index 000000000..c751dc2be --- /dev/null +++ b/lib/pleroma/web/admin_api/report.ex @@ -0,0 +1,22 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.AdminAPI.Report do + alias Pleroma.Activity + alias Pleroma.User + + def extract_report_info( + %{data: %{"actor" => actor, "object" => [account_ap_id | status_ap_ids]}} = report + ) do + user = User.get_cached_by_ap_id(actor) + account = User.get_cached_by_ap_id(account_ap_id) + + statuses = + Enum.map(status_ap_ids, fn ap_id -> + Activity.get_by_ap_id_with_object(ap_id) + end) + + %{report: report, user: user, account: account, statuses: statuses} + end +end diff --git a/lib/pleroma/web/admin_api/views/report_view.ex b/lib/pleroma/web/admin_api/views/report_view.ex index a25f3f1fe..8c06364a3 100644 --- a/lib/pleroma/web/admin_api/views/report_view.ex +++ b/lib/pleroma/web/admin_api/views/report_view.ex @@ -4,25 +4,26 @@ defmodule Pleroma.Web.AdminAPI.ReportView do use Pleroma.Web, :view - alias Pleroma.Activity alias Pleroma.HTML alias Pleroma.User + alias Pleroma.Web.AdminAPI.Report alias Pleroma.Web.CommonAPI.Utils alias Pleroma.Web.MastodonAPI.StatusView def render("index.json", %{reports: reports}) do %{ - reports: render_many(reports, __MODULE__, "show.json", as: :report) + reports: + reports[:items] + |> Enum.map(&Report.extract_report_info(&1)) + |> Enum.map(&render(__MODULE__, "show.json", &1)) + |> Enum.reverse(), + total: reports[:total] } end - def render("show.json", %{report: report}) do - user = User.get_cached_by_ap_id(report.data["actor"]) + def render("show.json", %{report: report, user: user, account: account, statuses: statuses}) do created_at = Utils.to_masto_date(report.data["published"]) - [account_ap_id | status_ap_ids] = report.data["object"] - account = User.get_cached_by_ap_id(account_ap_id) - content = unless is_nil(report.data["content"]) do HTML.filter_tags(report.data["content"]) @@ -30,11 +31,6 @@ defmodule Pleroma.Web.AdminAPI.ReportView do nil end - statuses = - Enum.map(status_ap_ids, fn ap_id -> - Activity.get_by_ap_id_with_object(ap_id) - end) - %{ id: report.id, account: merge_account_views(account), diff --git a/lib/pleroma/web/controller_helper.ex b/lib/pleroma/web/controller_helper.ex index eeac9f503..b53a01955 100644 --- a/lib/pleroma/web/controller_helper.ex +++ b/lib/pleroma/web/controller_helper.ex @@ -34,79 +34,38 @@ defmodule Pleroma.Web.ControllerHelper do defp param_to_integer(_, default), do: default - def add_link_headers( - conn, - method, - activities, - param \\ nil, - params \\ %{}, - func3 \\ nil, - func4 \\ nil - ) do - params = - conn.params - |> Map.drop(["since_id", "max_id", "min_id"]) - |> Map.merge(params) + def add_link_headers(conn, activities, extra_params \\ %{}) do + case List.last(activities) do + %{id: max_id} -> + params = + conn.params + |> Map.drop(Map.keys(conn.path_params)) + |> Map.drop(["since_id", "max_id", "min_id"]) + |> Map.merge(extra_params) - last = List.last(activities) + limit = + params + |> Map.get("limit", "20") + |> String.to_integer() - func3 = func3 || (&mastodon_api_url/3) - func4 = func4 || (&mastodon_api_url/4) + min_id = + if length(activities) <= limit do + activities + |> List.first() + |> Map.get(:id) + else + activities + |> Enum.at(limit * -1) + |> Map.get(:id) + end - if last do - max_id = last.id + next_url = current_url(conn, Map.merge(params, %{max_id: max_id})) + prev_url = current_url(conn, Map.merge(params, %{min_id: min_id})) - limit = - params - |> Map.get("limit", "20") - |> String.to_integer() + put_resp_header(conn, "link", "<#{next_url}>; rel=\"next\", <#{prev_url}>; rel=\"prev\"") - min_id = - if length(activities) <= limit do - activities - |> List.first() - |> Map.get(:id) - else - activities - |> Enum.at(limit * -1) - |> Map.get(:id) - end - - {next_url, prev_url} = - if param do - { - func4.( - Pleroma.Web.Endpoint, - method, - param, - Map.merge(params, %{max_id: max_id}) - ), - func4.( - Pleroma.Web.Endpoint, - method, - param, - Map.merge(params, %{min_id: min_id}) - ) - } - else - { - func3.( - Pleroma.Web.Endpoint, - method, - Map.merge(params, %{max_id: max_id}) - ), - func3.( - Pleroma.Web.Endpoint, - method, - Map.merge(params, %{min_id: min_id}) - ) - } - end - - conn - |> put_resp_header("link", "<#{next_url}>; rel=\"next\", <#{prev_url}>; rel=\"prev\"") - else - conn + _ -> + conn end end end diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index f4f9e83e0..1a2da014a 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -10,16 +10,17 @@ defmodule Pleroma.Web.Federator do alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Federator.Publisher - alias Pleroma.Web.Federator.RetryQueue alias Pleroma.Web.OStatus alias Pleroma.Web.Websub + alias Pleroma.Workers.PublisherWorker + alias Pleroma.Workers.ReceiverWorker + alias Pleroma.Workers.SubscriberWorker require Logger def init do - # 1 minute - Process.sleep(1000 * 60) - refresh_subscriptions() + # To do: consider removing this call in favor of scheduled execution (`quantum`-based) + refresh_subscriptions(schedule_in: 60) end @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" @@ -37,50 +38,38 @@ defmodule Pleroma.Web.Federator do # Client API def incoming_doc(doc) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) + ReceiverWorker.enqueue("incoming_doc", %{"body" => doc}) end def incoming_ap_doc(params) do - PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) + ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) end - def publish(activity, priority \\ 1) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) + def publish(%{id: "pleroma:fakeid"} = activity) do + perform(:publish, activity) end - def verify_websub(websub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) + def publish(activity) do + PublisherWorker.enqueue("publish", %{"activity_id" => activity.id}) end - def request_subscription(sub) do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) + def verify_websub(websub) do + SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id}) end - def refresh_subscriptions do - PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) + def request_subscription(websub) do + SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id}) end - # Job Worker Callbacks - - def perform(:refresh_subscriptions) do - Logger.debug("Federator running refresh subscriptions") - Websub.refresh_subscriptions() - - spawn(fn -> - # 6 hours - Process.sleep(1000 * 60 * 60 * 6) - refresh_subscriptions() - end) + def refresh_subscriptions(worker_args \\ []) do + SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1]) end - def perform(:request_subscription, websub) do - Logger.debug("Refreshing #{websub.topic}") + # Job Worker Callbacks - with {:ok, websub} <- Websub.request_subscription(websub) do - Logger.debug("Successfully refreshed #{websub.topic}") - else - _e -> Logger.debug("Couldn't refresh #{websub.topic}") - end + @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} + def perform(:publish_one, module, params) do + apply(module, :publish_one, [params]) end def perform(:publish, activity) do @@ -92,14 +81,6 @@ defmodule Pleroma.Web.Federator do end end - def perform(:verify_websub, websub) do - Logger.debug(fn -> - "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" - end) - - Websub.verify(websub) - end - def perform(:incoming_doc, doc) do Logger.info("Got document, trying to parse") OStatus.handle_incoming(doc) @@ -130,22 +111,27 @@ defmodule Pleroma.Web.Federator do end end - def perform( - :publish_single_websub, - %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params - ) do - case Websub.publish_one(params) do - {:ok, _} -> - :ok + def perform(:request_subscription, websub) do + Logger.debug("Refreshing #{websub.topic}") - {:error, _} -> - RetryQueue.enqueue(params, Websub) + with {:ok, websub} <- Websub.request_subscription(websub) do + Logger.debug("Successfully refreshed #{websub.topic}") + else + _e -> Logger.debug("Couldn't refresh #{websub.topic}") end end - def perform(type, _) do - Logger.debug(fn -> "Unknown task: #{type}" end) - {:error, "Don't know what to do with this"} + def perform(:verify_websub, websub) do + Logger.debug(fn -> + "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" + end) + + Websub.verify(websub) + end + + def perform(:refresh_subscriptions) do + Logger.debug("Federator running refresh subscriptions") + Websub.refresh_subscriptions() end def ap_enabled_actor(id) do diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex index 70f870244..937064638 100644 --- a/lib/pleroma/web/federator/publisher.ex +++ b/lib/pleroma/web/federator/publisher.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do alias Pleroma.Activity alias Pleroma.Config alias Pleroma.User - alias Pleroma.Web.Federator.RetryQueue + alias Pleroma.Workers.PublisherWorker require Logger @@ -30,23 +30,11 @@ defmodule Pleroma.Web.Federator.Publisher do Enqueue publishing a single activity. """ @spec enqueue_one(module(), Map.t()) :: :ok - def enqueue_one(module, %{} = params), - do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params]) - - @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} - def perform(:publish_one, module, params) do - case apply(module, :publish_one, [params]) do - {:ok, _} -> - :ok - - {:error, _e} -> - RetryQueue.enqueue(params, module) - end - end - - def perform(type, _, _) do - Logger.debug("Unknown task: #{type}") - {:error, "Don't know what to do with this"} + def enqueue_one(module, %{} = params) do + PublisherWorker.enqueue( + "publish_one", + %{"module" => to_string(module), "params" => params} + ) end @doc """ diff --git a/lib/pleroma/web/federator/retry_queue.ex b/lib/pleroma/web/federator/retry_queue.ex deleted file mode 100644 index 9eab8c218..000000000 --- a/lib/pleroma/web/federator/retry_queue.ex +++ /dev/null @@ -1,239 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Federator.RetryQueue do - use GenServer - - require Logger - - def init(args) do - queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected]) - - {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}} - end - - def start_link(_) do - enabled = - if Pleroma.Config.get(:env) == :test, - do: true, - else: Pleroma.Config.get([__MODULE__, :enabled], false) - - if enabled do - Logger.info("Starting retry queue") - - linkres = - GenServer.start_link( - __MODULE__, - %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil}, - name: __MODULE__ - ) - - maybe_kickoff_timer() - linkres - else - Logger.info("Retry queue disabled") - :ignore - end - end - - def enqueue(data, transport, retries \\ 0) do - GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1}) - end - - def get_stats do - GenServer.call(__MODULE__, :get_stats) - end - - def reset_stats do - GenServer.call(__MODULE__, :reset_stats) - end - - def get_retry_params(retries) do - if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do - {:drop, "Max retries reached"} - else - {:retry, growth_function(retries)} - end - end - - def get_retry_timer_interval do - Pleroma.Config.get([:retry_queue, :interval], 1000) - end - - defp ets_count_expires(table, current_time) do - :ets.select_count( - table, - [ - { - {:"$1", :"$2"}, - [{:"=<", :"$1", {:const, current_time}}], - [true] - } - ] - ) - end - - defp ets_pop_n_expired(table, current_time, desired) do - {popped, _continuation} = - :ets.select( - table, - [ - { - {:"$1", :"$2"}, - [{:"=<", :"$1", {:const, current_time}}], - [:"$_"] - } - ], - desired - ) - - popped - |> Enum.each(fn e -> - :ets.delete_object(table, e) - end) - - popped - end - - def maybe_start_job(running_jobs, queue_table) do - # we don't want to hit the ets or the DateTime more times than we have to - # could optimize slightly further by not using the count, and instead grabbing - # up to N objects early... - current_time = DateTime.to_unix(DateTime.utc_now()) - n_running_jobs = :sets.size(running_jobs) - - if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do - n_ready_jobs = ets_count_expires(queue_table, current_time) - - if n_ready_jobs > 0 do - # figure out how many we could start - available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs - start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) - else - running_jobs - end - else - running_jobs - end - end - - defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do - running_jobs - end - - defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) - when available_job_slots > 0 do - candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots) - - candidates - |> List.foldl(running_jobs, fn {_, e}, rj -> - {:ok, pid} = Task.start(fn -> worker(e) end) - mref = Process.monitor(pid) - :sets.add_element(mref, rj) - end) - end - - def worker({:send, data, transport, retries}) do - case transport.publish_one(data) do - {:ok, _} -> - GenServer.cast(__MODULE__, :inc_delivered) - :delivered - - {:error, _reason} -> - enqueue(data, transport, retries) - :retry - end - end - - def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do - {:reply, %{delivered: delivery_count, dropped: drop_count}, state} - end - - def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do - {:reply, %{delivered: delivery_count, dropped: drop_count}, - %{state | delivered: 0, dropped: 0}} - end - - def handle_cast(:reset_stats, state) do - {:noreply, %{state | delivered: 0, dropped: 0}} - end - - def handle_cast( - {:maybe_enqueue, data, transport, retries}, - %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state - ) do - case get_retry_params(retries) do - {:retry, timeout} -> - :ets.insert(queue_table, {timeout, {:send, data, transport, retries}}) - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - - {:drop, message} -> - Logger.debug(message) - {:noreply, %{state | dropped: drop_count + 1}} - end - end - - def handle_cast(:kickoff_timer, state) do - retry_interval = get_retry_timer_interval() - Process.send_after(__MODULE__, :retry_timer_run, retry_interval) - {:noreply, state} - end - - def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do - {:noreply, %{state | delivered: delivery_count + 1}} - end - - def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do - {:noreply, %{state | dropped: drop_count + 1}} - end - - def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do - case transport.publish_one(data) do - {:ok, _} -> - {:noreply, %{state | delivered: delivery_count + 1}} - - {:error, _reason} -> - enqueue(data, transport, retries) - {:noreply, state} - end - end - - def handle_info( - :retry_timer_run, - %{queue_table: queue_table, running_jobs: running_jobs} = state - ) do - maybe_kickoff_timer() - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - end - - def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do - %{running_jobs: running_jobs, queue_table: queue_table} = state - running_jobs = :sets.del_element(ref, running_jobs) - running_jobs = maybe_start_job(running_jobs, queue_table) - {:noreply, %{state | running_jobs: running_jobs}} - end - - def handle_info(unknown, state) do - Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring") - {:noreply, state} - end - - if Pleroma.Config.get(:env) == :test do - defp growth_function(_retries) do - _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout]) - DateTime.to_unix(DateTime.utc_now()) - 1 - end - else - defp growth_function(retries) do - round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) + - DateTime.to_unix(DateTime.utc_now()) - end - end - - defp maybe_kickoff_timer do - GenServer.cast(__MODULE__, :kickoff_timer) - end -end diff --git a/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex b/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex index 116da9ae8..bb81b061e 100644 --- a/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/mastodon_api_controller.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do use Pleroma.Web, :controller import Pleroma.Web.ControllerHelper, - only: [json_response: 3, add_link_headers: 5, add_link_headers: 4, add_link_headers: 3] + only: [json_response: 3, add_link_headers: 2, add_link_headers: 3] alias Ecto.Changeset alias Pleroma.Activity @@ -147,6 +147,8 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do [ :no_rich_text, :locked, + :hide_followers_count, + :hide_follows_count, :hide_followers, :hide_follows, :hide_favorites, @@ -365,7 +367,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Enum.reverse() conn - |> add_link_headers(:home_timeline, activities) + |> add_link_headers(activities) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end @@ -379,12 +381,11 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Map.put("local_only", local_only) |> Map.put("blocking_user", user) |> Map.put("muting_user", user) - |> Map.put("user", user) |> ActivityPub.fetch_public_activities() |> Enum.reverse() conn - |> add_link_headers(:public_timeline, activities, false, %{"local" => local_only}) + |> add_link_headers(activities, %{"local" => local_only}) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end @@ -398,7 +399,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do activities = ActivityPub.fetch_user_activities(user, reading_user, params) conn - |> add_link_headers(:user_statuses, activities, params["id"]) + |> add_link_headers(activities) |> put_view(StatusView) |> render("index.json", %{ activities: activities, @@ -422,11 +423,25 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Pagination.fetch_paginated(params) conn - |> add_link_headers(:dm_timeline, activities) + |> add_link_headers(activities) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end + def get_statuses(%{assigns: %{user: user}} = conn, %{"ids" => ids}) do + limit = 100 + + activities = + ids + |> Enum.take(limit) + |> Activity.all_by_ids_with_object() + |> Enum.filter(&Visibility.visible_for_user?(&1, user)) + + conn + |> put_view(StatusView) + |> render("index.json", activities: activities, for: user, as: :activity) + end + def get_status(%{assigns: %{user: user}} = conn, %{"id" => id}) do with %Activity{} = activity <- Activity.get_by_id_with_object(id), true <- Visibility.visible_for_user?(activity, user) do @@ -471,7 +486,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do end def get_poll(%{assigns: %{user: user}} = conn, %{"id" => id}) do - with %Object{} = object <- Object.get_by_id(id), + with %Object{} = object <- Object.get_by_id_and_maybe_refetch(id, interval: 60), %Activity{} = activity <- Activity.get_create_by_object_ap_id(object.data["id"]), true <- Visibility.visible_for_user?(activity, user) do conn @@ -523,7 +538,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do def scheduled_statuses(%{assigns: %{user: user}} = conn, params) do with scheduled_activities <- MastodonAPI.get_scheduled_activities(user, params) do conn - |> add_link_headers(:scheduled_statuses, scheduled_activities) + |> add_link_headers(scheduled_activities) |> put_view(ScheduledActivityView) |> render("index.json", %{scheduled_activities: scheduled_activities}) end @@ -595,7 +610,12 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do {:ok, activity} -> conn |> put_view(StatusView) - |> try_render("status.json", %{activity: activity, for: user, as: :activity}) + |> try_render("status.json", %{ + activity: activity, + for: user, + as: :activity, + with_direct_conversation_id: true + }) end end end @@ -706,7 +726,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do notifications = MastodonAPI.get_notifications(user, params) conn - |> add_link_headers(:notifications, notifications) + |> add_link_headers(notifications) |> put_view(NotificationView) |> render("index.json", %{notifications: notifications, for: user}) end @@ -828,6 +848,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do def favourited_by(%{assigns: %{user: user}} = conn, %{"id" => id}) do with %Activity{} = activity <- Activity.get_by_id_with_object(id), + {:visible, true} <- {:visible, Visibility.visible_for_user?(activity, user)}, %Object{data: %{"likes" => likes}} <- Object.normalize(activity) do q = from(u in User, where: u.ap_id in ^likes) @@ -839,12 +860,14 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> put_view(AccountView) |> render("accounts.json", %{for: user, users: users, as: :user}) else + {:visible, false} -> {:error, :not_found} _ -> json(conn, []) end end def reblogged_by(%{assigns: %{user: user}} = conn, %{"id" => id}) do with %Activity{} = activity <- Activity.get_by_id_with_object(id), + {:visible, true} <- {:visible, Visibility.visible_for_user?(activity, user)}, %Object{data: %{"announcements" => announces}} <- Object.normalize(activity) do q = from(u in User, where: u.ap_id in ^announces) @@ -856,6 +879,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> put_view(AccountView) |> render("accounts.json", %{for: user, users: users, as: :user}) else + {:visible, false} -> {:error, :not_found} _ -> json(conn, []) end end @@ -894,7 +918,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Enum.reverse() conn - |> add_link_headers(:hashtag_timeline, activities, params["tag"], %{"local" => local_only}) + |> add_link_headers(activities, %{"local" => local_only}) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end @@ -910,7 +934,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do end conn - |> add_link_headers(:followers, followers, user) + |> add_link_headers(followers) |> put_view(AccountView) |> render("accounts.json", %{for: for_user, users: followers, as: :user}) end @@ -927,7 +951,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do end conn - |> add_link_headers(:following, followers, user) + |> add_link_headers(followers) |> put_view(AccountView) |> render("accounts.json", %{for: for_user, users: followers, as: :user}) end @@ -1152,7 +1176,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Enum.reverse() conn - |> add_link_headers(:favourites, activities) + |> add_link_headers(activities) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end @@ -1179,7 +1203,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Enum.reverse() conn - |> add_link_headers(:favourites, activities) + |> add_link_headers(activities) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: for_user, as: :activity}) else @@ -1200,7 +1224,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> Enum.map(fn b -> Map.put(b.activity, :bookmark, Map.delete(b, :activity)) end) conn - |> add_link_headers(:bookmarks, bookmarks) + |> add_link_headers(bookmarks) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end @@ -1640,7 +1664,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do end) conn - |> add_link_headers(:conversations, participations) + |> add_link_headers(participations) |> json(conversations) end diff --git a/lib/pleroma/web/mastodon_api/controllers/search_controller.ex b/lib/pleroma/web/mastodon_api/controllers/search_controller.ex index 9072aa7a4..c91713773 100644 --- a/lib/pleroma/web/mastodon_api/controllers/search_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/search_controller.ex @@ -19,9 +19,10 @@ defmodule Pleroma.Web.MastodonAPI.SearchController do def account_search(%{assigns: %{user: user}} = conn, %{"q" => query} = params) do accounts = User.search(query, search_options(params, user)) - res = AccountView.render("accounts.json", users: accounts, for: user, as: :user) - json(conn, res) + conn + |> put_view(AccountView) + |> render("accounts.json", users: accounts, for: user, as: :user) end def search2(conn, params), do: do_search(:v2, conn, params) diff --git a/lib/pleroma/web/mastodon_api/views/account_view.ex b/lib/pleroma/web/mastodon_api/views/account_view.ex index 169116d0d..195dd124b 100644 --- a/lib/pleroma/web/mastodon_api/views/account_view.ex +++ b/lib/pleroma/web/mastodon_api/views/account_view.ex @@ -74,10 +74,18 @@ defmodule Pleroma.Web.MastodonAPI.AccountView do user_info = User.get_cached_user_info(user) following_count = - ((!user.info.hide_follows or opts[:for] == user) && user_info.following_count) || 0 + if !user.info.hide_follows_count or !user.info.hide_follows or opts[:for] == user do + user_info.following_count + else + 0 + end followers_count = - ((!user.info.hide_followers or opts[:for] == user) && user_info.follower_count) || 0 + if !user.info.hide_followers_count or !user.info.hide_followers or opts[:for] == user do + user_info.follower_count + else + 0 + end bot = (user.info.source_data["type"] || "Person") in ["Application", "Service"] @@ -138,6 +146,8 @@ defmodule Pleroma.Web.MastodonAPI.AccountView do pleroma: %{ confirmation_pending: user_info.confirmation_pending, tags: user.tags, + hide_followers_count: user.info.hide_followers_count, + hide_follows_count: user.info.hide_follows_count, hide_followers: user.info.hide_followers, hide_follows: user.info.hide_follows, hide_favorites: user.info.hide_favorites, diff --git a/lib/pleroma/web/mastodon_api/views/status_view.ex b/lib/pleroma/web/mastodon_api/views/status_view.ex index e71083b91..ef796cddd 100644 --- a/lib/pleroma/web/mastodon_api/views/status_view.ex +++ b/lib/pleroma/web/mastodon_api/views/status_view.ex @@ -73,14 +73,12 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do def render("index.json", opts) do replied_to_activities = get_replied_to_activities(opts.activities) - parallel = unless is_nil(opts[:parallel]), do: opts[:parallel], else: true opts.activities |> safe_render_many( StatusView, "status.json", - Map.put(opts, :replied_to_activities, replied_to_activities), - parallel + Map.put(opts, :replied_to_activities, replied_to_activities) ) end @@ -499,7 +497,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do object_tags = for tag when is_binary(tag) <- object_tags, do: tag Enum.reduce(object_tags, [], fn tag, tags -> - tags ++ [%{name: tag, url: "/tag/#{tag}"}] + tags ++ [%{name: tag, url: "/tag/#{URI.encode(tag)}"}] end) end diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index dbd3542ea..3c26eb406 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -8,6 +8,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.OAuth.Token + alias Pleroma.Web.Streamer @behaviour :cowboy_websocket @@ -24,7 +25,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do ] @anonymous_streams ["public", "public:local", "hashtag"] - # Handled by periodic keepalive in Pleroma.Web.Streamer. + # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping. @timeout :infinity def init(%{qs: qs} = req, state) do @@ -65,7 +66,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do }, topic #{state.topic}" ) - Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state)) + Streamer.add_socket(state.topic, streamer_socket(state)) {:ok, state} end @@ -80,7 +81,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do }, topic #{state.topic || "?"}: #{inspect(reason)}" ) - Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state)) + Streamer.remove_socket(state.topic, streamer_socket(state)) :ok end diff --git a/lib/pleroma/web/nodeinfo/nodeinfo_controller.ex b/lib/pleroma/web/nodeinfo/nodeinfo_controller.ex index ee14cfd6b..192984242 100644 --- a/lib/pleroma/web/nodeinfo/nodeinfo_controller.ex +++ b/lib/pleroma/web/nodeinfo/nodeinfo_controller.ex @@ -57,6 +57,7 @@ defmodule Pleroma.Web.Nodeinfo.NodeinfoController do "mastodon_api_streaming", "polls", "pleroma_explicit_addressing", + "shareable_emoji_packs", if Config.get([:media_proxy, :enabled]) do "media_proxy" end, diff --git a/lib/pleroma/web/oauth/oauth_controller.ex b/lib/pleroma/web/oauth/oauth_controller.ex index 81eae2c8b..a57670e02 100644 --- a/lib/pleroma/web/oauth/oauth_controller.ex +++ b/lib/pleroma/web/oauth/oauth_controller.ex @@ -202,6 +202,8 @@ defmodule Pleroma.Web.OAuth.OAuthController do {:ok, app} <- Token.Utils.fetch_app(conn), {:auth_active, true} <- {:auth_active, User.auth_active?(user)}, {:user_active, true} <- {:user_active, !user.info.deactivated}, + {:password_reset_pending, false} <- + {:password_reset_pending, user.info.password_reset_pending}, {:ok, scopes} <- validate_scopes(app, params), {:ok, auth} <- Authorization.create_authorization(app, user, scopes), {:ok, token} <- Token.exchange_token(app, auth) do @@ -215,6 +217,9 @@ defmodule Pleroma.Web.OAuth.OAuthController do {:user_active, false} -> render_error(conn, :forbidden, "Your account is currently disabled") + {:password_reset_pending, true} -> + render_error(conn, :forbidden, "Password reset is required") + _error -> render_invalid_credentials_error(conn) end diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex index f50098302..f639f9c6f 100644 --- a/lib/pleroma/web/oauth/token/clean_worker.ex +++ b/lib/pleroma/web/oauth/token/clean_worker.ex @@ -1,5 +1,5 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.OAuth.Token.CleanWorker do @@ -17,6 +17,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do ) alias Pleroma.Web.OAuth.Token + alias Pleroma.Workers.BackgroundWorker def start_link(_), do: GenServer.start_link(__MODULE__, %{}) @@ -27,9 +28,11 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do @doc false def handle_info(:perform, state) do - Token.delete_expired_tokens() + BackgroundWorker.enqueue("clean_expired_tokens", %{}) Process.send_after(self(), :perform, @interval) {:noreply, state} end + + def perform(:clean), do: Token.delete_expired_tokens() end diff --git a/lib/pleroma/web/oauth/token/query.ex b/lib/pleroma/web/oauth/token/query.ex index d92e1f071..9642103e6 100644 --- a/lib/pleroma/web/oauth/token/query.ex +++ b/lib/pleroma/web/oauth/token/query.ex @@ -1,5 +1,5 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.OAuth.Token.Query do diff --git a/lib/pleroma/web/ostatus/ostatus.ex b/lib/pleroma/web/ostatus/ostatus.ex index 331cbc0b7..5de1ceef3 100644 --- a/lib/pleroma/web/ostatus/ostatus.ex +++ b/lib/pleroma/web/ostatus/ostatus.ex @@ -3,14 +3,12 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.OStatus do - import Ecto.Query import Pleroma.Web.XML require Logger alias Pleroma.Activity alias Pleroma.HTTP alias Pleroma.Object - alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web alias Pleroma.Web.ActivityPub.ActivityPub @@ -38,21 +36,13 @@ defmodule Pleroma.Web.OStatus do end end - def feed_path(user) do - "#{user.ap_id}/feed.atom" - end + def feed_path(user), do: "#{user.ap_id}/feed.atom" - def pubsub_path(user) do - "#{Web.base_url()}/push/hub/#{user.nickname}" - end + def pubsub_path(user), do: "#{Web.base_url()}/push/hub/#{user.nickname}" - def salmon_path(user) do - "#{user.ap_id}/salmon" - end + def salmon_path(user), do: "#{user.ap_id}/salmon" - def remote_follow_path do - "#{Web.base_url()}/ostatus_subscribe?acct={uri}" - end + def remote_follow_path, do: "#{Web.base_url()}/ostatus_subscribe?acct={uri}" def handle_incoming(xml_string, options \\ []) do with doc when doc != :error <- parse_document(xml_string) do @@ -217,10 +207,9 @@ defmodule Pleroma.Web.OStatus do Get the cw that mastodon uses. """ def get_cw(entry) do - with cw when not is_nil(cw) <- string_from_xpath("/*/summary", entry) do - cw - else - _e -> nil + case string_from_xpath("/*/summary", entry) do + cw when not is_nil(cw) -> cw + _ -> nil end end @@ -232,19 +221,17 @@ defmodule Pleroma.Web.OStatus do end def maybe_update(doc, user) do - if "true" == string_from_xpath("//author[1]/ap_enabled", doc) do - Transmogrifier.upgrade_user_from_ap_id(user.ap_id) - else - maybe_update_ostatus(doc, user) + case string_from_xpath("//author[1]/ap_enabled", doc) do + "true" -> + Transmogrifier.upgrade_user_from_ap_id(user.ap_id) + + _ -> + maybe_update_ostatus(doc, user) end end def maybe_update_ostatus(doc, user) do - old_data = %{ - avatar: user.avatar, - bio: user.bio, - name: user.name - } + old_data = Map.take(user, [:bio, :avatar, :name]) with false <- user.local, avatar <- make_avatar_object(doc), @@ -279,38 +266,37 @@ defmodule Pleroma.Web.OStatus do end end + @spec find_or_make_user(String.t()) :: {:ok, User.t()} def find_or_make_user(uri) do - query = from(user in User, where: user.ap_id == ^uri) - - user = Repo.one(query) - - if is_nil(user) do - make_user(uri) - else - {:ok, user} + case User.get_by_ap_id(uri) do + %User{} = user -> {:ok, user} + _ -> make_user(uri) end end + @spec make_user(String.t(), boolean()) :: {:ok, User.t()} | {:error, any()} def make_user(uri, update \\ false) do with {:ok, info} <- gather_user_info(uri) do - data = %{ - name: info["name"], - nickname: info["nickname"] <> "@" <> info["host"], - ap_id: info["uri"], - info: info, - avatar: info["avatar"], - bio: info["bio"] - } - with false <- update, - %User{} = user <- User.get_cached_by_ap_id(data.ap_id) do + %User{} = user <- User.get_cached_by_ap_id(info["uri"]) do {:ok, user} else - _e -> User.insert_or_update_user(data) + _e -> User.insert_or_update_user(build_user_data(info)) end end end + defp build_user_data(info) do + %{ + name: info["name"], + nickname: info["nickname"] <> "@" <> info["host"], + ap_id: info["uri"], + info: info, + avatar: info["avatar"], + bio: info["bio"] + } + end + # TODO: Just takes the first one for now. def make_avatar_object(author_doc, rel \\ "avatar") do href = string_from_xpath("//author[1]/link[@rel=\"#{rel}\"]/@href", author_doc) @@ -319,23 +305,23 @@ defmodule Pleroma.Web.OStatus do if href do %{ "type" => "Image", - "url" => [ - %{ - "type" => "Link", - "mediaType" => type, - "href" => href - } - ] + "url" => [%{"type" => "Link", "mediaType" => type, "href" => href}] } else nil end end + @spec gather_user_info(String.t()) :: {:ok, map()} | {:error, any()} def gather_user_info(username) do with {:ok, webfinger_data} <- WebFinger.finger(username), {:ok, feed_data} <- Websub.gather_feed_data(webfinger_data["topic"]) do - {:ok, Map.merge(webfinger_data, feed_data) |> Map.put("fqn", username)} + data = + webfinger_data + |> Map.merge(feed_data) + |> Map.put("fqn", username) + + {:ok, data} else e -> Logger.debug(fn -> "Couldn't gather info for #{username}" end) @@ -371,10 +357,7 @@ defmodule Pleroma.Web.OStatus do def fetch_activity_from_atom_url(url, options \\ []) do with true <- String.starts_with?(url, "http"), {:ok, %{body: body, status: code}} when code in 200..299 <- - HTTP.get( - url, - [{:Accept, "application/atom+xml"}] - ) do + HTTP.get(url, [{:Accept, "application/atom+xml"}]) do Logger.debug("Got document from #{url}, handling...") handle_incoming(body, options) else diff --git a/lib/pleroma/web/ostatus/ostatus_controller.ex b/lib/pleroma/web/ostatus/ostatus_controller.ex index 07e2a4c2d..8f325b28e 100644 --- a/lib/pleroma/web/ostatus/ostatus_controller.ex +++ b/lib/pleroma/web/ostatus/ostatus_controller.ex @@ -55,12 +55,11 @@ defmodule Pleroma.Web.OStatus.OStatusController do def feed(conn, %{"nickname" => nickname} = params) do with {_, %User{} = user} <- {:fetch_user, User.get_cached_by_nickname(nickname)} do - query_params = - Map.take(params, ["max_id"]) - |> Map.merge(%{"whole_db" => true, "actor_id" => user.ap_id}) - activities = - ActivityPub.fetch_public_activities(query_params) + params + |> Map.take(["max_id"]) + |> Map.merge(%{"whole_db" => true, "actor_id" => user.ap_id}) + |> ActivityPub.fetch_public_activities() |> Enum.reverse() response = @@ -98,8 +97,7 @@ defmodule Pleroma.Web.OStatus.OStatusController do Federator.incoming_doc(doc) - conn - |> send_resp(200, "") + send_resp(conn, 200, "") end def object(%{assigns: %{format: format}} = conn, %{"uuid" => _uuid}) @@ -218,7 +216,8 @@ defmodule Pleroma.Web.OStatus.OStatusController do conn |> put_resp_header("content-type", "application/activity+json") - |> json(ObjectView.render("object.json", %{object: object})) + |> put_view(ObjectView) + |> render("object.json", %{object: object}) end defp represent_activity(_conn, "activity+json", _, _) do diff --git a/lib/pleroma/web/pleroma_api/controllers/emoji_api_controller.ex b/lib/pleroma/web/pleroma_api/controllers/emoji_api_controller.ex new file mode 100644 index 000000000..370bee9c3 --- /dev/null +++ b/lib/pleroma/web/pleroma_api/controllers/emoji_api_controller.ex @@ -0,0 +1,575 @@ +defmodule Pleroma.Web.PleromaAPI.EmojiAPIController do + use Pleroma.Web, :controller + + require Logger + + @emoji_dir_path Path.join( + Pleroma.Config.get!([:instance, :static_dir]), + "emoji" + ) + + @cache_seconds_per_file Pleroma.Config.get!([:emoji, :shared_pack_cache_seconds_per_file]) + + @doc """ + Lists the packs available on the instance as JSON. + + The information is public and does not require authentification. The format is + a map of "pack directory name" to pack.json contents. + """ + def list_packs(conn, _params) do + with {:ok, results} <- File.ls(@emoji_dir_path) do + pack_infos = + results + |> Enum.filter(&has_pack_json?/1) + |> Enum.map(&load_pack/1) + # Check if all the files are in place and can be sent + |> Enum.map(&validate_pack/1) + # Transform into a map of pack-name => pack-data + |> Enum.into(%{}) + + json(conn, pack_infos) + end + end + + defp has_pack_json?(file) do + dir_path = Path.join(@emoji_dir_path, file) + # Filter to only use the pack.json packs + File.dir?(dir_path) and File.exists?(Path.join(dir_path, "pack.json")) + end + + defp load_pack(pack_name) do + pack_path = Path.join(@emoji_dir_path, pack_name) + pack_file = Path.join(pack_path, "pack.json") + + {pack_name, Jason.decode!(File.read!(pack_file))} + end + + defp validate_pack({name, pack}) do + pack_path = Path.join(@emoji_dir_path, name) + + if can_download?(pack, pack_path) do + archive_for_sha = make_archive(name, pack, pack_path) + archive_sha = :crypto.hash(:sha256, archive_for_sha) |> Base.encode16() + + pack = + pack + |> put_in(["pack", "can-download"], true) + |> put_in(["pack", "download-sha256"], archive_sha) + + {name, pack} + else + {name, put_in(pack, ["pack", "can-download"], false)} + end + end + + defp can_download?(pack, pack_path) do + # If the pack is set as shared, check if it can be downloaded + # That means that when asked, the pack can be packed and sent to the remote + # Otherwise, they'd have to download it from external-src + pack["pack"]["share-files"] && + Enum.all?(pack["files"], fn {_, path} -> + File.exists?(Path.join(pack_path, path)) + end) + end + + defp create_archive_and_cache(name, pack, pack_dir, md5) do + files = + ['pack.json'] ++ + (pack["files"] |> Enum.map(fn {_, path} -> to_charlist(path) end)) + + {:ok, {_, zip_result}} = :zip.zip('#{name}.zip', files, [:memory, cwd: to_charlist(pack_dir)]) + + cache_ms = :timer.seconds(@cache_seconds_per_file * Enum.count(files)) + + Cachex.put!( + :emoji_packs_cache, + name, + # if pack.json MD5 changes, the cache is not valid anymore + %{pack_json_md5: md5, pack_data: zip_result}, + # Add a minute to cache time for every file in the pack + ttl: cache_ms + ) + + Logger.debug("Created an archive for the '#{name}' emoji pack, \ +keeping it in cache for #{div(cache_ms, 1000)}s") + + zip_result + end + + defp make_archive(name, pack, pack_dir) do + # Having a different pack.json md5 invalidates cache + pack_file_md5 = :crypto.hash(:md5, File.read!(Path.join(pack_dir, "pack.json"))) + + case Cachex.get!(:emoji_packs_cache, name) do + %{pack_file_md5: ^pack_file_md5, pack_data: zip_result} -> + Logger.debug("Using cache for the '#{name}' shared emoji pack") + zip_result + + _ -> + create_archive_and_cache(name, pack, pack_dir, pack_file_md5) + end + end + + @doc """ + An endpoint for other instances (via admin UI) or users (via browser) + to download packs that the instance shares. + """ + def download_shared(conn, %{"name" => name}) do + pack_dir = Path.join(@emoji_dir_path, name) + pack_file = Path.join(pack_dir, "pack.json") + + with {_, true} <- {:exists?, File.exists?(pack_file)}, + pack = Jason.decode!(File.read!(pack_file)), + {_, true} <- {:can_download?, can_download?(pack, pack_dir)} do + zip_result = make_archive(name, pack, pack_dir) + send_download(conn, {:binary, zip_result}, filename: "#{name}.zip") + else + {:can_download?, _} -> + conn + |> put_status(:forbidden) + |> json(%{ + error: "Pack #{name} cannot be downloaded from this instance, either pack sharing\ + was disabled for this pack or some files are missing" + }) + + {:exists?, _} -> + conn + |> put_status(:not_found) + |> json(%{error: "Pack #{name} does not exist"}) + end + end + + @doc """ + An admin endpoint to request downloading a pack named `pack_name` from the instance + `instance_address`. + + If the requested instance's admin chose to share the pack, it will be downloaded + from that instance, otherwise it will be downloaded from the fallback source, if there is one. + """ + def download_from(conn, %{"instance_address" => address, "pack_name" => name} = data) do + shareable_packs_available = + "#{address}/.well-known/nodeinfo" + |> Tesla.get!() + |> Map.get(:body) + |> Jason.decode!() + |> List.last() + |> Map.get("href") + # Get the actual nodeinfo address and fetch it + |> Tesla.get!() + |> Map.get(:body) + |> Jason.decode!() + |> get_in(["metadata", "features"]) + |> Enum.member?("shareable_emoji_packs") + + if shareable_packs_available do + full_pack = + "#{address}/api/pleroma/emoji/packs/list" + |> Tesla.get!() + |> Map.get(:body) + |> Jason.decode!() + |> Map.get(name) + + pack_info_res = + case full_pack["pack"] do + %{"share-files" => true, "can-download" => true, "download-sha256" => sha} -> + {:ok, + %{ + sha: sha, + uri: "#{address}/api/pleroma/emoji/packs/download_shared/#{name}" + }} + + %{"fallback-src" => src, "fallback-src-sha256" => sha} when is_binary(src) -> + {:ok, + %{ + sha: sha, + uri: src, + fallback: true + }} + + _ -> + {:error, + "The pack was not set as shared and there is no fallback src to download from"} + end + + with {:ok, %{sha: sha, uri: uri} = pinfo} <- pack_info_res, + %{body: emoji_archive} <- Tesla.get!(uri), + {_, true} <- {:checksum, Base.decode16!(sha) == :crypto.hash(:sha256, emoji_archive)} do + local_name = data["as"] || name + pack_dir = Path.join(@emoji_dir_path, local_name) + File.mkdir_p!(pack_dir) + + files = Enum.map(full_pack["files"], fn {_, path} -> to_charlist(path) end) + # Fallback cannot contain a pack.json file + files = if pinfo[:fallback], do: files, else: ['pack.json'] ++ files + + {:ok, _} = :zip.unzip(emoji_archive, cwd: to_charlist(pack_dir), file_list: files) + + # Fallback can't contain a pack.json file, since that would cause the fallback-src-sha256 + # in it to depend on itself + if pinfo[:fallback] do + pack_file_path = Path.join(pack_dir, "pack.json") + + File.write!(pack_file_path, Jason.encode!(full_pack, pretty: true)) + end + + json(conn, "ok") + else + {:error, e} -> + conn |> put_status(:internal_server_error) |> json(%{error: e}) + + {:checksum, _} -> + conn + |> put_status(:internal_server_error) + |> json(%{error: "SHA256 for the pack doesn't match the one sent by the server"}) + end + else + conn + |> put_status(:internal_server_error) + |> json(%{error: "The requested instance does not support sharing emoji packs"}) + end + end + + @doc """ + Creates an empty pack named `name` which then can be updated via the admin UI. + """ + def create(conn, %{"name" => name}) do + pack_dir = Path.join(@emoji_dir_path, name) + + if not File.exists?(pack_dir) do + File.mkdir_p!(pack_dir) + + pack_file_p = Path.join(pack_dir, "pack.json") + + File.write!( + pack_file_p, + Jason.encode!(%{pack: %{}, files: %{}}, pretty: true) + ) + + conn |> json("ok") + else + conn + |> put_status(:conflict) + |> json(%{error: "A pack named \"#{name}\" already exists"}) + end + end + + @doc """ + Deletes the pack `name` and all it's files. + """ + def delete(conn, %{"name" => name}) do + pack_dir = Path.join(@emoji_dir_path, name) + + case File.rm_rf(pack_dir) do + {:ok, _} -> + conn |> json("ok") + + {:error, _} -> + conn + |> put_status(:internal_server_error) + |> json(%{error: "Couldn't delete the pack #{name}"}) + end + end + + @doc """ + An endpoint to update `pack_names`'s metadata. + + `new_data` is the new metadata for the pack, that will replace the old metadata. + """ + def update_metadata(conn, %{"pack_name" => name, "new_data" => new_data}) do + pack_file_p = Path.join([@emoji_dir_path, name, "pack.json"]) + + full_pack = Jason.decode!(File.read!(pack_file_p)) + + # The new fallback-src is in the new data and it's not the same as it was in the old data + should_update_fb_sha = + not is_nil(new_data["fallback-src"]) and + new_data["fallback-src"] != full_pack["pack"]["fallback-src"] + + with {_, true} <- {:should_update?, should_update_fb_sha}, + %{body: pack_arch} <- Tesla.get!(new_data["fallback-src"]), + {:ok, flist} <- :zip.unzip(pack_arch, [:memory]), + {_, true} <- {:has_all_files?, has_all_files?(full_pack, flist)} do + fallback_sha = :crypto.hash(:sha256, pack_arch) |> Base.encode16() + + new_data = Map.put(new_data, "fallback-src-sha256", fallback_sha) + update_metadata_and_send(conn, full_pack, new_data, pack_file_p) + else + {:should_update?, _} -> + update_metadata_and_send(conn, full_pack, new_data, pack_file_p) + + {:has_all_files?, _} -> + conn + |> put_status(:bad_request) + |> json(%{error: "The fallback archive does not have all files specified in pack.json"}) + end + end + + # Check if all files from the pack.json are in the archive + defp has_all_files?(%{"files" => files}, flist) do + Enum.all?(files, fn {_, from_manifest} -> + Enum.find(flist, fn {from_archive, _} -> + to_string(from_archive) == from_manifest + end) + end) + end + + defp update_metadata_and_send(conn, full_pack, new_data, pack_file_p) do + full_pack = Map.put(full_pack, "pack", new_data) + File.write!(pack_file_p, Jason.encode!(full_pack, pretty: true)) + + # Send new data back with fallback sha filled + json(conn, new_data) + end + + defp get_filename(%{"filename" => filename}), do: filename + + defp get_filename(%{"file" => file}) do + case file do + %Plug.Upload{filename: filename} -> filename + url when is_binary(url) -> Path.basename(url) + end + end + + defp empty?(str), do: String.trim(str) == "" + + defp update_file_and_send(conn, updated_full_pack, pack_file_p) do + # Write the emoji pack file + File.write!(pack_file_p, Jason.encode!(updated_full_pack, pretty: true)) + + # Return the modified file list + json(conn, updated_full_pack["files"]) + end + + @doc """ + Updates a file in a pack. + + Updating can mean three things: + + - `add` adds an emoji named `shortcode` to the pack `pack_name`, + that means that the emoji file needs to be uploaded with the request + (thus requiring it to be a multipart request) and be named `file`. + There can also be an optional `filename` that will be the new emoji file name + (if it's not there, the name will be taken from the uploaded file). + - `update` changes emoji shortcode (from `shortcode` to `new_shortcode` or moves the file + (from the current filename to `new_filename`) + - `remove` removes the emoji named `shortcode` and it's associated file + """ + + # Add + def update_file( + conn, + %{"pack_name" => pack_name, "action" => "add", "shortcode" => shortcode} = params + ) do + pack_dir = Path.join(@emoji_dir_path, pack_name) + pack_file_p = Path.join(pack_dir, "pack.json") + + full_pack = Jason.decode!(File.read!(pack_file_p)) + + with {_, false} <- {:has_shortcode, Map.has_key?(full_pack["files"], shortcode)}, + filename <- get_filename(params), + false <- empty?(shortcode), + false <- empty?(filename) do + file_path = Path.join(pack_dir, filename) + + # If the name contains directories, create them + if String.contains?(file_path, "/") do + File.mkdir_p!(Path.dirname(file_path)) + end + + case params["file"] do + %Plug.Upload{path: upload_path} -> + # Copy the uploaded file from the temporary directory + File.copy!(upload_path, file_path) + + url when is_binary(url) -> + # Download and write the file + file_contents = Tesla.get!(url).body + File.write!(file_path, file_contents) + end + + updated_full_pack = put_in(full_pack, ["files", shortcode], filename) + update_file_and_send(conn, updated_full_pack, pack_file_p) + else + {:has_shortcode, _} -> + conn + |> put_status(:conflict) + |> json(%{error: "An emoji with the \"#{shortcode}\" shortcode already exists"}) + + true -> + conn + |> put_status(:bad_request) + |> json(%{error: "shortcode or filename cannot be empty"}) + end + end + + # Remove + def update_file(conn, %{ + "pack_name" => pack_name, + "action" => "remove", + "shortcode" => shortcode + }) do + pack_dir = Path.join(@emoji_dir_path, pack_name) + pack_file_p = Path.join(pack_dir, "pack.json") + + full_pack = Jason.decode!(File.read!(pack_file_p)) + + if Map.has_key?(full_pack["files"], shortcode) do + {emoji_file_path, updated_full_pack} = pop_in(full_pack, ["files", shortcode]) + + emoji_file_path = Path.join(pack_dir, emoji_file_path) + + # Delete the emoji file + File.rm!(emoji_file_path) + + # If the old directory has no more files, remove it + if String.contains?(emoji_file_path, "/") do + dir = Path.dirname(emoji_file_path) + + if Enum.empty?(File.ls!(dir)) do + File.rmdir!(dir) + end + end + + update_file_and_send(conn, updated_full_pack, pack_file_p) + else + conn + |> put_status(:bad_request) + |> json(%{error: "Emoji \"#{shortcode}\" does not exist"}) + end + end + + # Update + def update_file( + conn, + %{"pack_name" => pack_name, "action" => "update", "shortcode" => shortcode} = params + ) do + pack_dir = Path.join(@emoji_dir_path, pack_name) + pack_file_p = Path.join(pack_dir, "pack.json") + + full_pack = Jason.decode!(File.read!(pack_file_p)) + + with {_, true} <- {:has_shortcode, Map.has_key?(full_pack["files"], shortcode)}, + %{"new_shortcode" => new_shortcode, "new_filename" => new_filename} <- params, + false <- empty?(new_shortcode), + false <- empty?(new_filename) do + # First, remove the old shortcode, saving the old path + {old_emoji_file_path, updated_full_pack} = pop_in(full_pack, ["files", shortcode]) + old_emoji_file_path = Path.join(pack_dir, old_emoji_file_path) + new_emoji_file_path = Path.join(pack_dir, new_filename) + + # If the name contains directories, create them + if String.contains?(new_emoji_file_path, "/") do + File.mkdir_p!(Path.dirname(new_emoji_file_path)) + end + + # Move/Rename the old filename to a new filename + # These are probably on the same filesystem, so just rename should work + :ok = File.rename(old_emoji_file_path, new_emoji_file_path) + + # If the old directory has no more files, remove it + if String.contains?(old_emoji_file_path, "/") do + dir = Path.dirname(old_emoji_file_path) + + if Enum.empty?(File.ls!(dir)) do + File.rmdir!(dir) + end + end + + # Then, put in the new shortcode with the new path + updated_full_pack = put_in(updated_full_pack, ["files", new_shortcode], new_filename) + update_file_and_send(conn, updated_full_pack, pack_file_p) + else + {:has_shortcode, _} -> + conn + |> put_status(:bad_request) + |> json(%{error: "Emoji \"#{shortcode}\" does not exist"}) + + true -> + conn + |> put_status(:bad_request) + |> json(%{error: "new_shortcode or new_filename cannot be empty"}) + + _ -> + conn + |> put_status(:bad_request) + |> json(%{error: "new_shortcode or new_file were not specified"}) + end + end + + def update_file(conn, %{"action" => action}) do + conn + |> put_status(:bad_request) + |> json(%{error: "Unknown action: #{action}"}) + end + + @doc """ + Imports emoji from the filesystem. + + Importing means checking all the directories in the + `$instance_static/emoji/` for directories which do not have + `pack.json`. If one has an emoji.txt file, that file will be used + to create a `pack.json` file with it's contents. If the directory has + neither, all the files with specific configured extenstions will be + assumed to be emojis and stored in the new `pack.json` file. + """ + def import_from_fs(conn, _params) do + with {:ok, results} <- File.ls(@emoji_dir_path) do + imported_pack_names = + results + |> Enum.filter(fn file -> + dir_path = Path.join(@emoji_dir_path, file) + # Find the directories that do NOT have pack.json + File.dir?(dir_path) and not File.exists?(Path.join(dir_path, "pack.json")) + end) + |> Enum.map(&write_pack_json_contents/1) + + json(conn, imported_pack_names) + else + {:error, _} -> + conn + |> put_status(:internal_server_error) + |> json(%{error: "Error accessing emoji pack directory"}) + end + end + + defp write_pack_json_contents(dir) do + dir_path = Path.join(@emoji_dir_path, dir) + emoji_txt_path = Path.join(dir_path, "emoji.txt") + + files_for_pack = files_for_pack(emoji_txt_path, dir_path) + pack_json_contents = Jason.encode!(%{pack: %{}, files: files_for_pack}) + + File.write!(Path.join(dir_path, "pack.json"), pack_json_contents) + + dir + end + + defp files_for_pack(emoji_txt_path, dir_path) do + if File.exists?(emoji_txt_path) do + # There's an emoji.txt file, it's likely from a pack installed by the pack manager. + # Make a pack.json file from the contents of that emoji.txt fileh + + # FIXME: Copy-pasted from Pleroma.Emoji/load_from_file_stream/2 + + # Create a map of shortcodes to filenames from emoji.txt + File.read!(emoji_txt_path) + |> String.split("\n") + |> Enum.map(&String.trim/1) + |> Enum.map(fn line -> + case String.split(line, ~r/,\s*/) do + # This matches both strings with and without tags + # and we don't care about tags here + [name, file | _] -> {name, file} + _ -> nil + end + end) + |> Enum.filter(fn x -> not is_nil(x) end) + |> Enum.into(%{}) + else + # If there's no emoji.txt, assume all files + # that are of certain extensions from the config are emojis and import them all + pack_extensions = Pleroma.Config.get!([:emoji, :pack_extensions]) + Pleroma.Emoji.Loader.make_shortcode_to_file_map(dir_path, pack_extensions) + end + end +end diff --git a/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex b/lib/pleroma/web/pleroma_api/controllers/pleroma_api_controller.ex index f4df3b024..d17ccf84d 100644 --- a/lib/pleroma/web/pleroma_api/pleroma_api_controller.ex +++ b/lib/pleroma/web/pleroma_api/controllers/pleroma_api_controller.ex @@ -5,7 +5,7 @@ defmodule Pleroma.Web.PleromaAPI.PleromaAPIController do use Pleroma.Web, :controller - import Pleroma.Web.ControllerHelper, only: [add_link_headers: 7] + import Pleroma.Web.ControllerHelper, only: [add_link_headers: 2] alias Pleroma.Conversation.Participation alias Pleroma.Notification @@ -27,31 +27,22 @@ defmodule Pleroma.Web.PleromaAPI.PleromaAPIController do %{assigns: %{user: user}} = conn, %{"id" => participation_id} = params ) do - params = - params - |> Map.put("blocking_user", user) - |> Map.put("muting_user", user) - |> Map.put("user", user) - - participation = - participation_id - |> Participation.get(preload: [:conversation]) + participation = Participation.get(participation_id, preload: [:conversation]) if user.id == participation.user_id do + params = + params + |> Map.put("blocking_user", user) + |> Map.put("muting_user", user) + |> Map.put("user", user) + activities = participation.conversation.ap_id |> ActivityPub.fetch_activities_for_context(params) |> Enum.reverse() conn - |> add_link_headers( - :conversation_statuses, - activities, - participation_id, - params, - nil, - &pleroma_api_url/4 - ) + |> add_link_headers(activities) |> put_view(StatusView) |> render("index.json", %{activities: activities, for: user, as: :activity}) end diff --git a/lib/pleroma/web/push/push.ex b/lib/pleroma/web/push/push.ex index 729dad02a..7ef1532ac 100644 --- a/lib/pleroma/web/push/push.ex +++ b/lib/pleroma/web/push/push.ex @@ -3,7 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Push do - alias Pleroma.Web.Push.Impl + alias Pleroma.Workers.WebPusherWorker require Logger @@ -31,6 +31,7 @@ defmodule Pleroma.Web.Push do end end - def send(notification), - do: PleromaJobQueue.enqueue(:web_push, Impl, [notification]) + def send(notification) do + WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id}) + end end diff --git a/lib/pleroma/web/rich_media/parser.ex b/lib/pleroma/web/rich_media/parser.ex index f5f9e358c..c06b0a0f2 100644 --- a/lib/pleroma/web/rich_media/parser.ex +++ b/lib/pleroma/web/rich_media/parser.ex @@ -81,6 +81,7 @@ defmodule Pleroma.Web.RichMedia.Parser do {:ok, %Tesla.Env{body: html}} = Pleroma.HTTP.get(url, [], adapter: @hackney_options) html + |> parse_html |> maybe_parse() |> Map.put(:url, url) |> clean_parsed_data() @@ -91,6 +92,8 @@ defmodule Pleroma.Web.RichMedia.Parser do end end + defp parse_html(html), do: Floki.parse(html) + defp maybe_parse(html) do Enum.reduce_while(parsers(), %{}, fn parser, acc -> case parser.parse(html, acc) do @@ -100,7 +103,8 @@ defmodule Pleroma.Web.RichMedia.Parser do end) end - defp check_parsed_data(%{title: title} = data) when is_binary(title) and byte_size(title) > 0 do + defp check_parsed_data(%{title: title} = data) + when is_binary(title) and byte_size(title) > 0 do {:ok, data} end diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex index cfb973f53..e583093d2 100644 --- a/lib/pleroma/web/router.ex +++ b/lib/pleroma/web/router.ex @@ -135,6 +135,7 @@ defmodule Pleroma.Web.Router do pipeline :http_signature do plug(Pleroma.Web.Plugs.HTTPSignaturePlug) + plug(Pleroma.Web.Plugs.MappedSignatureToIdentityPlug) end scope "/api/pleroma", Pleroma.Web.TwitterAPI do @@ -179,12 +180,13 @@ defmodule Pleroma.Web.Router do post("/relay", AdminAPIController, :relay_follow) delete("/relay", AdminAPIController, :relay_unfollow) - get("/users/invite_token", AdminAPIController, :get_invite_token) + post("/users/invite_token", AdminAPIController, :create_invite_token) get("/users/invites", AdminAPIController, :invites) post("/users/revoke_invite", AdminAPIController, :revoke_invite) post("/users/email_invite", AdminAPIController, :email_invite) get("/users/:nickname/password_reset", AdminAPIController, :get_password_reset) + patch("/users/:nickname/force_password_reset", AdminAPIController, :force_password_reset) get("/users", AdminAPIController, :list_users) get("/users/:nickname", AdminAPIController, :user_show) @@ -204,6 +206,29 @@ defmodule Pleroma.Web.Router do get("/config/migrate_from_db", AdminAPIController, :migrate_from_db) get("/moderation_log", AdminAPIController, :list_log) + + post("/reload_emoji", AdminAPIController, :reload_emoji) + end + + scope "/api/pleroma/emoji", Pleroma.Web.PleromaAPI do + scope "/packs" do + # Modifying packs + pipe_through([:admin_api, :oauth_write]) + + post("/import_from_fs", EmojiAPIController, :import_from_fs) + + post("/:pack_name/update_file", EmojiAPIController, :update_file) + post("/:pack_name/update_metadata", EmojiAPIController, :update_metadata) + put("/:name", EmojiAPIController, :create) + delete("/:name", EmojiAPIController, :delete) + post("/download_from", EmojiAPIController, :download_from) + end + + scope "/packs" do + # Pack info / downloading + get("/", EmojiAPIController, :list_packs) + get("/:name/download_shared/", EmojiAPIController, :download_shared) + end end scope "/", Pleroma.Web.TwitterAPI do @@ -224,6 +249,7 @@ defmodule Pleroma.Web.Router do scope [] do pipe_through(:oauth_write) + post("/change_email", UtilController, :change_email) post("/change_password", UtilController, :change_password) post("/delete_account", UtilController, :delete_account) put("/notification_settings", UtilController, :update_notificaton_settings) @@ -443,6 +469,7 @@ defmodule Pleroma.Web.Router do get("/timelines/tag/:tag", MastodonAPIController, :hashtag_timeline) get("/timelines/list/:list_id", MastodonAPIController, :list_timeline) + get("/statuses", MastodonAPIController, :get_statuses) get("/statuses/:id", MastodonAPIController, :get_status) get("/statuses/:id/context", MastodonAPIController, :get_context) @@ -512,6 +539,7 @@ defmodule Pleroma.Web.Router do scope "/", Pleroma.Web do pipe_through(:ostatus) + pipe_through(:http_signature) get("/objects/:uuid", OStatus.OStatusController, :object) get("/activities/:uuid", OStatus.OStatusController, :activity) diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index 9b01ebcc6..8ba7380c0 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -170,6 +170,15 @@ defmodule Pleroma.Web.Salmon do end end + def publish_one(%{recipient_id: recipient_id} = params) do + recipient = User.get_cached_by_id(recipient_id) + + params + |> Map.delete(:recipient_id) + |> Map.put(:recipient, recipient) + |> publish_one() + end + def publish_one(_), do: :noop @supported_activities [ @@ -218,7 +227,7 @@ defmodule Pleroma.Web.Salmon do Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) Publisher.enqueue_one(__MODULE__, %{ - recipient: remote_user, + recipient_id: remote_user.id, feed: feed, unreachable_since: reachable_urls_metadata[remote_user.info.salmon] }) diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex deleted file mode 100644 index 587c43f40..000000000 --- a/lib/pleroma/web/streamer.ex +++ /dev/null @@ -1,318 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Web.Streamer do - use GenServer - require Logger - alias Pleroma.Activity - alias Pleroma.Config - alias Pleroma.Conversation.Participation - alias Pleroma.Notification - alias Pleroma.Object - alias Pleroma.User - alias Pleroma.Web.ActivityPub.ActivityPub - alias Pleroma.Web.ActivityPub.Visibility - alias Pleroma.Web.CommonAPI - alias Pleroma.Web.MastodonAPI.NotificationView - - @keepalive_interval :timer.seconds(30) - - def start_link(_) do - GenServer.start_link(__MODULE__, %{}, name: __MODULE__) - end - - def add_socket(topic, socket) do - GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic}) - end - - def remove_socket(topic, socket) do - GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic}) - end - - def stream(topic, item) do - GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) - end - - def init(args) do - Process.send_after(self(), %{action: :ping}, @keepalive_interval) - - {:ok, args} - end - - def handle_info(%{action: :ping}, topics) do - topics - |> Map.values() - |> List.flatten() - |> Enum.each(fn socket -> - Logger.debug("Sending keepalive ping") - send(socket.transport_pid, {:text, ""}) - end) - - Process.send_after(self(), %{action: :ping}, @keepalive_interval) - - {:noreply, topics} - end - - def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do - recipient_topics = - User.get_recipients_from_activity(item) - |> Enum.map(fn %{id: id} -> "direct:#{id}" end) - - Enum.each(recipient_topics || [], fn user_topic -> - Logger.debug("Trying to push direct message to #{user_topic}\n\n") - push_to_socket(topics, user_topic, item) - end) - - {:noreply, topics} - end - - def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do - user_topic = "direct:#{participation.user_id}" - Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") - - push_to_socket(topics, user_topic, participation) - - {:noreply, topics} - end - - def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do - # filter the recipient list if the activity is not public, see #270. - recipient_lists = - case Visibility.is_public?(item) do - true -> - Pleroma.List.get_lists_from_activity(item) - - _ -> - Pleroma.List.get_lists_from_activity(item) - |> Enum.filter(fn list -> - owner = User.get_cached_by_id(list.user_id) - - Visibility.visible_for_user?(item, owner) - end) - end - - recipient_topics = - recipient_lists - |> Enum.map(fn %{id: id} -> "list:#{id}" end) - - Enum.each(recipient_topics || [], fn list_topic -> - Logger.debug("Trying to push message to #{list_topic}\n\n") - push_to_socket(topics, list_topic, item) - end) - - {:noreply, topics} - end - - def handle_cast( - %{action: :stream, topic: topic, item: %Notification{} = item}, - topics - ) - when topic in ["user", "user:notification"] do - topics - |> Map.get("#{topic}:#{item.user_id}", []) - |> Enum.each(fn socket -> - with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id), - true <- should_send?(user, item) do - send( - socket.transport_pid, - {:text, represent_notification(socket.assigns[:user], item)} - ) - end - end) - - {:noreply, topics} - end - - def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do - Logger.debug("Trying to push to users") - - recipient_topics = - User.get_recipients_from_activity(item) - |> Enum.map(fn %{id: id} -> "user:#{id}" end) - - Enum.each(recipient_topics, fn topic -> - push_to_socket(topics, topic, item) - end) - - {:noreply, topics} - end - - def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do - Logger.debug("Trying to push to #{topic}") - Logger.debug("Pushing item to #{topic}") - push_to_socket(topics, topic, item) - {:noreply, topics} - end - - def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do - topic = internal_topic(topic, socket) - sockets_for_topic = sockets[topic] || [] - sockets_for_topic = Enum.uniq([socket | sockets_for_topic]) - sockets = Map.put(sockets, topic, sockets_for_topic) - Logger.debug("Got new conn for #{topic}") - {:noreply, sockets} - end - - def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do - topic = internal_topic(topic, socket) - sockets_for_topic = sockets[topic] || [] - sockets_for_topic = List.delete(sockets_for_topic, socket) - sockets = Map.put(sockets, topic, sockets_for_topic) - Logger.debug("Removed conn for #{topic}") - {:noreply, sockets} - end - - def handle_cast(m, state) do - Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}") - {:noreply, state} - end - - defp represent_update(%Activity{} = activity, %User{} = user) do - %{ - event: "update", - payload: - Pleroma.Web.MastodonAPI.StatusView.render( - "status.json", - activity: activity, - for: user - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - defp represent_update(%Activity{} = activity) do - %{ - event: "update", - payload: - Pleroma.Web.MastodonAPI.StatusView.render( - "status.json", - activity: activity - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - def represent_conversation(%Participation{} = participation) do - %{ - event: "conversation", - payload: - Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ - participation: participation, - for: participation.user - }) - |> Jason.encode!() - } - |> Jason.encode!() - end - - @spec represent_notification(User.t(), Notification.t()) :: binary() - defp represent_notification(%User{} = user, %Notification{} = notify) do - %{ - event: "notification", - payload: - NotificationView.render( - "show.json", - %{notification: notify, for: user} - ) - |> Jason.encode!() - } - |> Jason.encode!() - end - - defp should_send?(%User{} = user, %Activity{} = item) do - blocks = user.info.blocks || [] - mutes = user.info.mutes || [] - reblog_mutes = user.info.muted_reblogs || [] - domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) - - with parent when not is_nil(parent) <- Object.normalize(item), - true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), - true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), - %{host: item_host} <- URI.parse(item.actor), - %{host: parent_host} <- URI.parse(parent.data["actor"]), - false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), - false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), - true <- thread_containment(item, user), - false <- CommonAPI.thread_muted?(user, item) do - true - else - _ -> false - end - end - - defp should_send?(%User{} = user, %Notification{activity: activity}) do - should_send?(user, activity) - end - - def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do - Enum.each(topics[topic] || [], fn socket -> - # Get the current user so we have up-to-date blocks etc. - if socket.assigns[:user] do - user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) - - if should_send?(user, item) do - send(socket.transport_pid, {:text, represent_update(item, user)}) - end - else - send(socket.transport_pid, {:text, represent_update(item)}) - end - end) - end - - def push_to_socket(topics, topic, %Participation{} = participation) do - Enum.each(topics[topic] || [], fn socket -> - send(socket.transport_pid, {:text, represent_conversation(participation)}) - end) - end - - def push_to_socket(topics, topic, %Activity{ - data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} - }) do - Enum.each(topics[topic] || [], fn socket -> - send( - socket.transport_pid, - {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} - ) - end) - end - - def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop - - def push_to_socket(topics, topic, item) do - Enum.each(topics[topic] || [], fn socket -> - # Get the current user so we have up-to-date blocks etc. - if socket.assigns[:user] do - user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) - blocks = user.info.blocks || [] - mutes = user.info.mutes || [] - - with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), - true <- thread_containment(item, user) do - send(socket.transport_pid, {:text, represent_update(item, user)}) - end - else - send(socket.transport_pid, {:text, represent_update(item)}) - end - end) - end - - defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do - "#{topic}:#{socket.assigns[:user].id}" - end - - defp internal_topic(topic, _), do: topic - - @spec thread_containment(Activity.t(), User.t()) :: boolean() - defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true - - defp thread_containment(activity, user) do - if Config.get([:instance, :skip_thread_containment]) do - true - else - ActivityPub.contain_activity(activity, user) - end - end -end diff --git a/lib/pleroma/web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex new file mode 100644 index 000000000..f77cbb95c --- /dev/null +++ b/lib/pleroma/web/streamer/ping.ex @@ -0,0 +1,33 @@ +defmodule Pleroma.Web.Streamer.Ping do + use GenServer + require Logger + + alias Pleroma.Web.Streamer.State + alias Pleroma.Web.Streamer.StreamerSocket + + @keepalive_interval :timer.seconds(30) + + def start_link(opts) do + ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval) + GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__) + end + + def init(%{ping_interval: ping_interval} = args) do + Process.send_after(self(), :ping, ping_interval) + {:ok, args} + end + + def handle_info(:ping, %{ping_interval: ping_interval} = state) do + State.get_sockets() + |> Map.values() + |> List.flatten() + |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} -> + Logger.debug("Sending keepalive ping") + send(transport_pid, {:text, ""}) + end) + + Process.send_after(self(), :ping, ping_interval) + + {:noreply, state} + end +end diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex new file mode 100644 index 000000000..c48752d95 --- /dev/null +++ b/lib/pleroma/web/streamer/state.ex @@ -0,0 +1,78 @@ +defmodule Pleroma.Web.Streamer.State do + use GenServer + require Logger + + alias Pleroma.Web.Streamer.StreamerSocket + + @env Mix.env() + + def start_link(_) do + GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__) + end + + def add_socket(topic, socket) do + GenServer.call(__MODULE__, {:add, topic, socket}) + end + + def remove_socket(topic, socket) do + do_remove_socket(@env, topic, socket) + end + + def get_sockets do + %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state) + stream_sockets + end + + def init(init_arg) do + {:ok, init_arg} + end + + def handle_call(:get_state, _from, state) do + {:reply, state, state} + end + + def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do + internal_topic = internal_topic(topic, socket) + stream_socket = StreamerSocket.from_socket(socket) + + sockets_for_topic = + sockets + |> Map.get(internal_topic, []) + |> List.insert_at(0, stream_socket) + |> Enum.uniq() + + state = put_in(state, [:sockets, internal_topic], sockets_for_topic) + Logger.debug("Got new conn for #{topic}") + {:reply, state, state} + end + + def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do + internal_topic = internal_topic(topic, socket) + stream_socket = StreamerSocket.from_socket(socket) + + sockets_for_topic = + sockets + |> Map.get(internal_topic, []) + |> List.delete(stream_socket) + + state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic) + {:reply, state, state} + end + + defp do_remove_socket(:test, _, _) do + :ok + end + + defp do_remove_socket(_env, topic, socket) do + GenServer.call(__MODULE__, {:remove, topic, socket}) + end + + defp internal_topic(topic, socket) + when topic in ~w[user user:notification direct] do + "#{topic}:#{socket.assigns[:user].id}" + end + + defp internal_topic(topic, _) do + topic + end +end diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex new file mode 100644 index 000000000..8cf719277 --- /dev/null +++ b/lib/pleroma/web/streamer/streamer.ex @@ -0,0 +1,55 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.Streamer do + alias Pleroma.Web.Streamer.State + alias Pleroma.Web.Streamer.Worker + + @timeout 60_000 + @mix_env Mix.env() + + def add_socket(topic, socket) do + State.add_socket(topic, socket) + end + + def remove_socket(topic, socket) do + State.remove_socket(topic, socket) + end + + def get_sockets do + State.get_sockets() + end + + def stream(topics, items) do + if should_send?() do + Task.async(fn -> + :poolboy.transaction( + :streamer_worker, + &Worker.stream(&1, topics, items), + @timeout + ) + end) + end + end + + def supervisor, do: Pleroma.Web.Streamer.Supervisor + + defp should_send? do + handle_should_send(@mix_env) + end + + defp handle_should_send(:test) do + case Process.whereis(:streamer_worker) do + nil -> + false + + pid -> + Process.alive?(pid) + end + end + + defp handle_should_send(_) do + true + end +end diff --git a/lib/pleroma/web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex new file mode 100644 index 000000000..f006c0306 --- /dev/null +++ b/lib/pleroma/web/streamer/streamer_socket.ex @@ -0,0 +1,31 @@ +defmodule Pleroma.Web.Streamer.StreamerSocket do + defstruct transport_pid: nil, user: nil + + alias Pleroma.User + alias Pleroma.Web.Streamer.StreamerSocket + + def from_socket(%{ + transport_pid: transport_pid, + assigns: %{user: nil} + }) do + %StreamerSocket{ + transport_pid: transport_pid + } + end + + def from_socket(%{ + transport_pid: transport_pid, + assigns: %{user: %User{} = user} + }) do + %StreamerSocket{ + transport_pid: transport_pid, + user: user + } + end + + def from_socket(%{transport_pid: transport_pid}) do + %StreamerSocket{ + transport_pid: transport_pid + } + end +end diff --git a/lib/pleroma/web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex new file mode 100644 index 000000000..6afe19323 --- /dev/null +++ b/lib/pleroma/web/streamer/supervisor.ex @@ -0,0 +1,33 @@ +defmodule Pleroma.Web.Streamer.Supervisor do + use Supervisor + + def start_link(opts) do + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) + end + + def init(args) do + children = [ + {Pleroma.Web.Streamer.State, args}, + {Pleroma.Web.Streamer.Ping, args}, + :poolboy.child_spec(:streamer_worker, poolboy_config()) + ] + + opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor] + Supervisor.init(children, opts) + end + + defp poolboy_config do + opts = + Pleroma.Config.get(:streamer, + workers: 3, + overflow_workers: 2 + ) + + [ + {:name, {:local, :streamer_worker}}, + {:worker_module, Pleroma.Web.Streamer.Worker}, + {:size, opts[:workers]}, + {:max_overflow, opts[:overflow_workers]} + ] + end +end diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex new file mode 100644 index 000000000..5804508eb --- /dev/null +++ b/lib/pleroma/web/streamer/worker.ex @@ -0,0 +1,220 @@ +defmodule Pleroma.Web.Streamer.Worker do + use GenServer + + require Logger + + alias Pleroma.Activity + alias Pleroma.Config + alias Pleroma.Conversation.Participation + alias Pleroma.Notification + alias Pleroma.Object + alias Pleroma.User + alias Pleroma.Web.ActivityPub.ActivityPub + alias Pleroma.Web.ActivityPub.Visibility + alias Pleroma.Web.CommonAPI + alias Pleroma.Web.Streamer.State + alias Pleroma.Web.Streamer.StreamerSocket + alias Pleroma.Web.StreamerView + + def start_link(_) do + GenServer.start_link(__MODULE__, %{}, []) + end + + def init(init_arg) do + {:ok, init_arg} + end + + def stream(pid, topics, items) do + GenServer.call(pid, {:stream, topics, items}) + end + + def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do + Enum.each(topics, fn t -> + do_stream(%{topic: t, item: item}) + end) + + {:reply, state, state} + end + + def handle_call({:stream, topic, items}, _from, state) when is_list(items) do + Enum.each(items, fn i -> + do_stream(%{topic: topic, item: i}) + end) + + {:reply, state, state} + end + + def handle_call({:stream, topic, item}, _from, state) do + do_stream(%{topic: topic, item: item}) + + {:reply, state, state} + end + + defp do_stream(%{topic: "direct", item: item}) do + recipient_topics = + User.get_recipients_from_activity(item) + |> Enum.map(fn %{id: id} -> "direct:#{id}" end) + + Enum.each(recipient_topics, fn user_topic -> + Logger.debug("Trying to push direct message to #{user_topic}\n\n") + push_to_socket(State.get_sockets(), user_topic, item) + end) + end + + defp do_stream(%{topic: "participation", item: participation}) do + user_topic = "direct:#{participation.user_id}" + Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") + + push_to_socket(State.get_sockets(), user_topic, participation) + end + + defp do_stream(%{topic: "list", item: item}) do + # filter the recipient list if the activity is not public, see #270. + recipient_lists = + case Visibility.is_public?(item) do + true -> + Pleroma.List.get_lists_from_activity(item) + + _ -> + Pleroma.List.get_lists_from_activity(item) + |> Enum.filter(fn list -> + owner = User.get_cached_by_id(list.user_id) + + Visibility.visible_for_user?(item, owner) + end) + end + + recipient_topics = + recipient_lists + |> Enum.map(fn %{id: id} -> "list:#{id}" end) + + Enum.each(recipient_topics, fn list_topic -> + Logger.debug("Trying to push message to #{list_topic}\n\n") + push_to_socket(State.get_sockets(), list_topic, item) + end) + end + + defp do_stream(%{topic: topic, item: %Notification{} = item}) + when topic in ["user", "user:notification"] do + State.get_sockets() + |> Map.get("#{topic}:#{item.user_id}", []) + |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> + with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id), + true <- should_send?(user, item) do + send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)}) + end + end) + end + + defp do_stream(%{topic: "user", item: item}) do + Logger.debug("Trying to push to users") + + recipient_topics = + User.get_recipients_from_activity(item) + |> Enum.map(fn %{id: id} -> "user:#{id}" end) + + Enum.each(recipient_topics, fn topic -> + push_to_socket(State.get_sockets(), topic, item) + end) + end + + defp do_stream(%{topic: topic, item: item}) do + Logger.debug("Trying to push to #{topic}") + Logger.debug("Pushing item to #{topic}") + push_to_socket(State.get_sockets(), topic, item) + end + + defp should_send?(%User{} = user, %Activity{} = item) do + blocks = user.info.blocks || [] + mutes = user.info.mutes || [] + reblog_mutes = user.info.muted_reblogs || [] + domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) + + with parent when not is_nil(parent) <- Object.normalize(item), + true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), + true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), + %{host: item_host} <- URI.parse(item.actor), + %{host: parent_host} <- URI.parse(parent.data["actor"]), + false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), + false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), + true <- thread_containment(item, user), + false <- CommonAPI.thread_muted?(user, item) do + true + else + _ -> false + end + end + + defp should_send?(%User{} = user, %Notification{activity: activity}) do + should_send?(user, activity) + end + + def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do + Enum.each(topics[topic] || [], fn %StreamerSocket{ + transport_pid: transport_pid, + user: socket_user + } -> + # Get the current user so we have up-to-date blocks etc. + if socket_user do + user = User.get_cached_by_ap_id(socket_user.ap_id) + + if should_send?(user, item) do + send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) + end + else + send(transport_pid, {:text, StreamerView.render("update.json", item)}) + end + end) + end + + def push_to_socket(topics, topic, %Participation{} = participation) do + Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> + send(transport_pid, {:text, StreamerView.render("conversation.json", participation)}) + end) + end + + def push_to_socket(topics, topic, %Activity{ + data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} + }) do + Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> + send( + transport_pid, + {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} + ) + end) + end + + def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop + + def push_to_socket(topics, topic, item) do + Enum.each(topics[topic] || [], fn %StreamerSocket{ + transport_pid: transport_pid, + user: socket_user + } -> + # Get the current user so we have up-to-date blocks etc. + if socket_user do + user = User.get_cached_by_ap_id(socket_user.ap_id) + blocks = user.info.blocks || [] + mutes = user.info.mutes || [] + + with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), + true <- thread_containment(item, user) do + send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) + end + else + send(transport_pid, {:text, StreamerView.render("update.json", item)}) + end + end) + end + + @spec thread_containment(Activity.t(), User.t()) :: boolean() + defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true + + defp thread_containment(activity, user) do + if Config.get([:instance, :skip_thread_containment]) do + true + else + ActivityPub.contain_activity(activity, user) + end + end +end diff --git a/lib/pleroma/web/twitter_api/controllers/util_controller.ex b/lib/pleroma/web/twitter_api/controllers/util_controller.ex index c14792068..f05a84c7f 100644 --- a/lib/pleroma/web/twitter_api/controllers/util_controller.ex +++ b/lib/pleroma/web/twitter_api/controllers/util_controller.ex @@ -263,12 +263,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do String.split(line, ",") |> List.first() end) |> List.delete("Account address") do - PleromaJobQueue.enqueue(:background, User, [ - :follow_import, - follower, - followed_identifiers - ]) - + User.follow_import(follower, followed_identifiers) json(conn, "job started") end end @@ -279,12 +274,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do with blocked_identifiers <- String.split(list) do - PleromaJobQueue.enqueue(:background, User, [ - :blocks_import, - blocker, - blocked_identifiers - ]) - + User.blocks_import(blocker, blocked_identifiers) json(conn, "job started") end end @@ -312,6 +302,25 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do end end + def change_email(%{assigns: %{user: user}} = conn, params) do + case CommonAPI.Utils.confirm_current_password(user, params["password"]) do + {:ok, user} -> + with {:ok, _user} <- User.change_email(user, params["email"]) do + json(conn, %{status: "success"}) + else + {:error, changeset} -> + {_, {error, _}} = Enum.at(changeset.errors, 0) + json(conn, %{error: "Email #{error}."}) + + _ -> + json(conn, %{error: "Unable to change email."}) + end + + {:error, msg} -> + json(conn, %{error: msg}) + end + end + def delete_account(%{assigns: %{user: user}} = conn, params) do case CommonAPI.Utils.confirm_current_password(user, params["password"]) do {:ok, user} -> diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex new file mode 100644 index 000000000..b13030fa0 --- /dev/null +++ b/lib/pleroma/web/views/streamer_view.ex @@ -0,0 +1,66 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.StreamerView do + use Pleroma.Web, :view + + alias Pleroma.Activity + alias Pleroma.Conversation.Participation + alias Pleroma.Notification + alias Pleroma.User + alias Pleroma.Web.MastodonAPI.NotificationView + + def render("update.json", %Activity{} = activity, %User{} = user) do + %{ + event: "update", + payload: + Pleroma.Web.MastodonAPI.StatusView.render( + "status.json", + activity: activity, + for: user + ) + |> Jason.encode!() + } + |> Jason.encode!() + end + + def render("notification.json", %User{} = user, %Notification{} = notify) do + %{ + event: "notification", + payload: + NotificationView.render( + "show.json", + %{notification: notify, for: user} + ) + |> Jason.encode!() + } + |> Jason.encode!() + end + + def render("update.json", %Activity{} = activity) do + %{ + event: "update", + payload: + Pleroma.Web.MastodonAPI.StatusView.render( + "status.json", + activity: activity + ) + |> Jason.encode!() + } + |> Jason.encode!() + end + + def render("conversation.json", %Participation{} = participation) do + %{ + event: "conversation", + payload: + Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ + participation: participation, + for: participation.user + }) + |> Jason.encode!() + } + |> Jason.encode!() + end +end diff --git a/lib/pleroma/web/web.ex b/lib/pleroma/web/web.ex index bfb6c7287..687346554 100644 --- a/lib/pleroma/web/web.ex +++ b/lib/pleroma/web/web.ex @@ -66,23 +66,9 @@ defmodule Pleroma.Web do end @doc """ - Same as `render_many/4` but wrapped in rescue block and parallelized (unless disabled by passing false as a fifth argument). + Same as `render_many/4` but wrapped in rescue block. """ - def safe_render_many(collection, view, template, assigns \\ %{}, parallel \\ true) - - def safe_render_many(collection, view, template, assigns, true) do - Enum.map(collection, fn resource -> - Task.async(fn -> - as = Map.get(assigns, :as) || view.__resource__ - assigns = Map.put(assigns, as, resource) - safe_render(view, template, assigns) - end) - end) - |> Enum.map(&Task.await(&1, :infinity)) - |> Enum.filter(& &1) - end - - def safe_render_many(collection, view, template, assigns, false) do + def safe_render_many(collection, view, template, assigns \\ %{}) do Enum.map(collection, fn resource -> as = Map.get(assigns, :as) || view.__resource__ assigns = Map.put(assigns, as, resource) |