From 652cc6ba4b7a0494cc96ef4a9bfcaa3b5e5be60e Mon Sep 17 00:00:00 2001 From: Maksim Pechnikov Date: Tue, 3 Dec 2019 21:30:10 +0300 Subject: updated ScheduledActivity --- lib/pleroma/scheduled_activity.ex | 66 ++++++++++++++++++---- .../controllers/scheduled_activity_controller.ex | 3 +- .../mastodon_api/controllers/status_controller.ex | 19 ++++--- .../workers/cron/scheduled_activity_worker.ex | 41 -------------- lib/pleroma/workers/scheduled_activity_worker.ex | 44 +++++++++++++++ 5 files changed, 112 insertions(+), 61 deletions(-) delete mode 100644 lib/pleroma/workers/cron/scheduled_activity_worker.ex create mode 100644 lib/pleroma/workers/scheduled_activity_worker.ex (limited to 'lib') diff --git a/lib/pleroma/scheduled_activity.ex b/lib/pleroma/scheduled_activity.ex index fea2cf3ff..96fa6a987 100644 --- a/lib/pleroma/scheduled_activity.ex +++ b/lib/pleroma/scheduled_activity.ex @@ -5,11 +5,13 @@ defmodule Pleroma.ScheduledActivity do use Ecto.Schema + alias Ecto.Multi alias Pleroma.Config alias Pleroma.Repo alias Pleroma.ScheduledActivity alias Pleroma.User alias Pleroma.Web.CommonAPI.Utils + alias Pleroma.Workers.ScheduledActivityWorker import Ecto.Query import Ecto.Changeset @@ -105,14 +107,29 @@ defmodule Pleroma.ScheduledActivity do end def new(%User{} = user, attrs) do - %ScheduledActivity{user_id: user.id} - |> changeset(attrs) + changeset(%ScheduledActivity{user_id: user.id}, attrs) end + @doc """ + Creates ScheduledActivity and add to queue to perform at scheduled_at date + """ + @spec create(User.t(), map()) :: {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()} def create(%User{} = user, attrs) do - user - |> new(attrs) - |> Repo.insert() + Multi.new() + |> Multi.insert(:scheduled_activity, new(user, attrs)) + |> Multi.run(:scheduled_activity_job, fn _repo, %{scheduled_activity: activity} -> + %{activity_id: activity.id} + |> ScheduledActivityWorker.new(scheduled_at: activity.scheduled_at) + |> Oban.insert() + end) + |> Repo.transaction() + |> case do + {:ok, %{scheduled_activity: scheduled_activity}} -> + {:ok, scheduled_activity} + + {:error, _, changeset, _} -> + {:error, changeset} + end end def get(%User{} = user, scheduled_activity_id) do @@ -122,15 +139,35 @@ defmodule Pleroma.ScheduledActivity do |> Repo.one() end - def update(%ScheduledActivity{} = scheduled_activity, attrs) do - scheduled_activity - |> update_changeset(attrs) - |> Repo.update() + @spec update(ScheduledActivity.t(), map()) :: + {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()} + def update(%ScheduledActivity{id: id} = scheduled_activity, attrs) do + with {:error, %Ecto.Changeset{valid?: true} = changeset} <- + {:error, update_changeset(scheduled_activity, attrs)} do + Multi.new() + |> Multi.update(:scheduled_activity, changeset) + |> Multi.update_all(:scheduled_job, job_query(id), + set: [scheduled_at: changeset.changes[:scheduled_at]] + ) + |> Repo.transaction() + |> case do + {:ok, %{scheduled_activity: scheduled_activity}} -> + {:ok, scheduled_activity} + + {:error, _, changeset, _} -> + {:error, changeset} + end + end + end + + def delete_job(%ScheduledActivity{id: id} = _scheduled_activity) do + id + |> job_query + |> Repo.delete_all() end def delete(%ScheduledActivity{} = scheduled_activity) do - scheduled_activity - |> Repo.delete() + Repo.delete(scheduled_activity) end def delete(id) when is_binary(id) or is_integer(id) do @@ -158,4 +195,11 @@ defmodule Pleroma.ScheduledActivity do |> where([sa], sa.scheduled_at < ^naive_datetime) |> Repo.all() end + + def job_query(scheduled_activity_id) do + from(j in Oban.Job, + where: j.queue == "scheduled_activities", + where: fragment("args ->> 'activity_id' = ?::text", ^to_string(scheduled_activity_id)) + ) + end end diff --git a/lib/pleroma/web/mastodon_api/controllers/scheduled_activity_controller.ex b/lib/pleroma/web/mastodon_api/controllers/scheduled_activity_controller.ex index ff9276541..4f9a8bdbe 100644 --- a/lib/pleroma/web/mastodon_api/controllers/scheduled_activity_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/scheduled_activity_controller.ex @@ -45,7 +45,8 @@ defmodule Pleroma.Web.MastodonAPI.ScheduledActivityController do @doc "DELETE /api/v1/scheduled_statuses/:id" def delete(%{assigns: %{scheduled_activity: scheduled_activity}} = conn, _params) do - with {:ok, scheduled_activity} <- ScheduledActivity.delete(scheduled_activity) do + with {:ok, scheduled_activity} <- ScheduledActivity.delete(scheduled_activity), + _ <- ScheduledActivity.delete_job(scheduled_activity) do render(conn, "show.json", scheduled_activity: scheduled_activity) end end diff --git a/lib/pleroma/web/mastodon_api/controllers/status_controller.ex b/lib/pleroma/web/mastodon_api/controllers/status_controller.ex index 74b223cf4..d70749dfa 100644 --- a/lib/pleroma/web/mastodon_api/controllers/status_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/status_controller.ex @@ -124,15 +124,18 @@ defmodule Pleroma.Web.MastodonAPI.StatusController do ) do params = Map.put(params, "in_reply_to_status_id", params["in_reply_to_id"]) - if ScheduledActivity.far_enough?(scheduled_at) do - with {:ok, scheduled_activity} <- - ScheduledActivity.create(user, %{"params" => params, "scheduled_at" => scheduled_at}) do - conn - |> put_view(ScheduledActivityView) - |> render("show.json", scheduled_activity: scheduled_activity) - end + with {:far_enough, true} <- {:far_enough, ScheduledActivity.far_enough?(scheduled_at)}, + attrs <- %{"params" => params, "scheduled_at" => scheduled_at}, + {:ok, scheduled_activity} <- ScheduledActivity.create(user, attrs) do + conn + |> put_view(ScheduledActivityView) + |> render("show.json", scheduled_activity: scheduled_activity) else - create(conn, Map.drop(params, ["scheduled_at"])) + {:far_enough, _} -> + create(conn, Map.drop(params, ["scheduled_at"])) + + error -> + error end end diff --git a/lib/pleroma/workers/cron/scheduled_activity_worker.ex b/lib/pleroma/workers/cron/scheduled_activity_worker.ex deleted file mode 100644 index 407ab687a..000000000 --- a/lib/pleroma/workers/cron/scheduled_activity_worker.ex +++ /dev/null @@ -1,41 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Workers.Cron.ScheduledActivityWorker do - @moduledoc """ - The worker to post scheduled actvities. - """ - - use Oban.Worker, queue: "scheduled_activities" - alias Pleroma.Config - alias Pleroma.ScheduledActivity - alias Pleroma.User - alias Pleroma.Web.CommonAPI - - require Logger - - @schedule_interval :timer.minutes(1) - - @impl Oban.Worker - def perform(_opts, _job) do - if Config.get([ScheduledActivity, :enabled]) do - @schedule_interval - |> ScheduledActivity.due_activities() - |> Enum.each(&post_activity/1) - end - end - - def post_activity(scheduled_activity) do - try do - {:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity) - %User{} = user = User.get_cached_by_id(scheduled_activity.user_id) - {:ok, _result} = CommonAPI.post(user, scheduled_activity.params) - rescue - error -> - Logger.error( - "#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}" - ) - end - end -end diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex new file mode 100644 index 000000000..5109d7f75 --- /dev/null +++ b/lib/pleroma/workers/scheduled_activity_worker.ex @@ -0,0 +1,44 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.ScheduledActivityWorker do + @moduledoc """ + The worker to post scheduled activity. + """ + + use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities" + + alias Pleroma.Config + alias Pleroma.ScheduledActivity + alias Pleroma.User + alias Pleroma.Web.CommonAPI + + require Logger + + @impl Oban.Worker + def perform(%{"activity_id" => activity_id}, _job) do + if Config.get([ScheduledActivity, :enabled]) do + case Pleroma.Repo.get(ScheduledActivity, activity_id) do + %ScheduledActivity{} = scheduled_activity -> + post_activity(scheduled_activity) + + _ -> + Logger.error("#{__MODULE__} Couldn't find scheduled activity: #{activity_id}") + end + end + end + + defp post_activity(%ScheduledActivity{} = scheduled_activity) do + try do + {:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity) + %User{} = user = User.get_cached_by_id(scheduled_activity.user_id) + {:ok, _result} = CommonAPI.post(user, scheduled_activity.params) + rescue + error -> + Logger.error( + "#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}" + ) + end + end +end -- cgit v1.2.3