diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pleroma/application.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/web/mastodon_api/websocket_handler.ex | 9 | ||||
-rw-r--r-- | lib/pleroma/web/o_auth/token/strategy/revoke.ex | 14 | ||||
-rw-r--r-- | lib/pleroma/web/streamer.ex | 24 |
4 files changed, 42 insertions, 8 deletions
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index bf5c57840..1c1db8c10 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -94,7 +94,8 @@ defmodule Pleroma.Application do Pleroma.Repo, Config.TransferTask, Pleroma.Emoji, - Pleroma.Web.Plugs.RateLimiter.Supervisor + Pleroma.Web.Plugs.RateLimiter.Supervisor, + {Task.Supervisor, name: Pleroma.TaskSupervisor} ] ++ cachex_children() ++ http_children(adapter, @mix_env) ++ diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index e62b8a135..88444106d 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -32,7 +32,8 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do req end - {:cowboy_websocket, req, %{user: user, topic: topic, count: 0, timer: nil}, + {:cowboy_websocket, req, + %{user: user, topic: topic, oauth_token: oauth_token, count: 0, timer: nil}, %{idle_timeout: @timeout}} else {:error, :bad_topic} -> @@ -52,7 +53,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do "#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic}" ) - Streamer.add_socket(state.topic, state.user) + Streamer.add_socket(state.topic, state.oauth_token) {:ok, %{state | timer: timer()}} end @@ -98,6 +99,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do {:reply, :ping, %{state | timer: nil, count: 0}, :hibernate} end + def websocket_info(:close, state) do + {:stop, state} + end + # State can be `[]` only in case we terminate before switching to websocket, # we already log errors for these cases in `init/1`, so just do nothing here def terminate(_reason, _req, []), do: :ok diff --git a/lib/pleroma/web/o_auth/token/strategy/revoke.ex b/lib/pleroma/web/o_auth/token/strategy/revoke.ex index 752efca89..3b265b339 100644 --- a/lib/pleroma/web/o_auth/token/strategy/revoke.ex +++ b/lib/pleroma/web/o_auth/token/strategy/revoke.ex @@ -21,6 +21,18 @@ defmodule Pleroma.Web.OAuth.Token.Strategy.Revoke do @doc "Revokes access token" @spec revoke(Token.t()) :: {:ok, Token.t()} | {:error, Ecto.Changeset.t()} def revoke(%Token{} = token) do - Repo.delete(token) + with {:ok, token} <- Repo.delete(token) do + Task.Supervisor.start_child( + Pleroma.TaskSupervisor, + Pleroma.Web.Streamer, + :close_streams_by_oauth_token, + [token], + restart: :transient + ) + + {:ok, token} + else + result -> result + end end end diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index fe909df0a..3c0da5c27 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -37,7 +37,7 @@ defmodule Pleroma.Web.Streamer do {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do - add_socket(topic, user) + add_socket(topic, oauth_token) end end @@ -120,10 +120,10 @@ defmodule Pleroma.Web.Streamer do end @doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic." - def add_socket(topic, user) do + def add_socket(topic, oauth_token) do if should_env_send?() do - auth? = if user, do: true - Registry.register(@registry, topic, auth?) + oauth_token_id = if oauth_token, do: oauth_token.id, else: false + Registry.register(@registry, topic, oauth_token_id) end {:ok, topic} @@ -338,6 +338,22 @@ defmodule Pleroma.Web.Streamer do end end + def close_streams_by_oauth_token(oauth_token) do + if should_env_send?() do + Registry.select( + @registry, + [ + { + {:"$1", :"$2", :"$3"}, + [{:==, :"$3", oauth_token.id}], + [:"$2"] + } + ] + ) + |> Enum.each(fn pid -> send(pid, :close) end) + end + end + # In test environement, only return true if the registry is started. # In benchmark environment, returns false. # In any other environment, always returns true. |