aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/pleroma/web/mastodon_api/mastodon_socket.ex23
-rw-r--r--lib/pleroma/web/streamer.ex41
-rw-r--r--test/web/mastodon_api/mastodon_socket_test.exs33
3 files changed, 84 insertions, 13 deletions
diff --git a/lib/pleroma/web/mastodon_api/mastodon_socket.ex b/lib/pleroma/web/mastodon_api/mastodon_socket.ex
index 0f3d5ff7c..f3c13d1aa 100644
--- a/lib/pleroma/web/mastodon_api/mastodon_socket.ex
+++ b/lib/pleroma/web/mastodon_api/mastodon_socket.ex
@@ -11,9 +11,8 @@ defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
timeout: :infinity
)
- def connect(params, socket) do
- with token when not is_nil(token) <- params["access_token"],
- %Token{user_id: user_id} <- Repo.get_by(Token, token: token),
+ def connect(%{"access_token" => token} = params, socket) do
+ with %Token{user_id: user_id} <- Repo.get_by(Token, token: token),
%User{} = user <- Repo.get(User, user_id),
stream
when stream in [
@@ -45,6 +44,24 @@ defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
end
end
+ def connect(%{"stream" => stream} = params, socket)
+ when stream in ["public", "public:local", "hashtag"] do
+ topic =
+ case stream do
+ "hashtag" -> "hashtag:#{params["tag"]}"
+ _ -> stream
+ end
+
+ with socket =
+ socket
+ |> assign(:topic, topic) do
+ Pleroma.Web.Streamer.add_socket(topic, socket)
+ {:ok, socket}
+ else
+ _e -> :error
+ end
+ end
+
def id(_), do: nil
def handle(:text, message, _state) do
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index 6b6d40346..5cab62c85 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -169,16 +169,33 @@ defmodule Pleroma.Web.Streamer do
|> Jason.encode!()
end
+ defp represent_update(%Activity{} = activity) do
+ %{
+ event: "update",
+ payload:
+ Pleroma.Web.MastodonAPI.StatusView.render(
+ "status.json",
+ activity: activity
+ )
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
- user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
- blocks = user.info["blocks"] || []
+ if socket.assigns[:user] do
+ user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
+ blocks = user.info["blocks"] || []
- parent = Object.normalize(item.data["object"])
+ parent = Object.normalize(item.data["object"])
- unless is_nil(parent) or item.actor in blocks or parent.data["actor"] in blocks do
- send(socket.transport_pid, {:text, represent_update(item, user)})
+ unless is_nil(parent) or item.actor in blocks or parent.data["actor"] in blocks do
+ send(socket.transport_pid, {:text, represent_update(item, user)})
+ end
+ else
+ send(socket.transport_pid, {:text, represent_update(item)})
end
end)
end
@@ -186,11 +203,15 @@ defmodule Pleroma.Web.Streamer do
def push_to_socket(topics, topic, item) do
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
- user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
- blocks = user.info["blocks"] || []
-
- unless item.actor in blocks do
- send(socket.transport_pid, {:text, represent_update(item, user)})
+ if socket.assigns[:user] do
+ user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
+ blocks = user.info["blocks"] || []
+
+ unless item.actor in blocks do
+ send(socket.transport_pid, {:text, represent_update(item, user)})
+ end
+ else
+ send(socket.transport_pid, {:text, represent_update(item)})
end
end)
end
diff --git a/test/web/mastodon_api/mastodon_socket_test.exs b/test/web/mastodon_api/mastodon_socket_test.exs
new file mode 100644
index 000000000..c7d71defc
--- /dev/null
+++ b/test/web/mastodon_api/mastodon_socket_test.exs
@@ -0,0 +1,33 @@
+defmodule Pleroma.Web.MastodonApi.MastodonSocketTest do
+ use Pleroma.DataCase
+
+ alias Pleroma.Web.MastodonApi.MastodonSocket
+ alias Pleroma.Web.{Streamer, CommonAPI}
+ alias Pleroma.User
+
+ import Pleroma.Factory
+
+ test "public is working when non-authenticated" do
+ user = insert(:user)
+
+ task =
+ Task.async(fn ->
+ assert_receive {:text, _}, 4_000
+ end)
+
+ fake_socket = %{
+ transport_pid: task.pid,
+ assigns: %{}
+ }
+
+ topics = %{
+ "public" => [fake_socket]
+ }
+
+ {:ok, activity} = CommonAPI.post(user, %{"status" => "Test"})
+
+ Streamer.push_to_socket(topics, "public", activity)
+
+ Task.await(task)
+ end
+end