diff options
Diffstat (limited to 'lib')
37 files changed, 1228 insertions, 203 deletions
diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index 2403ed581..e7f4b67a4 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -8,10 +8,13 @@ defmodule Mix.Tasks.Pleroma.Database do alias Pleroma.Object alias Pleroma.Repo alias Pleroma.User + require Logger require Pleroma.Constants + import Ecto.Query import Mix.Pleroma + use Mix.Task @shortdoc "A collection of database related tasks" @@ -214,4 +217,32 @@ defmodule Mix.Tasks.Pleroma.Database do shell_info('Done.') end end + + # Rolls back a specific migration (leaving subsequent migrations applied). + # WARNING: imposes a risk of unrecoverable data loss — proceed at your own responsibility. + # Based on https://stackoverflow.com/a/53825840 + def run(["rollback", version]) do + prompt = "SEVERE WARNING: this operation may result in unrecoverable data loss. Continue?" + + if shell_prompt(prompt, "n") in ~w(Yn Y y) do + {_, result, _} = + Ecto.Migrator.with_repo(Pleroma.Repo, fn repo -> + version = String.to_integer(version) + re = ~r/^#{version}_.*\.exs/ + path = Ecto.Migrator.migrations_path(repo) + + with {_, "" <> file} <- {:find, Enum.find(File.ls!(path), &String.match?(&1, re))}, + {_, [{mod, _} | _]} <- {:compile, Code.compile_file(Path.join(path, file))}, + {_, :ok} <- {:rollback, Ecto.Migrator.down(repo, version, mod)} do + {:ok, "Reversed migration: #{file}"} + else + {:find, _} -> {:error, "No migration found with version prefix: #{version}"} + {:compile, e} -> {:error, "Problem compiling migration module: #{inspect(e)}"} + {:rollback, e} -> {:error, "Problem reversing migration: #{inspect(e)}"} + end + end) + + shell_info(inspect(result)) + end + end end diff --git a/lib/pleroma/activity.ex b/lib/pleroma/activity.ex index 6542e684e..d59403884 100644 --- a/lib/pleroma/activity.ex +++ b/lib/pleroma/activity.ex @@ -113,6 +113,7 @@ defmodule Pleroma.Activity do from([a] in query, left_join: b in Bookmark, on: b.user_id == ^user.id and b.activity_id == a.id, + as: :bookmark, preload: [bookmark: b] ) end @@ -123,6 +124,7 @@ defmodule Pleroma.Activity do from([a] in query, left_join: r in ReportNote, on: a.id == r.activity_id, + as: :report_note, preload: [report_notes: r] ) end diff --git a/lib/pleroma/activity/ir/topics.ex b/lib/pleroma/activity/ir/topics.ex index d94395fc1..7a603a615 100644 --- a/lib/pleroma/activity/ir/topics.ex +++ b/lib/pleroma/activity/ir/topics.ex @@ -48,14 +48,12 @@ defmodule Pleroma.Activity.Ir.Topics do tags end - defp hashtags_to_topics(%{data: %{"tag" => tags}}) do - tags - |> Enum.filter(&is_bitstring(&1)) - |> Enum.map(fn tag -> "hashtag:" <> tag end) + defp hashtags_to_topics(object) do + object + |> Object.hashtags() + |> Enum.map(fn hashtag -> "hashtag:" <> hashtag end) end - defp hashtags_to_topics(_), do: [] - defp remote_topics(%{local: true}), do: [] defp remote_topics(%{actor: actor}) when is_binary(actor), diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index c853a2bb4..06d399b2e 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -103,9 +103,7 @@ defmodule Pleroma.Application do task_children(@mix_env) ++ dont_run_in_test(@mix_env) ++ chat_child(chat_enabled?()) ++ - [ - Pleroma.Gopher.Server - ] + [Pleroma.Gopher.Server] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # for other strategies and supported options @@ -230,6 +228,12 @@ defmodule Pleroma.Application do keys: :duplicate, partitions: System.schedulers_online() ]} + ] ++ background_migrators() + end + + defp background_migrators do + [ + Pleroma.Migrators.HashtagsTableMigrator ] end diff --git a/lib/pleroma/config.ex b/lib/pleroma/config.ex index 2e15a3719..54e332595 100644 --- a/lib/pleroma/config.ex +++ b/lib/pleroma/config.ex @@ -99,4 +99,8 @@ defmodule Pleroma.Config do def oauth_consumer_strategies, do: get([:auth, :oauth_consumer_strategies], []) def oauth_consumer_enabled?, do: oauth_consumer_strategies() != [] + + def feature_enabled?(feature_name) do + get([:features, feature_name]) not in [nil, false, :disabled, :auto] + end end diff --git a/lib/pleroma/data_migration.ex b/lib/pleroma/data_migration.ex new file mode 100644 index 000000000..1377af16e --- /dev/null +++ b/lib/pleroma/data_migration.ex @@ -0,0 +1,45 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.DataMigration do + use Ecto.Schema + + alias Pleroma.DataMigration + alias Pleroma.DataMigration.State + alias Pleroma.Repo + + import Ecto.Changeset + import Ecto.Query + + schema "data_migrations" do + field(:name, :string) + field(:state, State, default: :pending) + field(:feature_lock, :boolean, default: false) + field(:params, :map, default: %{}) + field(:data, :map, default: %{}) + + timestamps() + end + + def changeset(data_migration, params \\ %{}) do + data_migration + |> cast(params, [:name, :state, :feature_lock, :params, :data]) + |> validate_required([:name]) + |> unique_constraint(:name) + end + + def update_one_by_id(id, params \\ %{}) do + with {1, _} <- + from(dm in DataMigration, where: dm.id == ^id) + |> Repo.update_all(set: params) do + :ok + end + end + + def get_by_name(name) do + Repo.get_by(DataMigration, name: name) + end + + def populate_hashtags_table, do: get_by_name("populate_hashtags_table") +end diff --git a/lib/pleroma/delivery.ex b/lib/pleroma/delivery.ex index e8d536767..511d5cf58 100644 --- a/lib/pleroma/delivery.ex +++ b/lib/pleroma/delivery.ex @@ -9,7 +9,6 @@ defmodule Pleroma.Delivery do alias Pleroma.Object alias Pleroma.Repo alias Pleroma.User - alias Pleroma.User import Ecto.Changeset import Ecto.Query diff --git a/lib/pleroma/ecto_enums.ex b/lib/pleroma/ecto_enums.ex index f198cccb7..2a9addabc 100644 --- a/lib/pleroma/ecto_enums.ex +++ b/lib/pleroma/ecto_enums.ex @@ -17,3 +17,11 @@ defenum(Pleroma.FollowingRelationship.State, follow_accept: 2, follow_reject: 3 ) + +defenum(Pleroma.DataMigration.State, + pending: 1, + running: 2, + complete: 3, + failed: 4, + manual: 5 +) diff --git a/lib/pleroma/hashtag.ex b/lib/pleroma/hashtag.ex new file mode 100644 index 000000000..53e2e9c89 --- /dev/null +++ b/lib/pleroma/hashtag.ex @@ -0,0 +1,106 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Hashtag do + use Ecto.Schema + + import Ecto.Changeset + import Ecto.Query + + alias Ecto.Multi + alias Pleroma.Hashtag + alias Pleroma.Object + alias Pleroma.Repo + + schema "hashtags" do + field(:name, :string) + + many_to_many(:objects, Object, join_through: "hashtags_objects", on_replace: :delete) + + timestamps() + end + + def normalize_name(name) do + name + |> String.downcase() + |> String.trim() + end + + def get_or_create_by_name(name) do + changeset = changeset(%Hashtag{}, %{name: name}) + + Repo.insert( + changeset, + on_conflict: [set: [name: get_field(changeset, :name)]], + conflict_target: :name, + returning: true + ) + end + + def get_or_create_by_names(names) when is_list(names) do + names = Enum.map(names, &normalize_name/1) + timestamp = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second) + + structs = + Enum.map(names, fn name -> + %Hashtag{} + |> changeset(%{name: name}) + |> Map.get(:changes) + |> Map.merge(%{inserted_at: timestamp, updated_at: timestamp}) + end) + + try do + with {:ok, %{query_op: hashtags}} <- + Multi.new() + |> Multi.insert_all(:insert_all_op, Hashtag, structs, + on_conflict: :nothing, + conflict_target: :name + ) + |> Multi.run(:query_op, fn _repo, _changes -> + {:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))} + end) + |> Repo.transaction() do + {:ok, hashtags} + else + {:error, _name, value, _changes_so_far} -> {:error, value} + end + rescue + e -> {:error, e} + end + end + + def changeset(%Hashtag{} = struct, params) do + struct + |> cast(params, [:name]) + |> update_change(:name, &normalize_name/1) + |> validate_required([:name]) + |> unique_constraint(:name) + end + + def unlink(%Object{id: object_id}) do + with {_, hashtag_ids} <- + from(hto in "hashtags_objects", + where: hto.object_id == ^object_id, + select: hto.hashtag_id + ) + |> Repo.delete_all(), + {:ok, unreferenced_count} <- delete_unreferenced(hashtag_ids) do + {:ok, length(hashtag_ids), unreferenced_count} + end + end + + @delete_unreferenced_query """ + DELETE FROM hashtags WHERE id IN + (SELECT hashtags.id FROM hashtags + LEFT OUTER JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id + WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.id = ANY($1)); + """ + + def delete_unreferenced(ids) do + with {:ok, %{num_rows: deleted_count}} <- Repo.query(@delete_unreferenced_query, [ids]) do + {:ok, deleted_count} + end + end +end diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex new file mode 100644 index 000000000..b84058e11 --- /dev/null +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -0,0 +1,208 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Migrators.HashtagsTableMigrator do + defmodule State do + use Pleroma.Migrators.Support.BaseMigratorState + + @impl Pleroma.Migrators.Support.BaseMigratorState + defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table + end + + use Pleroma.Migrators.Support.BaseMigrator + + alias Pleroma.Hashtag + alias Pleroma.Migrators.Support.BaseMigrator + alias Pleroma.Object + + @impl BaseMigrator + def feature_config_path, do: [:features, :improved_hashtag_timeline] + + @impl BaseMigrator + def fault_rate_allowance, do: Config.get([:populate_hashtags_table, :fault_rate_allowance], 0) + + @impl BaseMigrator + def perform do + data_migration_id = data_migration_id() + max_processed_id = get_stat(:max_processed_id, 0) + + Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...") + + query() + |> where([object], object.id > ^max_processed_id) + |> Repo.chunk_stream(100, :batches, timeout: :infinity) + |> Stream.each(fn objects -> + object_ids = Enum.map(objects, & &1.id) + + results = Enum.map(objects, &transfer_object_hashtags(&1)) + + failed_ids = + results + |> Enum.filter(&(elem(&1, 0) == :error)) + |> Enum.map(&elem(&1, 1)) + + # Count of objects with hashtags: `{:noop, id}` is returned for objects having other AS2 tags + chunk_affected_count = + results + |> Enum.filter(&(elem(&1, 0) == :ok)) + |> length() + + for failed_id <- failed_ids do + _ = + Repo.query( + "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <> + "VALUES ($1, $2) ON CONFLICT DO NOTHING;", + [data_migration_id, failed_id] + ) + end + + _ = + Repo.query( + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = ANY($2)", + [data_migration_id, object_ids -- failed_ids] + ) + + max_object_id = Enum.at(object_ids, -1) + + put_stat(:max_processed_id, max_object_id) + increment_stat(:iteration_processed_count, length(object_ids)) + increment_stat(:processed_count, length(object_ids)) + increment_stat(:failed_count, length(failed_ids)) + increment_stat(:affected_count, chunk_affected_count) + put_stat(:records_per_second, records_per_second()) + persist_state() + + # A quick and dirty approach to controlling the load this background migration imposes + sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0) + Process.sleep(sleep_interval) + end) + |> Stream.run() + end + + @impl BaseMigrator + def query do + # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) + # Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up + from( + object in Object, + where: + fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), + select: %{ + id: object.id, + tag: fragment("(?)->'tag'", object.data) + } + ) + |> join(:left, [o], hashtags_objects in fragment("SELECT object_id FROM hashtags_objects"), + on: hashtags_objects.object_id == o.id + ) + |> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id)) + end + + @spec transfer_object_hashtags(Map.t()) :: {:noop | :ok | :error, integer()} + defp transfer_object_hashtags(object) do + embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"] + hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags}) + + if Enum.any?(hashtags) do + transfer_object_hashtags(object, hashtags) + else + {:noop, object.id} + end + end + + defp transfer_object_hashtags(object, hashtags) do + Repo.transaction(fn -> + with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do + maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id}) + base_error = "ERROR when inserting hashtags_objects for object with id #{object.id}" + + try do + with {rows_count, _} when is_integer(rows_count) <- + Repo.insert_all("hashtags_objects", maps, on_conflict: :nothing) do + object.id + else + e -> + Logger.error("#{base_error}: #{inspect(e)}") + Repo.rollback(object.id) + end + rescue + e -> + Logger.error("#{base_error}: #{inspect(e)}") + Repo.rollback(object.id) + end + else + e -> + error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}" + Logger.error(error) + Repo.rollback(object.id) + end + end) + end + + @impl BaseMigrator + def retry_failed do + data_migration_id = data_migration_id() + + failed_objects_query() + |> Repo.chunk_stream(100, :one) + |> Stream.each(fn object -> + with {res, _} when res != :error <- transfer_object_hashtags(object) do + _ = + Repo.query( + "DELETE FROM data_migration_failed_ids " <> + "WHERE data_migration_id = $1 AND record_id = $2", + [data_migration_id, object.id] + ) + end + end) + |> Stream.run() + + put_stat(:failed_count, failures_count()) + persist_state() + + force_continue() + end + + defp failed_objects_query do + from(o in Object) + |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), + on: dmf.record_id == o.id + ) + |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id()) + |> order_by([o], asc: o.id) + end + + @doc """ + Service func to delete `hashtags_objects` for legacy objects not associated with Create activity. + Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`). + """ + def delete_non_create_activities_hashtags do + hashtags_objects_cleanup_query = """ + DELETE FROM hashtags_objects WHERE object_id IN + (SELECT DISTINCT objects.id FROM objects + JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities + ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = + (objects.data->>'id') + AND activities.data->>'type' = 'Create' + WHERE activities.id IS NULL); + """ + + hashtags_cleanup_query = """ + DELETE FROM hashtags WHERE id IN + (SELECT hashtags.id FROM hashtags + LEFT OUTER JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id + WHERE hashtags_objects.hashtag_id IS NULL); + """ + + {:ok, %{num_rows: hashtags_objects_count}} = + Repo.query(hashtags_objects_cleanup_query, [], timeout: :infinity) + + {:ok, %{num_rows: hashtags_count}} = + Repo.query(hashtags_cleanup_query, [], timeout: :infinity) + + {:ok, hashtags_objects_count, hashtags_count} + end +end diff --git a/lib/pleroma/migrators/support/base_migrator.ex b/lib/pleroma/migrators/support/base_migrator.ex new file mode 100644 index 000000000..1f8a5402b --- /dev/null +++ b/lib/pleroma/migrators/support/base_migrator.ex @@ -0,0 +1,210 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Migrators.Support.BaseMigrator do + @moduledoc """ + Base background migrator functionality. + """ + + @callback perform() :: any() + @callback retry_failed() :: any() + @callback feature_config_path() :: list(atom()) + @callback query() :: Ecto.Query.t() + @callback fault_rate_allowance() :: integer() | float() + + defmacro __using__(_opts) do + quote do + use GenServer + + require Logger + + import Ecto.Query + + alias __MODULE__.State + alias Pleroma.Config + alias Pleroma.Repo + + @behaviour Pleroma.Migrators.Support.BaseMigrator + + defdelegate data_migration(), to: State + defdelegate data_migration_id(), to: State + defdelegate state(), to: State + defdelegate persist_state(), to: State, as: :persist_to_db + defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key + defdelegate put_stat(key, value), to: State, as: :put_data_key + defdelegate increment_stat(key, increment), to: State, as: :increment_data_key + + @reg_name {:global, __MODULE__} + + def whereis, do: GenServer.whereis(@reg_name) + + def start_link(_) do + case whereis() do + nil -> + GenServer.start_link(__MODULE__, nil, name: @reg_name) + + pid -> + {:ok, pid} + end + end + + @impl true + def init(_) do + {:ok, nil, {:continue, :init_state}} + end + + @impl true + def handle_continue(:init_state, _state) do + {:ok, _} = State.start_link(nil) + + data_migration = data_migration() + manual_migrations = Config.get([:instance, :manual_data_migrations], []) + + cond do + Config.get(:env) == :test -> + update_status(:noop) + + is_nil(data_migration) -> + message = "Data migration does not exist." + update_status(:failed, message) + Logger.error("#{__MODULE__}: #{message}") + + data_migration.state == :manual or data_migration.name in manual_migrations -> + message = "Data migration is in manual execution or manual fix mode." + update_status(:manual, message) + Logger.warn("#{__MODULE__}: #{message}") + + data_migration.state == :complete -> + on_complete(data_migration) + + true -> + send(self(), :perform) + end + + {:noreply, nil} + end + + @impl true + def handle_info(:perform, state) do + State.reinit() + + update_status(:running) + put_stat(:iteration_processed_count, 0) + put_stat(:started_at, NaiveDateTime.utc_now()) + + perform() + + fault_rate = fault_rate() + put_stat(:fault_rate, fault_rate) + fault_rate_allowance = fault_rate_allowance() + + cond do + fault_rate == 0 -> + set_complete() + + is_float(fault_rate) and fault_rate <= fault_rate_allowance -> + message = """ + Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}. + Putting data migration to manual fix mode. Try running `#{__MODULE__}.retry_failed/0`. + """ + + Logger.warn("#{__MODULE__}: #{message}") + update_status(:manual, message) + on_complete(data_migration()) + + true -> + message = "Too many failures. Try running `#{__MODULE__}.retry_failed/0`." + Logger.error("#{__MODULE__}: #{message}") + update_status(:failed, message) + end + + persist_state() + {:noreply, state} + end + + defp on_complete(data_migration) do + if data_migration.feature_lock || feature_state() == :disabled do + Logger.warn( + "#{__MODULE__}: migration complete but feature is locked; consider enabling." + ) + + :noop + else + Config.put(feature_config_path(), :enabled) + :ok + end + end + + @doc "Approximate count for current iteration (including processed records count)" + def count(force \\ false, timeout \\ :infinity) do + stored_count = get_stat(:count) + + if stored_count && !force do + stored_count + else + processed_count = get_stat(:processed_count, 0) + max_processed_id = get_stat(:max_processed_id, 0) + query = where(query(), [entity], entity.id > ^max_processed_id) + + count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count + put_stat(:count, count) + persist_state() + + count + end + end + + def failures_count do + with {:ok, %{rows: [[count]]}} <- + Repo.query( + "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", + [data_migration_id()] + ) do + count + end + end + + def feature_state, do: Config.get(feature_config_path()) + + def force_continue do + send(whereis(), :perform) + end + + def force_restart do + :ok = State.reset() + force_continue() + end + + def set_complete do + update_status(:complete) + persist_state() + on_complete(data_migration()) + end + + defp update_status(status, message \\ nil) do + put_stat(:state, status) + put_stat(:message, message) + end + + defp fault_rate do + with failures_count when is_integer(failures_count) <- failures_count() do + failures_count / Enum.max([get_stat(:affected_count, 0), 1]) + else + _ -> :error + end + end + + defp records_per_second do + get_stat(:iteration_processed_count, 0) / Enum.max([running_time(), 1]) + end + + defp running_time do + NaiveDateTime.diff( + NaiveDateTime.utc_now(), + get_stat(:started_at, NaiveDateTime.utc_now()) + ) + end + end + end +end diff --git a/lib/pleroma/migrators/support/base_migrator_state.ex b/lib/pleroma/migrators/support/base_migrator_state.ex new file mode 100644 index 000000000..b698587f2 --- /dev/null +++ b/lib/pleroma/migrators/support/base_migrator_state.ex @@ -0,0 +1,117 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Migrators.Support.BaseMigratorState do + @moduledoc """ + Base background migrator state functionality. + """ + + @callback data_migration() :: Pleroma.DataMigration.t() + + defmacro __using__(_opts) do + quote do + use Agent + + alias Pleroma.DataMigration + + @behaviour Pleroma.Migrators.Support.BaseMigratorState + @reg_name {:global, __MODULE__} + + def start_link(_) do + Agent.start_link(fn -> load_state_from_db() end, name: @reg_name) + end + + def data_migration, do: raise("data_migration/0 is not implemented") + defoverridable data_migration: 0 + + defp load_state_from_db do + data_migration = data_migration() + + data = + if data_migration do + Map.new(data_migration.data, fn {k, v} -> {String.to_atom(k), v} end) + else + %{} + end + + %{ + data_migration_id: data_migration && data_migration.id, + data: data + } + end + + def persist_to_db do + %{data_migration_id: data_migration_id, data: data} = state() + + if data_migration_id do + DataMigration.update_one_by_id(data_migration_id, data: data) + else + {:error, :nil_data_migration_id} + end + end + + def reset do + %{data_migration_id: data_migration_id} = state() + + with false <- is_nil(data_migration_id), + :ok <- + DataMigration.update_one_by_id(data_migration_id, + state: :pending, + data: %{} + ) do + reinit() + else + true -> {:error, :nil_data_migration_id} + e -> e + end + end + + def reinit do + Agent.update(@reg_name, fn _state -> load_state_from_db() end) + end + + def state do + Agent.get(@reg_name, & &1) + end + + def get_data_key(key, default \\ nil) do + get_in(state(), [:data, key]) || default + end + + def put_data_key(key, value) do + _ = persist_non_data_change(key, value) + + Agent.update(@reg_name, fn state -> + put_in(state, [:data, key], value) + end) + end + + def increment_data_key(key, increment \\ 1) do + Agent.update(@reg_name, fn state -> + initial_value = get_in(state, [:data, key]) || 0 + updated_value = initial_value + increment + put_in(state, [:data, key], updated_value) + end) + end + + defp persist_non_data_change(:state, value) do + with true <- get_data_key(:state) != value, + true <- value in Pleroma.DataMigration.State.__valid_values__(), + %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- + state() do + DataMigration.update_one_by_id(data_migration_id, state: value) + else + false -> :ok + _ -> {:error, :nil_data_migration_id} + end + end + + defp persist_non_data_change(_, _) do + nil + end + + def data_migration_id, do: Map.get(state(), :data_migration_id) + end + end +end diff --git a/lib/pleroma/object.ex b/lib/pleroma/object.ex index aaf123840..3ba749d1a 100644 --- a/lib/pleroma/object.ex +++ b/lib/pleroma/object.ex @@ -10,6 +10,7 @@ defmodule Pleroma.Object do alias Pleroma.Activity alias Pleroma.Config + alias Pleroma.Hashtag alias Pleroma.Object alias Pleroma.Object.Fetcher alias Pleroma.ObjectTombstone @@ -28,6 +29,8 @@ defmodule Pleroma.Object do schema "objects" do field(:data, :map) + many_to_many(:hashtags, Hashtag, join_through: "hashtags_objects", on_replace: :delete) + timestamps() end @@ -49,7 +52,8 @@ defmodule Pleroma.Object do end def create(data) do - Object.change(%Object{}, %{data: data}) + %Object{} + |> Object.change(%{data: data}) |> Repo.insert() end @@ -58,8 +62,41 @@ defmodule Pleroma.Object do |> cast(params, [:data]) |> validate_required([:data]) |> unique_constraint(:ap_id, name: :objects_unique_apid_index) + # Expecting `maybe_handle_hashtags_change/1` to run last: + |> maybe_handle_hashtags_change(struct) + end + + # Note: not checking activity type (assuming non-legacy objects are associated with Create act.) + defp maybe_handle_hashtags_change(changeset, struct) do + with %Ecto.Changeset{valid?: true} <- changeset, + data_hashtags_change = get_change(changeset, :data), + {_, true} <- {:changed, hashtags_changed?(struct, data_hashtags_change)}, + {:ok, hashtag_records} <- + data_hashtags_change + |> object_data_hashtags() + |> Hashtag.get_or_create_by_names() do + put_assoc(changeset, :hashtags, hashtag_records) + else + %{valid?: false} -> + changeset + + {:changed, false} -> + changeset + + {:error, _} -> + validate_change(changeset, :data, fn _, _ -> + [data: "error referencing hashtags"] + end) + end + end + + defp hashtags_changed?(%Object{} = struct, %{"tag" => _} = data) do + Enum.sort(embedded_hashtags(struct)) != + Enum.sort(object_data_hashtags(data)) end + defp hashtags_changed?(_, _), do: false + def get_by_id(nil), do: nil def get_by_id(id), do: Repo.get(Object, id) @@ -187,9 +224,13 @@ defmodule Pleroma.Object do def swap_object_with_tombstone(object) do tombstone = make_tombstone(object) - object - |> Object.change(%{data: tombstone}) - |> Repo.update() + with {:ok, object} <- + object + |> Object.change(%{data: tombstone}) + |> Repo.update() do + Hashtag.unlink(object) + {:ok, object} + end end def delete(%Object{data: %{"id" => id}} = object) do @@ -349,4 +390,39 @@ defmodule Pleroma.Object do def self_replies(object, opts \\ []), do: replies(object, Keyword.put(opts, :self_only, true)) + + def tags(%Object{data: %{"tag" => tags}}) when is_list(tags), do: tags + + def tags(_), do: [] + + def hashtags(%Object{} = object) do + # Note: always using embedded hashtags regardless whether they are migrated to hashtags table + # (embedded hashtags stay in sync anyways, and we avoid extra joins and preload hassle) + embedded_hashtags(object) + end + + def embedded_hashtags(%Object{data: data}) do + object_data_hashtags(data) + end + + def embedded_hashtags(_), do: [] + + def object_data_hashtags(%{"tag" => tags}) when is_list(tags) do + tags + |> Enum.filter(fn + %{"type" => "Hashtag"} = data -> Map.has_key?(data, "name") + plain_text when is_bitstring(plain_text) -> true + _ -> false + end) + |> Enum.map(fn + %{"name" => "#" <> hashtag} -> String.downcase(hashtag) + %{"name" => hashtag} -> String.downcase(hashtag) + hashtag when is_bitstring(hashtag) -> String.downcase(hashtag) + end) + |> Enum.uniq() + # Note: "" elements (plain text) might occur in `data.tag` for incoming objects + |> Enum.filter(&(&1 not in [nil, ""])) + end + + def object_data_hashtags(_), do: [] end diff --git a/lib/pleroma/pagination.ex b/lib/pleroma/pagination.ex index 0d24e1010..33e45a0eb 100644 --- a/lib/pleroma/pagination.ex +++ b/lib/pleroma/pagination.ex @@ -93,6 +93,7 @@ defmodule Pleroma.Pagination do max_id: :string, offset: :integer, limit: :integer, + skip_extra_order: :boolean, skip_order: :boolean } @@ -114,6 +115,8 @@ defmodule Pleroma.Pagination do defp restrict(query, :order, %{skip_order: true}, _), do: query + defp restrict(%{order_bys: [_ | _]} = query, :order, %{skip_extra_order: true}, _), do: query + defp restrict(query, :order, %{min_id: _}, table_binding) do order_by( query, diff --git a/lib/pleroma/repo.ex b/lib/pleroma/repo.ex index 4556352d0..b8ea06e33 100644 --- a/lib/pleroma/repo.ex +++ b/lib/pleroma/repo.ex @@ -8,6 +8,8 @@ defmodule Pleroma.Repo do adapter: Ecto.Adapters.Postgres, migration_timestamps: [type: :naive_datetime_usec] + use Ecto.Explain + import Ecto.Query require Logger @@ -63,8 +65,8 @@ defmodule Pleroma.Repo do iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches) """ @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t() - def chunk_stream(query, chunk_size, returns_as \\ :one) do - # We don't actually need start and end funcitons of resource streaming, + def chunk_stream(query, chunk_size, returns_as \\ :one, query_options \\ []) do + # We don't actually need start and end functions of resource streaming, # but it seems to be the only way to not fetch records one-by-one and # have individual records be the elements of the stream, instead of # lists of records @@ -76,7 +78,7 @@ defmodule Pleroma.Repo do |> order_by(asc: :id) |> where([r], r.id > ^last_id) |> limit(^chunk_size) - |> all() + |> all(query_options) |> case do [] -> {:halt, last_id} diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 5b45e2ca1..efbf92c70 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -10,6 +10,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Conversation alias Pleroma.Conversation.Participation alias Pleroma.Filter + alias Pleroma.Hashtag alias Pleroma.Maps alias Pleroma.Notification alias Pleroma.Object @@ -465,6 +466,23 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do |> Repo.one() end + defp fetch_paginated_optimized(query, opts, pagination) do + # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC", + # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan + opts = Map.put(opts, :skip_extra_order, true) + + Pagination.fetch_paginated(query, opts, pagination) + end + + def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do + list_memberships = Pleroma.List.memberships(opts[:user]) + + fetch_activities_query(recipients ++ list_memberships, opts) + |> fetch_paginated_optimized(opts, pagination) + |> Enum.reverse() + |> maybe_update_cc(list_memberships, opts[:user]) + end + @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()] def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do opts = Map.delete(opts, :user) @@ -472,7 +490,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do [Constants.as_public()] |> fetch_activities_query(opts) |> restrict_unlisted(opts) - |> Pagination.fetch_paginated(opts, pagination) + |> fetch_paginated_optimized(opts, pagination) end @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()] @@ -693,51 +711,143 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp restrict_since(query, _), do: query - defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do - raise "Can't use the child object without preloading!" + defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do + raise_on_missing_preload() + end + + defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do + from( + [_activity, object] in query, + where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all) + ) + end + + defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do + restrict_embedded_tag_any(query, %{tag: tag}) + end + + defp restrict_embedded_tag_all(query, _), do: query + + defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do + raise_on_missing_preload() + end + + defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do + from( + [_activity, object] in query, + where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any) + ) end - defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do + defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do + restrict_embedded_tag_any(query, %{tag: [tag]}) + end + + defp restrict_embedded_tag_any(query, _), do: query + + defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do + raise_on_missing_preload() + end + + defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do from( [_activity, object] in query, where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) ) end - defp restrict_tag_reject(query, _), do: query + defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject}) + when is_binary(tag_reject) do + restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]}) + end + + defp restrict_embedded_tag_reject_any(query, _), do: query - defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do - raise "Can't use the child object without preloading!" + defp object_ids_query_for_tags(tags) do + from(hto in "hashtags_objects") + |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id) + |> where([hto, ht], ht.name in ^tags) + |> select([hto], hto.object_id) + |> distinct([hto], true) end - defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do + defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do + raise_on_missing_preload() + end + + defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do + restrict_hashtag_any(query, %{tag: single_tag}) + end + + defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do from( [_activity, object] in query, - where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all) + where: + fragment( + """ + (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + AND hashtags_objects.object_id = ?) @> ? + """, + ^tags, + object.id, + ^tags + ) ) end - defp restrict_tag_all(query, _), do: query + defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do + restrict_hashtag_all(query, %{tag_all: [tag]}) + end - defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do - raise "Can't use the child object without preloading!" + defp restrict_hashtag_all(query, _), do: query + + defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do + raise_on_missing_preload() end - defp restrict_tag(query, %{tag: tag}) when is_list(tag) do + defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do + hashtag_ids = + from(ht in Hashtag, where: ht.name in ^tags, select: ht.id) + |> Repo.all() + + # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan from( [_activity, object] in query, - where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag) + join: hto in "hashtags_objects", + on: hto.object_id == object.id, + where: hto.hashtag_id in ^hashtag_ids, + distinct: [desc: object.id], + order_by: [desc: object.id] ) end - defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do + defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do + restrict_hashtag_any(query, %{tag: [tag]}) + end + + defp restrict_hashtag_any(query, _), do: query + + defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do + raise_on_missing_preload() + end + + defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do from( [_activity, object] in query, - where: fragment("(?)->'tag' \\? (?)", object.data, ^tag) + where: object.id not in subquery(object_ids_query_for_tags(tags_reject)) ) end - defp restrict_tag(query, _), do: query + defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do + restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]}) + end + + defp restrict_hashtag_reject_any(query, _), do: query + + defp raise_on_missing_preload do + raise "Can't use the child object without preloading!" + end defp restrict_recipients(query, [], _user), do: query @@ -1098,6 +1208,26 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp maybe_order(query, _), do: query + defp normalize_fetch_activities_query_opts(opts) do + Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts -> + case opts[key] do + value when is_bitstring(value) -> + Map.put(opts, key, Hashtag.normalize_name(value)) + + value when is_list(value) -> + normalized_value = + value + |> Enum.map(&Hashtag.normalize_name/1) + |> Enum.uniq() + + Map.put(opts, key, normalized_value) + + _ -> + opts + end + end) + end + defp fetch_activities_query_ap_ids_ops(opts) do source_user = opts[:muting_user] ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: [] @@ -1121,6 +1251,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do end def fetch_activities_query(recipients, opts \\ %{}) do + opts = normalize_fetch_activities_query_opts(opts) + {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts) @@ -1128,50 +1260,51 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do skip_thread_containment: Config.get([:instance, :skip_thread_containment]) } - Activity - |> maybe_preload_objects(opts) - |> maybe_preload_bookmarks(opts) - |> maybe_preload_report_notes(opts) - |> maybe_set_thread_muted_field(opts) - |> maybe_order(opts) - |> restrict_recipients(recipients, opts[:user]) - |> restrict_replies(opts) - |> restrict_tag(opts) - |> restrict_tag_reject(opts) - |> restrict_tag_all(opts) - |> restrict_since(opts) - |> restrict_local(opts) - |> restrict_remote(opts) - |> restrict_actor(opts) - |> restrict_type(opts) - |> restrict_state(opts) - |> restrict_favorited_by(opts) - |> restrict_blocked(restrict_blocked_opts) - |> restrict_muted(restrict_muted_opts) - |> restrict_filtered(opts) - |> restrict_media(opts) - |> restrict_visibility(opts) - |> restrict_thread_visibility(opts, config) - |> restrict_reblogs(opts) - |> restrict_pinned(opts) - |> restrict_muted_reblogs(restrict_muted_reblogs_opts) - |> restrict_instance(opts) - |> restrict_announce_object_actor(opts) - |> restrict_filtered(opts) - |> Activity.restrict_deactivated_users() - |> exclude_poll_votes(opts) - |> exclude_chat_messages(opts) - |> exclude_invisible_actors(opts) - |> exclude_visibility(opts) - end - - def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do - list_memberships = Pleroma.List.memberships(opts[:user]) - - fetch_activities_query(recipients ++ list_memberships, opts) - |> Pagination.fetch_paginated(opts, pagination) - |> Enum.reverse() - |> maybe_update_cc(list_memberships, opts[:user]) + query = + Activity + |> maybe_preload_objects(opts) + |> maybe_preload_bookmarks(opts) + |> maybe_preload_report_notes(opts) + |> maybe_set_thread_muted_field(opts) + |> maybe_order(opts) + |> restrict_recipients(recipients, opts[:user]) + |> restrict_replies(opts) + |> restrict_since(opts) + |> restrict_local(opts) + |> restrict_remote(opts) + |> restrict_actor(opts) + |> restrict_type(opts) + |> restrict_state(opts) + |> restrict_favorited_by(opts) + |> restrict_blocked(restrict_blocked_opts) + |> restrict_muted(restrict_muted_opts) + |> restrict_filtered(opts) + |> restrict_media(opts) + |> restrict_visibility(opts) + |> restrict_thread_visibility(opts, config) + |> restrict_reblogs(opts) + |> restrict_pinned(opts) + |> restrict_muted_reblogs(restrict_muted_reblogs_opts) + |> restrict_instance(opts) + |> restrict_announce_object_actor(opts) + |> restrict_filtered(opts) + |> Activity.restrict_deactivated_users() + |> exclude_poll_votes(opts) + |> exclude_chat_messages(opts) + |> exclude_invisible_actors(opts) + |> exclude_visibility(opts) + + if Config.feature_enabled?(:improved_hashtag_timeline) do + query + |> restrict_hashtag_any(opts) + |> restrict_hashtag_all(opts) + |> restrict_hashtag_reject_any(opts) + else + query + |> restrict_embedded_tag_any(opts) + |> restrict_embedded_tag_all(opts) + |> restrict_embedded_tag_reject_any(opts) + end end @doc """ @@ -1250,21 +1383,17 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do defp get_actor_url(_url), do: nil - defp object_to_user_data(data) do - avatar = - data["icon"]["url"] && - %{ - "type" => "Image", - "url" => [%{"href" => data["icon"]["url"]}] - } + defp normalize_image(%{"url" => url}) do + %{ + "type" => "Image", + "url" => [%{"href" => url}] + } + end - banner = - data["image"]["url"] && - %{ - "type" => "Image", - "url" => [%{"href" => data["image"]["url"]}] - } + defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image() + defp normalize_image(_), do: nil + defp object_to_user_data(data) do fields = data |> Map.get("attachment", []) @@ -1308,13 +1437,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do ap_id: data["id"], uri: get_actor_url(data["url"]), ap_enabled: true, - banner: banner, + banner: normalize_image(data["image"]), fields: fields, emoji: emojis, is_locked: is_locked, is_discoverable: is_discoverable, invisible: invisible, - avatar: avatar, + avatar: normalize_image(data["icon"]), name: data["name"], follower_address: data["followers"], following_address: data["following"], diff --git a/lib/pleroma/web/activity_pub/mrf.ex b/lib/pleroma/web/activity_pub/mrf.ex index ef5a09a93..f2fec3ff6 100644 --- a/lib/pleroma/web/activity_pub/mrf.ex +++ b/lib/pleroma/web/activity_pub/mrf.ex @@ -92,7 +92,9 @@ defmodule Pleroma.Web.ActivityPub.MRF do end def get_policies do - Pleroma.Config.get([:mrf, :policies], []) |> get_policies() + Pleroma.Config.get([:mrf, :policies], []) + |> get_policies() + |> Enum.concat([Pleroma.Web.ActivityPub.MRF.HashtagPolicy]) end defp get_policies(policy) when is_atom(policy), do: [policy] diff --git a/lib/pleroma/web/activity_pub/mrf/hashtag_policy.ex b/lib/pleroma/web/activity_pub/mrf/hashtag_policy.ex new file mode 100644 index 000000000..def0c437c --- /dev/null +++ b/lib/pleroma/web/activity_pub/mrf/hashtag_policy.ex @@ -0,0 +1,116 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Web.ActivityPub.MRF.HashtagPolicy do + require Pleroma.Constants + + alias Pleroma.Config + alias Pleroma.Object + + @moduledoc """ + Reject, TWKN-remove or Set-Sensitive messsages with specific hashtags (without the leading #) + + Note: This MRF Policy is always enabled, if you want to disable it you have to set empty lists. + """ + + @behaviour Pleroma.Web.ActivityPub.MRF + + defp check_reject(message, hashtags) do + if Enum.any?(Config.get([:mrf_hashtag, :reject]), fn match -> match in hashtags end) do + {:reject, "[HashtagPolicy] Matches with rejected keyword"} + else + {:ok, message} + end + end + + defp check_ftl_removal(%{"to" => to} = message, hashtags) do + if Pleroma.Constants.as_public() in to and + Enum.any?(Config.get([:mrf_hashtag, :federated_timeline_removal]), fn match -> + match in hashtags + end) do + to = List.delete(to, Pleroma.Constants.as_public()) + cc = [Pleroma.Constants.as_public() | message["cc"] || []] + + message = + message + |> Map.put("to", to) + |> Map.put("cc", cc) + |> Kernel.put_in(["object", "to"], to) + |> Kernel.put_in(["object", "cc"], cc) + + {:ok, message} + else + {:ok, message} + end + end + + defp check_ftl_removal(message, _hashtags), do: {:ok, message} + + defp check_sensitive(message, hashtags) do + if Enum.any?(Config.get([:mrf_hashtag, :sensitive]), fn match -> match in hashtags end) do + {:ok, Kernel.put_in(message, ["object", "sensitive"], true)} + else + {:ok, message} + end + end + + @impl true + def filter(%{"type" => "Create", "object" => object} = message) do + hashtags = Object.hashtags(%Object{data: object}) + + if hashtags != [] do + with {:ok, message} <- check_reject(message, hashtags), + {:ok, message} <- check_ftl_removal(message, hashtags), + {:ok, message} <- check_sensitive(message, hashtags) do + {:ok, message} + end + else + {:ok, message} + end + end + + @impl true + def filter(message), do: {:ok, message} + + @impl true + def describe do + mrf_hashtag = + Config.get(:mrf_hashtag) + |> Enum.into(%{}) + + {:ok, %{mrf_hashtag: mrf_hashtag}} + end + + @impl true + def config_description do + %{ + key: :mrf_hashtag, + related_policy: "Pleroma.Web.ActivityPub.MRF.HashtagPolicy", + label: "MRF Hashtag", + description: @moduledoc, + children: [ + %{ + key: :reject, + type: {:list, :string}, + description: "A list of hashtags which result in message being rejected.", + suggestions: ["foo"] + }, + %{ + key: :federated_timeline_removal, + type: {:list, :string}, + description: + "A list of hashtags which result in message being removed from federated timelines (a.k.a unlisted).", + suggestions: ["foo"] + }, + %{ + key: :sensitive, + type: {:list, :string}, + description: + "A list of hashtags which result in message being set as sensitive (a.k.a NSFW/R-18)", + suggestions: ["nsfw", "r18"] + } + ] + } + end +end diff --git a/lib/pleroma/web/activity_pub/mrf/simple_policy.ex b/lib/pleroma/web/activity_pub/mrf/simple_policy.ex index bb3838d2c..62024c58c 100644 --- a/lib/pleroma/web/activity_pub/mrf/simple_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/simple_policy.ex @@ -64,20 +64,16 @@ defmodule Pleroma.Web.ActivityPub.MRF.SimplePolicy do %{host: actor_host} = _actor_info, %{ "type" => "Create", - "object" => child_object + "object" => %{} = _child_object } = object - ) - when is_map(child_object) do + ) do media_nsfw = Config.get([:mrf_simple, :media_nsfw]) |> MRF.subdomains_regex() object = if MRF.subdomain_match?(media_nsfw, actor_host) do - tags = (child_object["tag"] || []) ++ ["nsfw"] - child_object = Map.put(child_object, "tag", tags) - child_object = Map.put(child_object, "sensitive", true) - Map.put(object, "object", child_object) + Kernel.put_in(object, ["object", "sensitive"], true) else object end diff --git a/lib/pleroma/web/activity_pub/mrf/tag_policy.ex b/lib/pleroma/web/activity_pub/mrf/tag_policy.ex index 5739cee63..528093ac0 100644 --- a/lib/pleroma/web/activity_pub/mrf/tag_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/tag_policy.ex @@ -28,20 +28,11 @@ defmodule Pleroma.Web.ActivityPub.MRF.TagPolicy do "mrf_tag:media-force-nsfw", %{ "type" => "Create", - "object" => %{"attachment" => child_attachment} = object + "object" => %{"attachment" => child_attachment} } = message ) when length(child_attachment) > 0 do - tags = (object["tag"] || []) ++ ["nsfw"] - - object = - object - |> Map.put("tag", tags) - |> Map.put("sensitive", true) - - message = Map.put(message, "object", object) - - {:ok, message} + {:ok, Kernel.put_in(message, ["object", "sensitive"], true)} end defp process_tag( diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index 4d9a5617e..8c7d6a747 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -32,18 +32,17 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do """ def fix_object(object, options \\ []) do object - |> strip_internal_fields - |> fix_actor - |> fix_url - |> fix_attachments - |> fix_context + |> strip_internal_fields() + |> fix_actor() + |> fix_url() + |> fix_attachments() + |> fix_context() |> fix_in_reply_to(options) - |> fix_emoji - |> fix_tag - |> set_sensitive - |> fix_content_map - |> fix_addressing - |> fix_summary + |> fix_emoji() + |> fix_tag() + |> fix_content_map() + |> fix_addressing() + |> fix_summary() |> fix_type(options) end @@ -315,10 +314,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do tags = tag |> Enum.filter(fn data -> data["type"] == "Hashtag" and data["name"] end) - |> Enum.map(fn %{"name" => name} -> - name - |> String.slice(1..-1) - |> String.downcase() + |> Enum.map(fn + %{"name" => "#" <> hashtag} -> String.downcase(hashtag) + %{"name" => hashtag} -> String.downcase(hashtag) end) Map.put(object, "tag", tag ++ tags) @@ -742,7 +740,6 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do # Prepares the object of an outgoing create activity. def prepare_object(object) do object - |> set_sensitive |> add_hashtags |> add_mention_tags |> add_emoji_tags @@ -933,15 +930,6 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do Map.put(object, "conversation", object["context"]) end - def set_sensitive(%{"sensitive" => _} = object) do - object - end - - def set_sensitive(object) do - tags = object["tag"] || [] - Map.put(object, "sensitive", "nsfw" in tags) - end - def set_type(%{"type" => "Answer"} = object) do Map.put(object, "type", "Note") end diff --git a/lib/pleroma/web/api_spec/cast_and_validate.ex b/lib/pleroma/web/api_spec/cast_and_validate.ex index a3da856ff..d23a7dcb6 100644 --- a/lib/pleroma/web/api_spec/cast_and_validate.ex +++ b/lib/pleroma/web/api_spec/cast_and_validate.ex @@ -15,6 +15,7 @@ defmodule Pleroma.Web.ApiSpec.CastAndValidate do @behaviour Plug + alias OpenApiSpex.Plug.PutApiSpec alias Plug.Conn @impl Plug @@ -25,12 +26,10 @@ defmodule Pleroma.Web.ApiSpec.CastAndValidate do end @impl Plug - def call(%{private: %{open_api_spex: private_data}} = conn, %{ - operation_id: operation_id, - render_error: render_error - }) do - spec = private_data.spec - operation = private_data.operation_lookup[operation_id] + + def call(conn, %{operation_id: operation_id, render_error: render_error}) do + {spec, operation_lookup} = PutApiSpec.get_spec_and_operation_lookup(conn) + operation = operation_lookup[operation_id] content_type = case Conn.get_req_header(conn, "content-type") do @@ -43,8 +42,7 @@ defmodule Pleroma.Web.ApiSpec.CastAndValidate do "application/json" end - private_data = Map.put(private_data, :operation_id, operation_id) - conn = Conn.put_private(conn, :open_api_spex, private_data) + conn = Conn.put_private(conn, :operation_id, operation_id) case cast_and_validate(spec, operation, conn, content_type, strict?()) do {:ok, conn} -> @@ -64,25 +62,22 @@ defmodule Pleroma.Web.ApiSpec.CastAndValidate do private: %{ phoenix_controller: controller, phoenix_action: action, - open_api_spex: private_data + open_api_spex: %{spec_module: spec_module} } } = conn, opts ) do + {spec, operation_lookup} = PutApiSpec.get_spec_and_operation_lookup(conn) + operation = - case private_data.operation_lookup[{controller, action}] do + case operation_lookup[{controller, action}] do nil -> operation_id = controller.open_api_operation(action).operationId - operation = private_data.operation_lookup[operation_id] + operation = operation_lookup[operation_id] - operation_lookup = - private_data.operation_lookup - |> Map.put({controller, action}, operation) + operation_lookup = Map.put(operation_lookup, {controller, action}, operation) - OpenApiSpex.Plug.Cache.adapter().put( - private_data.spec_module, - {private_data.spec, operation_lookup} - ) + OpenApiSpex.Plug.Cache.adapter().put(spec_module, {spec, operation_lookup}) operation diff --git a/lib/pleroma/web/api_spec/operations/status_operation.ex b/lib/pleroma/web/api_spec/operations/status_operation.ex index 40edc747d..4bdb8e281 100644 --- a/lib/pleroma/web/api_spec/operations/status_operation.ex +++ b/lib/pleroma/web/api_spec/operations/status_operation.ex @@ -59,7 +59,7 @@ defmodule Pleroma.Web.ApiSpec.StatusOperation do Operation.response( "Status. When `scheduled_at` is present, ScheduledStatus is returned instead", "application/json", - %Schema{oneOf: [Status, ScheduledStatus]} + %Schema{anyOf: [Status, ScheduledStatus]} ), 422 => Operation.response("Bad Request / MRF Rejection", "application/json", ApiError) } diff --git a/lib/pleroma/web/api_spec/schemas/boolean_like.ex b/lib/pleroma/web/api_spec/schemas/boolean_like.ex index eb001c5bb..778158f66 100644 --- a/lib/pleroma/web/api_spec/schemas/boolean_like.ex +++ b/lib/pleroma/web/api_spec/schemas/boolean_like.ex @@ -3,6 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.ApiSpec.Schemas.BooleanLike do + alias OpenApiSpex.Cast alias OpenApiSpex.Schema require OpenApiSpex @@ -27,10 +28,13 @@ defmodule Pleroma.Web.ApiSpec.Schemas.BooleanLike do %Schema{type: :boolean}, %Schema{type: :string}, %Schema{type: :integer} - ] + ], + "x-validate": __MODULE__ }) - def after_cast(value, _schmea) do - {:ok, Pleroma.Web.ControllerHelper.truthy_param?(value)} + def cast(%Cast{value: value} = context) do + context + |> Map.put(:value, Pleroma.Web.ControllerHelper.truthy_param?(value)) + |> Cast.ok() end end diff --git a/lib/pleroma/web/common_api/activity_draft.ex b/lib/pleroma/web/common_api/activity_draft.ex index 73f1b0931..8668b600e 100644 --- a/lib/pleroma/web/common_api/activity_draft.ex +++ b/lib/pleroma/web/common_api/activity_draft.ex @@ -179,7 +179,7 @@ defmodule Pleroma.Web.CommonAPI.ActivityDraft do end defp sensitive(draft) do - sensitive = draft.params[:sensitive] || Enum.member?(draft.tags, {"#nsfw", "nsfw"}) + sensitive = draft.params[:sensitive] %__MODULE__{draft | sensitive: sensitive} end diff --git a/lib/pleroma/web/common_api/utils.ex b/lib/pleroma/web/common_api/utils.ex index 9587dfa25..4e6a3feb0 100644 --- a/lib/pleroma/web/common_api/utils.ex +++ b/lib/pleroma/web/common_api/utils.ex @@ -217,7 +217,6 @@ defmodule Pleroma.Web.CommonAPI.Utils do draft.status |> format_input(content_type, options) |> maybe_add_attachments(draft.attachments, attachment_links) - |> maybe_add_nsfw_tag(draft.params) end defp get_content_type(content_type) do @@ -228,13 +227,6 @@ defmodule Pleroma.Web.CommonAPI.Utils do end end - defp maybe_add_nsfw_tag({text, mentions, tags}, %{"sensitive" => sensitive}) - when sensitive in [true, "True", "true", "1"] do - {text, mentions, [{"#nsfw", "nsfw"} | tags]} - end - - defp maybe_add_nsfw_tag(data, _), do: data - def make_context(_, %Participation{} = participation) do Repo.preload(participation, :conversation).conversation.ap_id end diff --git a/lib/pleroma/web/feed/feed_view.ex b/lib/pleroma/web/feed/feed_view.ex index df97d2f46..66940f311 100644 --- a/lib/pleroma/web/feed/feed_view.ex +++ b/lib/pleroma/web/feed/feed_view.ex @@ -32,6 +32,7 @@ defmodule Pleroma.Web.Feed.FeedView do %{ activity: activity, + object: object, data: Map.get(object, :data), actor: actor } diff --git a/lib/pleroma/web/mastodon_api/controllers/instance_controller.ex b/lib/pleroma/web/mastodon_api/controllers/instance_controller.ex index 267d0f03b..c7a5267d4 100644 --- a/lib/pleroma/web/mastodon_api/controllers/instance_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/instance_controller.ex @@ -5,7 +5,7 @@ defmodule Pleroma.Web.MastodonAPI.InstanceController do use Pleroma.Web, :controller - plug(OpenApiSpex.Plug.CastAndValidate) + plug(Pleroma.Web.ApiSpec.CastAndValidate) plug( :skip_plug, diff --git a/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex b/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex index cef299aa4..c611958be 100644 --- a/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex @@ -133,34 +133,25 @@ defmodule Pleroma.Web.MastodonAPI.TimelineController do end defp hashtag_fetching(params, user, local_only) do - tags = + # Note: not sanitizing tag options at this stage (may be mix-cased, have duplicates etc.) + tags_any = [params[:tag], params[:any]] |> List.flatten() - |> Enum.uniq() - |> Enum.reject(&is_nil/1) - |> Enum.map(&String.downcase/1) - - tag_all = - params - |> Map.get(:all, []) - |> Enum.map(&String.downcase/1) - - tag_reject = - params - |> Map.get(:none, []) - |> Enum.map(&String.downcase/1) - - _activities = - params - |> Map.put(:type, "Create") - |> Map.put(:local_only, local_only) - |> Map.put(:blocking_user, user) - |> Map.put(:muting_user, user) - |> Map.put(:user, user) - |> Map.put(:tag, tags) - |> Map.put(:tag_all, tag_all) - |> Map.put(:tag_reject, tag_reject) - |> ActivityPub.fetch_public_activities() + |> Enum.filter(& &1) + + tag_all = Map.get(params, :all, []) + tag_reject = Map.get(params, :none, []) + + params + |> Map.put(:type, "Create") + |> Map.put(:local_only, local_only) + |> Map.put(:blocking_user, user) + |> Map.put(:muting_user, user) + |> Map.put(:user, user) + |> Map.put(:tag, tags_any) + |> Map.put(:tag_all, tag_all) + |> Map.put(:tag_reject, tag_reject) + |> ActivityPub.fetch_public_activities() end # GET /api/v1/timelines/tag/:tag diff --git a/lib/pleroma/web/mastodon_api/views/status_view.ex b/lib/pleroma/web/mastodon_api/views/status_view.ex index 71f659ba0..3753588f2 100644 --- a/lib/pleroma/web/mastodon_api/views/status_view.ex +++ b/lib/pleroma/web/mastodon_api/views/status_view.ex @@ -198,8 +198,10 @@ defmodule Pleroma.Web.MastodonAPI.StatusView do like_count = object.data["like_count"] || 0 announcement_count = object.data["announcement_count"] || 0 - tags = object.data["tag"] || [] - sensitive = object.data["sensitive"] || Enum.member?(tags, "nsfw") + hashtags = Object.hashtags(object) + sensitive = object.data["sensitive"] || Enum.member?(hashtags, "nsfw") + + tags = Object.tags(object) tag_mentions = tags diff --git a/lib/pleroma/web/media_proxy.ex b/lib/pleroma/web/media_proxy.ex index 27f337138..d0d4bb4b3 100644 --- a/lib/pleroma/web/media_proxy.ex +++ b/lib/pleroma/web/media_proxy.ex @@ -121,6 +121,11 @@ defmodule Pleroma.Web.MediaProxy do end end + def decode_url(encoded) do + [_, "proxy", sig, base64 | _] = URI.parse(encoded).path |> String.split("/") + decode_url(sig, base64) + end + defp signed_url(url) do :crypto.hmac(:sha, Config.get([Web.Endpoint, :secret_key_base]), url) end diff --git a/lib/pleroma/web/pleroma_api/controllers/backup_controller.ex b/lib/pleroma/web/pleroma_api/controllers/backup_controller.ex index 315657e9c..fc5d16771 100644 --- a/lib/pleroma/web/pleroma_api/controllers/backup_controller.ex +++ b/lib/pleroma/web/pleroma_api/controllers/backup_controller.ex @@ -10,7 +10,7 @@ defmodule Pleroma.Web.PleromaAPI.BackupController do action_fallback(Pleroma.Web.MastodonAPI.FallbackController) plug(OAuthScopesPlug, %{scopes: ["read:accounts"]} when action in [:index, :create]) - plug(OpenApiSpex.Plug.CastAndValidate, render_error: Pleroma.Web.ApiSpec.RenderError) + plug(Pleroma.Web.ApiSpec.CastAndValidate) defdelegate open_api_operation(action), to: Pleroma.Web.ApiSpec.PleromaBackupOperation diff --git a/lib/pleroma/web/pleroma_api/controllers/chat_controller.ex b/lib/pleroma/web/pleroma_api/controllers/chat_controller.ex index 4adc685fe..dcd54b1af 100644 --- a/lib/pleroma/web/pleroma_api/controllers/chat_controller.ex +++ b/lib/pleroma/web/pleroma_api/controllers/chat_controller.ex @@ -38,7 +38,7 @@ defmodule Pleroma.Web.PleromaAPI.ChatController do %{scopes: ["read:chats"]} when action in [:messages, :index, :index2, :show] ) - plug(OpenApiSpex.Plug.CastAndValidate, render_error: Pleroma.Web.ApiSpec.RenderError) + plug(Pleroma.Web.ApiSpec.CastAndValidate) defdelegate open_api_operation(action), to: Pleroma.Web.ApiSpec.ChatOperation diff --git a/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex b/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex index 6d9a11fb6..078d470d9 100644 --- a/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex +++ b/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex @@ -15,7 +15,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportController do plug(OAuthScopesPlug, %{scopes: ["follow", "write:blocks"]} when action == :blocks) plug(OAuthScopesPlug, %{scopes: ["follow", "write:mutes"]} when action == :mutes) - plug(OpenApiSpex.Plug.CastAndValidate) + plug(Pleroma.Web.ApiSpec.CastAndValidate) defdelegate open_api_operation(action), to: ApiSpec.UserImportOperation def follow(%{body_params: %{list: %Plug.Upload{path: path}}} = conn, _) do diff --git a/lib/pleroma/web/templates/feed/feed/_activity.atom.eex b/lib/pleroma/web/templates/feed/feed/_activity.atom.eex index 3fd150c4e..6688830ba 100644 --- a/lib/pleroma/web/templates/feed/feed/_activity.atom.eex +++ b/lib/pleroma/web/templates/feed/feed/_activity.atom.eex @@ -22,7 +22,7 @@ <link type="text/html" href='<%= @data["external_url"] %>' rel="alternate"/> <% end %> - <%= for tag <- @data["tag"] || [] do %> + <%= for tag <- Pleroma.Object.hashtags(@object) do %> <category term="<%= tag %>"></category> <% end %> diff --git a/lib/pleroma/web/templates/feed/feed/_activity.rss.eex b/lib/pleroma/web/templates/feed/feed/_activity.rss.eex index 947bbb099..592b9dcdc 100644 --- a/lib/pleroma/web/templates/feed/feed/_activity.rss.eex +++ b/lib/pleroma/web/templates/feed/feed/_activity.rss.eex @@ -22,7 +22,7 @@ <link rel="ostatus:conversation"><%= activity_context(@activity) %></link> - <%= for tag <- @data["tag"] || [] do %> + <%= for tag <- Pleroma.Object.hashtags(@object) do %> <category term="<%= tag %>"></category> <% end %> diff --git a/lib/pleroma/web/templates/feed/feed/_tag_activity.atom.eex b/lib/pleroma/web/templates/feed/feed/_tag_activity.atom.eex index cf5874a91..c2de28fe4 100644 --- a/lib/pleroma/web/templates/feed/feed/_tag_activity.atom.eex +++ b/lib/pleroma/web/templates/feed/feed/_tag_activity.atom.eex @@ -41,7 +41,7 @@ <% end %> <% end %> - <%= for tag <- @data["tag"] || [] do %> + <%= for tag <- Pleroma.Object.hashtags(@object) do %> <category term="<%= tag %>"></category> <% end %> |