aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorhref <href+git-pleroma@random.sh>2019-02-01 09:14:35 +0000
committerhref <href+git-pleroma@random.sh>2019-02-01 09:14:35 +0000
commitb3b0855456a92351667a50c8ea77f328bded76ca (patch)
tree329d20c3eb8279caf01e0fb1ffbb0d08ae406f87 /lib
parent0340fcaecadae858b9c8fe39cc18bc25aef3e495 (diff)
parent92753b0cd9cfcdc5edb64a5e55ad27f73079f9e0 (diff)
downloadpleroma-b3b0855456a92351667a50c8ea77f328bded76ca.tar.gz
Merge branch '534_federation_targets_reachability' into 'develop'
[#534] Unreachable federation targets retirement Closes #534 See merge request pleroma/pleroma!703
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/instances.ex36
-rw-r--r--lib/pleroma/instances/instance.ex102
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub.ex34
-rw-r--r--lib/pleroma/web/activity_pub/activity_pub_controller.ex11
-rw-r--r--lib/pleroma/web/federator/federator.ex6
-rw-r--r--lib/pleroma/web/ostatus/ostatus.ex3
-rw-r--r--lib/pleroma/web/ostatus/ostatus_controller.ex1
-rw-r--r--lib/pleroma/web/salmon/salmon.ex30
-rw-r--r--lib/pleroma/web/websub/websub.ex31
-rw-r--r--lib/pleroma/web/websub/websub_controller.ex2
10 files changed, 221 insertions, 35 deletions
diff --git a/lib/pleroma/instances.ex b/lib/pleroma/instances.ex
new file mode 100644
index 000000000..5e107f4c9
--- /dev/null
+++ b/lib/pleroma/instances.ex
@@ -0,0 +1,36 @@
+defmodule Pleroma.Instances do
+ @moduledoc "Instances context."
+
+ @adapter Pleroma.Instances.Instance
+
+ defdelegate filter_reachable(urls_or_hosts), to: @adapter
+ defdelegate reachable?(url_or_host), to: @adapter
+ defdelegate set_reachable(url_or_host), to: @adapter
+ defdelegate set_unreachable(url_or_host, unreachable_since \\ nil), to: @adapter
+
+ def set_consistently_unreachable(url_or_host),
+ do: set_unreachable(url_or_host, reachability_datetime_threshold())
+
+ def reachability_datetime_threshold do
+ federation_reachability_timeout_days =
+ Pleroma.Config.get(:instance)[:federation_reachability_timeout_days] || 0
+
+ if federation_reachability_timeout_days > 0 do
+ NaiveDateTime.add(
+ NaiveDateTime.utc_now(),
+ -federation_reachability_timeout_days * 24 * 3600,
+ :second
+ )
+ else
+ ~N[0000-01-01 00:00:00]
+ end
+ end
+
+ def host(url_or_host) when is_binary(url_or_host) do
+ if url_or_host =~ ~r/^http/i do
+ URI.parse(url_or_host).host
+ else
+ url_or_host
+ end
+ end
+end
diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex
new file mode 100644
index 000000000..a87590d8b
--- /dev/null
+++ b/lib/pleroma/instances/instance.ex
@@ -0,0 +1,102 @@
+defmodule Pleroma.Instances.Instance do
+ @moduledoc "Instance."
+
+ alias Pleroma.Instances
+ alias Pleroma.Instances.Instance
+
+ use Ecto.Schema
+
+ import Ecto.{Query, Changeset}
+
+ alias Pleroma.Repo
+
+ schema "instances" do
+ field(:host, :string)
+ field(:unreachable_since, :naive_datetime)
+
+ timestamps()
+ end
+
+ defdelegate host(url_or_host), to: Instances
+
+ def changeset(struct, params \\ %{}) do
+ struct
+ |> cast(params, [:host, :unreachable_since])
+ |> validate_required([:host])
+ |> unique_constraint(:host)
+ end
+
+ def filter_reachable([]), do: []
+
+ def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do
+ hosts =
+ urls_or_hosts
+ |> Enum.map(&(&1 && host(&1)))
+ |> Enum.filter(&(to_string(&1) != ""))
+
+ unreachable_hosts =
+ Repo.all(
+ from(i in Instance,
+ where:
+ i.host in ^hosts and
+ i.unreachable_since <= ^Instances.reachability_datetime_threshold(),
+ select: i.host
+ )
+ )
+
+ Enum.filter(urls_or_hosts, &(&1 && host(&1) not in unreachable_hosts))
+ end
+
+ def reachable?(url_or_host) when is_binary(url_or_host) do
+ !Repo.one(
+ from(i in Instance,
+ where:
+ i.host == ^host(url_or_host) and
+ i.unreachable_since <= ^Instances.reachability_datetime_threshold(),
+ select: true
+ )
+ )
+ end
+
+ def reachable?(_), do: true
+
+ def set_reachable(url_or_host) when is_binary(url_or_host) do
+ with host <- host(url_or_host),
+ %Instance{} = existing_record <- Repo.get_by(Instance, %{host: host}) do
+ {:ok, _instance} =
+ existing_record
+ |> changeset(%{unreachable_since: nil})
+ |> Repo.update()
+ end
+ end
+
+ def set_reachable(_), do: {:error, nil}
+
+ def set_unreachable(url_or_host, unreachable_since \\ nil)
+
+ def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do
+ unreachable_since = unreachable_since || DateTime.utc_now()
+ host = host(url_or_host)
+ existing_record = Repo.get_by(Instance, %{host: host})
+
+ changes = %{unreachable_since: unreachable_since}
+
+ cond do
+ is_nil(existing_record) ->
+ %Instance{}
+ |> changeset(Map.put(changes, :host, host))
+ |> Repo.insert()
+
+ existing_record.unreachable_since &&
+ NaiveDateTime.compare(existing_record.unreachable_since, unreachable_since) != :gt ->
+ {:ok, existing_record}
+
+ true ->
+ existing_record
+ |> changeset(changes)
+ |> Repo.update()
+ end
+ end
+
+ def set_unreachable(_, _), do: {:error, nil}
+end
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 0199ac9e7..06e8c3f1c 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -3,7 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.ActivityPub.ActivityPub do
- alias Pleroma.{Activity, Repo, Object, Upload, User, Notification}
+ alias Pleroma.{Activity, Repo, Object, Upload, User, Notification, Instances}
alias Pleroma.Web.ActivityPub.{Transmogrifier, MRF}
alias Pleroma.Web.WebFinger
alias Pleroma.Web.Federator
@@ -734,7 +734,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end
def publish(actor, activity) do
- followers =
+ remote_followers =
if actor.follower_address in activity.recipients do
{:ok, followers} = User.get_followers(actor)
followers |> Enum.filter(&(!&1.local))
@@ -745,13 +745,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
public = is_public?(activity)
remote_inboxes =
- (Pleroma.Web.Salmon.remote_users(activity) ++ followers)
+ (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
|> Enum.filter(fn user -> User.ap_enabled?(user) end)
|> Enum.map(fn %{info: %{source_data: data}} ->
(is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"]
end)
|> Enum.uniq()
|> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
+ |> Instances.filter_reachable()
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
json = Jason.encode!(data)
@@ -779,15 +780,24 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
digest: digest
})
- @httpoison.post(
- inbox,
- json,
- [
- {"Content-Type", "application/activity+json"},
- {"signature", signature},
- {"digest", digest}
- ]
- )
+ with {:ok, %{status: code}} when code in 200..299 <-
+ result =
+ @httpoison.post(
+ inbox,
+ json,
+ [
+ {"Content-Type", "application/activity+json"},
+ {"signature", signature},
+ {"digest", digest}
+ ]
+ ) do
+ Instances.set_reachable(inbox)
+ result
+ else
+ {_post_result, response} ->
+ Instances.set_unreachable(inbox)
+ {:error, response}
+ end
end
# TODO:
diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex
index 7eed0a600..4dea6ab83 100644
--- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex
@@ -4,6 +4,7 @@
defmodule Pleroma.Web.ActivityPub.ActivityPubController do
use Pleroma.Web, :controller
+
alias Pleroma.{Activity, User, Object}
alias Pleroma.Web.ActivityPub.{ObjectView, UserView}
alias Pleroma.Web.ActivityPub.ActivityPub
@@ -17,6 +18,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
action_fallback(:errors)
plug(Pleroma.Web.FederatingPlug when action in [:inbox, :relay])
+ plug(:set_requester_reachable when action in [:inbox])
plug(:relay_active? when action in [:relay])
def relay_active?(conn, _) do
@@ -289,4 +291,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do
|> put_status(500)
|> json("error")
end
+
+ defp set_requester_reachable(%Plug.Conn{} = conn, _) do
+ with actor <- conn.params["actor"],
+ true <- is_binary(actor) do
+ Pleroma.Instances.set_reachable(actor)
+ end
+
+ conn
+ end
end
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex
index f3a0e18b8..46f7a4973 100644
--- a/lib/pleroma/web/federator/federator.ex
+++ b/lib/pleroma/web/federator/federator.ex
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator do
use GenServer
alias Pleroma.User
alias Pleroma.Activity
- alias Pleroma.Web.{WebFinger, Websub}
+ alias Pleroma.Web.{WebFinger, Websub, Salmon}
alias Pleroma.Web.Federator.RetryQueue
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Relay
@@ -124,6 +124,10 @@ defmodule Pleroma.Web.Federator do
end
end
+ def handle(:publish_single_salmon, {user_or_url, feed, poster}) do
+ Salmon.send_to_user(user_or_url, feed, poster)
+ end
+
def handle(:publish_single_ap, params) do
case ActivityPub.publish_one(params) do
{:ok, _} ->
diff --git a/lib/pleroma/web/ostatus/ostatus.ex b/lib/pleroma/web/ostatus/ostatus.ex
index a3155b79d..a20ca17bb 100644
--- a/lib/pleroma/web/ostatus/ostatus.ex
+++ b/lib/pleroma/web/ostatus/ostatus.ex
@@ -48,6 +48,9 @@ defmodule Pleroma.Web.OStatus do
def handle_incoming(xml_string) do
with doc when doc != :error <- parse_document(xml_string) do
+ with {:ok, actor_user} <- find_make_or_update_user(doc),
+ do: Pleroma.Instances.set_reachable(actor_user.ap_id)
+
entries = :xmerl_xpath.string('//entry', doc)
activities =
diff --git a/lib/pleroma/web/ostatus/ostatus_controller.ex b/lib/pleroma/web/ostatus/ostatus_controller.ex
index 297aca2f9..302ff38a4 100644
--- a/lib/pleroma/web/ostatus/ostatus_controller.ex
+++ b/lib/pleroma/web/ostatus/ostatus_controller.ex
@@ -14,6 +14,7 @@ defmodule Pleroma.Web.OStatus.OStatusController do
alias Pleroma.Web.ActivityPub.ActivityPub
plug(Pleroma.Web.FederatingPlug when action in [:salmon_incoming])
+
action_fallback(:errors)
def feed_redirect(conn, %{"nickname" => nickname}) do
diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex
index e41657da1..07ca42a5f 100644
--- a/lib/pleroma/web/salmon/salmon.ex
+++ b/lib/pleroma/web/salmon/salmon.ex
@@ -6,6 +6,7 @@ defmodule Pleroma.Web.Salmon do
@httpoison Application.get_env(:pleroma, :httpoison)
use Bitwise
+ alias Pleroma.Instances
alias Pleroma.Web.XML
alias Pleroma.Web.OStatus.ActivityRepresenter
alias Pleroma.User
@@ -163,23 +164,28 @@ defmodule Pleroma.Web.Salmon do
# push an activity to remote accounts
#
- defp send_to_user(%{info: %{salmon: salmon}}, feed, poster),
+ def send_to_user(%{info: %{salmon: salmon}}, feed, poster),
do: send_to_user(salmon, feed, poster)
- defp send_to_user(url, feed, poster) when is_binary(url) do
- with {:ok, %{status: code}} <-
+ def send_to_user(url, feed, poster) when is_binary(url) do
+ with {:ok, %{status: code}} when code in 200..299 <-
poster.(
url,
feed,
[{"Content-Type", "application/magic-envelope+xml"}]
) do
+ Instances.set_reachable(url)
Logger.debug(fn -> "Pushed to #{url}, code #{code}" end)
+ :ok
else
- e -> Logger.debug(fn -> "Pushing Salmon to #{url} failed, #{inspect(e)}" end)
+ e ->
+ Instances.set_unreachable(url)
+ Logger.debug(fn -> "Pushing Salmon to #{url} failed, #{inspect(e)}" end)
+ :error
end
end
- defp send_to_user(_, _, _), do: nil
+ def send_to_user(_, _, _), do: :noop
@supported_activities [
"Create",
@@ -209,12 +215,16 @@ defmodule Pleroma.Web.Salmon do
{:ok, private, _} = keys_from_pem(keys)
{:ok, feed} = encode(private, feed)
- remote_users(activity)
+ remote_users = remote_users(activity)
+
+ salmon_urls = Enum.map(remote_users, & &1.info.salmon)
+ reachable_salmon_urls = Instances.filter_reachable(salmon_urls)
+
+ remote_users
+ |> Enum.filter(&(&1.info.salmon in reachable_salmon_urls))
|> Enum.each(fn remote_user ->
- Task.start(fn ->
- Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
- send_to_user(remote_user, feed, poster)
- end)
+ Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
+ Pleroma.Web.Federator.enqueue(:publish_single_salmon, {remote_user, feed, poster})
end)
end
end
diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex
index 7ca62c83b..8f7d53b03 100644
--- a/lib/pleroma/web/websub/websub.ex
+++ b/lib/pleroma/web/websub/websub.ex
@@ -5,6 +5,7 @@
defmodule Pleroma.Web.Websub do
alias Ecto.Changeset
alias Pleroma.Repo
+ alias Pleroma.Instances
alias Pleroma.Web.Websub.{WebsubServerSubscription, WebsubClientSubscription}
alias Pleroma.Web.OStatus.FeedRepresenter
alias Pleroma.Web.{XML, Endpoint, OStatus}
@@ -53,23 +54,27 @@ defmodule Pleroma.Web.Websub do
]
def publish(topic, user, %{data: %{"type" => type}} = activity)
when type in @supported_activities do
- # TODO: Only send to still valid subscriptions.
+ response =
+ user
+ |> FeedRepresenter.to_simple_form([activity], [user])
+ |> :xmerl.export_simple(:xmerl_xml)
+ |> to_string
+
query =
from(
sub in WebsubServerSubscription,
where: sub.topic == ^topic and sub.state == "active",
- where: fragment("? > NOW()", sub.valid_until)
+ where: fragment("? > (NOW() at time zone 'UTC')", sub.valid_until)
)
subscriptions = Repo.all(query)
- Enum.each(subscriptions, fn sub ->
- response =
- user
- |> FeedRepresenter.to_simple_form([activity], [user])
- |> :xmerl.export_simple(:xmerl_xml)
- |> to_string
+ callbacks = Enum.map(subscriptions, & &1.callback)
+ reachable_callbacks = Instances.filter_reachable(callbacks)
+ subscriptions
+ |> Enum.filter(&(&1.callback in reachable_callbacks))
+ |> Enum.each(fn sub ->
data = %{
xml: response,
topic: topic,
@@ -267,7 +272,7 @@ defmodule Pleroma.Web.Websub do
signature = sign(secret || "", xml)
Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
- with {:ok, %{status: code}} <-
+ with {:ok, %{status: code}} when code in 200..299 <-
@httpoison.post(
callback,
xml,
@@ -276,12 +281,14 @@ defmodule Pleroma.Web.Websub do
{"X-Hub-Signature", "sha1=#{signature}"}
]
) do
+ Instances.set_reachable(callback)
Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
{:ok, code}
else
- e ->
- Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
- {:error, e}
+ {_post_result, response} ->
+ Instances.set_unreachable(callback)
+ Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(response)}" end)
+ {:error, response}
end
end
end
diff --git a/lib/pleroma/web/websub/websub_controller.ex b/lib/pleroma/web/websub/websub_controller.ex
index e58f144e5..a92dfe87b 100644
--- a/lib/pleroma/web/websub/websub_controller.ex
+++ b/lib/pleroma/web/websub/websub_controller.ex
@@ -4,9 +4,11 @@
defmodule Pleroma.Web.Websub.WebsubController do
use Pleroma.Web, :controller
+
alias Pleroma.{Repo, User}
alias Pleroma.Web.{Websub, Federator}
alias Pleroma.Web.Websub.WebsubClientSubscription
+
require Logger
plug(