diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pleroma/application.ex | 6 | ||||
-rw-r--r-- | lib/pleroma/user.ex | 66 | ||||
-rw-r--r-- | lib/pleroma/user/query.ex | 19 | ||||
-rw-r--r-- | lib/pleroma/user/synchronization.ex | 60 | ||||
-rw-r--r-- | lib/pleroma/user/synchronization_worker.ex | 32 |
5 files changed, 177 insertions, 6 deletions
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index ba4cf8486..86c348a0d 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -151,7 +151,11 @@ defmodule Pleroma.Application do start: {Pleroma.Web.Endpoint, :start_link, []}, type: :supervisor }, - %{id: Pleroma.Gopher.Server, start: {Pleroma.Gopher.Server, :start_link, []}} + %{id: Pleroma.Gopher.Server, start: {Pleroma.Gopher.Server, :start_link, []}}, + %{ + id: Pleroma.User.SynchronizationWorker, + start: {Pleroma.User.SynchronizationWorker, :start_link, []} + } ] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 09f86aaa2..d03810d1a 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -107,15 +107,25 @@ defmodule Pleroma.User do def ap_followers(%User{follower_address: fa}) when is_binary(fa), do: fa def ap_followers(%User{} = user), do: "#{ap_id(user)}/followers" - def user_info(%User{} = user) do + def user_info(%User{} = user, args \\ %{}) do + following_count = + if args[:following_count], do: args[:following_count], else: following_count(user) + + follower_count = + if args[:follower_count], do: args[:follower_count], else: user.info.follower_count + %{ - following_count: following_count(user), note_count: user.info.note_count, - follower_count: user.info.follower_count, locked: user.info.locked, confirmation_pending: user.info.confirmation_pending, default_scope: user.info.default_scope } + |> Map.put(:following_count, following_count) + |> Map.put(:follower_count, follower_count) + end + + def set_info_cache(user, args) do + Cachex.put(:user_cache, "user_info:#{user.id}", user_info(user, args)) end def restrict_deactivated(query) do @@ -1000,6 +1010,56 @@ defmodule Pleroma.User do ) end + @spec sync_follow_counter() :: :ok + def sync_follow_counter, + do: PleromaJobQueue.enqueue(:background, __MODULE__, [:sync_follow_counters]) + + @spec perform(:sync_follow_counters) :: :ok + def perform(:sync_follow_counters) do + {:ok, _pid} = Agent.start_link(fn -> %{} end, name: :domain_errors) + config = Pleroma.Config.get([:instance, :external_user_synchronization]) + + :ok = sync_follow_counters(config) + Agent.stop(:domain_errors) + end + + @spec sync_follow_counters(keyword()) :: :ok + def sync_follow_counters(opts \\ []) do + users = external_users(opts) + + if length(users) > 0 do + errors = Agent.get(:domain_errors, fn state -> state end) + {last, updated_errors} = User.Synchronization.call(users, errors, opts) + Agent.update(:domain_errors, fn _state -> updated_errors end) + sync_follow_counters(max_id: last.id, limit: opts[:limit]) + else + :ok + end + end + + @spec external_users(keyword()) :: [User.t()] + def external_users(opts \\ []) do + query = + User.Query.build(%{ + external: true, + active: true, + order_by: :id, + select: [:id, :ap_id, :info] + }) + + query = + if opts[:max_id], + do: where(query, [u], u.id > ^opts[:max_id]), + else: query + + query = + if opts[:limit], + do: limit(query, ^opts[:limit]), + else: query + + Repo.all(query) + end + def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers), do: PleromaJobQueue.enqueue(:background, __MODULE__, [ diff --git a/lib/pleroma/user/query.ex b/lib/pleroma/user/query.ex index ace9c05f2..f9bcc9e19 100644 --- a/lib/pleroma/user/query.ex +++ b/lib/pleroma/user/query.ex @@ -7,7 +7,7 @@ defmodule Pleroma.User.Query do User query builder module. Builds query from new query or another user query. ## Example: - query = Pleroma.User.Query(%{nickname: "nickname"}) + query = Pleroma.User.Query.build(%{nickname: "nickname"}) another_query = Pleroma.User.Query.build(query, %{email: "email@example.com"}) Pleroma.Repo.all(query) Pleroma.Repo.all(another_query) @@ -47,7 +47,10 @@ defmodule Pleroma.User.Query do friends: User.t(), recipients_from_activity: [String.t()], nickname: [String.t()], - ap_id: [String.t()] + ap_id: [String.t()], + order_by: term(), + select: term(), + limit: pos_integer() } | %{} @@ -141,6 +144,18 @@ defmodule Pleroma.User.Query do where(query, [u], u.ap_id in ^to or fragment("? && ?", u.following, ^to)) end + defp compose_query({:order_by, key}, query) do + order_by(query, [u], field(u, ^key)) + end + + defp compose_query({:select, keys}, query) do + select(query, [u], ^keys) + end + + defp compose_query({:limit, limit}, query) do + limit(query, ^limit) + end + defp compose_query(_unsupported_param, query), do: query defp prepare_tag_criteria(tag, query) do diff --git a/lib/pleroma/user/synchronization.ex b/lib/pleroma/user/synchronization.ex new file mode 100644 index 000000000..93660e08c --- /dev/null +++ b/lib/pleroma/user/synchronization.ex @@ -0,0 +1,60 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.User.Synchronization do + alias Pleroma.HTTP + alias Pleroma.User + + @spec call([User.t()], map(), keyword()) :: {User.t(), map()} + def call(users, errors, opts \\ []) do + do_call(users, errors, opts) + end + + defp do_call([user | []], errors, opts) do + updated = fetch_counters(user, errors, opts) + {user, updated} + end + + defp do_call([user | others], errors, opts) do + updated = fetch_counters(user, errors, opts) + do_call(others, updated, opts) + end + + defp fetch_counters(user, errors, opts) do + %{host: host} = URI.parse(user.ap_id) + + info = %{} + {following, errors} = fetch_counter(user.ap_id <> "/following", host, errors, opts) + info = if following, do: Map.put(info, :following_count, following), else: info + + {followers, errors} = fetch_counter(user.ap_id <> "/followers", host, errors, opts) + info = if followers, do: Map.put(info, :follower_count, followers), else: info + + User.set_info_cache(user, info) + errors + end + + defp available_domain?(domain, errors, opts) do + max_retries = Keyword.get(opts, :max_retries, 3) + not (Map.has_key?(errors, domain) && errors[domain] >= max_retries) + end + + defp fetch_counter(url, host, errors, opts) do + with true <- available_domain?(host, errors, opts), + {:ok, %{body: body, status: code}} when code in 200..299 <- + HTTP.get( + url, + [{:Accept, "application/activity+json"}] + ), + {:ok, data} <- Jason.decode(body) do + {data["totalItems"], errors} + else + false -> + {nil, errors} + + _ -> + {nil, Map.update(errors, host, 1, &(&1 + 1))} + end + end +end diff --git a/lib/pleroma/user/synchronization_worker.ex b/lib/pleroma/user/synchronization_worker.ex new file mode 100644 index 000000000..ba9cc3556 --- /dev/null +++ b/lib/pleroma/user/synchronization_worker.ex @@ -0,0 +1,32 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-onl + +defmodule Pleroma.User.SynchronizationWorker do + use GenServer + + def start_link do + config = Pleroma.Config.get([:instance, :external_user_synchronization]) + + if config[:enabled] do + GenServer.start_link(__MODULE__, interval: config[:interval]) + else + :ignore + end + end + + def init(opts) do + schedule_next(opts) + {:ok, opts} + end + + def handle_info(:sync_follow_counters, opts) do + Pleroma.User.sync_follow_counter() + schedule_next(opts) + {:noreply, opts} + end + + defp schedule_next(opts) do + Process.send_after(self(), :sync_follow_counters, opts[:interval]) + end +end |