aboutsummaryrefslogtreecommitdiff
path: root/lib/pleroma/scheduled_activity.ex
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pleroma/scheduled_activity.ex')
-rw-r--r--lib/pleroma/scheduled_activity.ex66
1 files changed, 55 insertions, 11 deletions
diff --git a/lib/pleroma/scheduled_activity.ex b/lib/pleroma/scheduled_activity.ex
index fea2cf3ff..96fa6a987 100644
--- a/lib/pleroma/scheduled_activity.ex
+++ b/lib/pleroma/scheduled_activity.ex
@@ -5,11 +5,13 @@
defmodule Pleroma.ScheduledActivity do
use Ecto.Schema
+ alias Ecto.Multi
alias Pleroma.Config
alias Pleroma.Repo
alias Pleroma.ScheduledActivity
alias Pleroma.User
alias Pleroma.Web.CommonAPI.Utils
+ alias Pleroma.Workers.ScheduledActivityWorker
import Ecto.Query
import Ecto.Changeset
@@ -105,14 +107,29 @@ defmodule Pleroma.ScheduledActivity do
end
def new(%User{} = user, attrs) do
- %ScheduledActivity{user_id: user.id}
- |> changeset(attrs)
+ changeset(%ScheduledActivity{user_id: user.id}, attrs)
end
+ @doc """
+ Creates ScheduledActivity and add to queue to perform at scheduled_at date
+ """
+ @spec create(User.t(), map()) :: {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
def create(%User{} = user, attrs) do
- user
- |> new(attrs)
- |> Repo.insert()
+ Multi.new()
+ |> Multi.insert(:scheduled_activity, new(user, attrs))
+ |> Multi.run(:scheduled_activity_job, fn _repo, %{scheduled_activity: activity} ->
+ %{activity_id: activity.id}
+ |> ScheduledActivityWorker.new(scheduled_at: activity.scheduled_at)
+ |> Oban.insert()
+ end)
+ |> Repo.transaction()
+ |> case do
+ {:ok, %{scheduled_activity: scheduled_activity}} ->
+ {:ok, scheduled_activity}
+
+ {:error, _, changeset, _} ->
+ {:error, changeset}
+ end
end
def get(%User{} = user, scheduled_activity_id) do
@@ -122,15 +139,35 @@ defmodule Pleroma.ScheduledActivity do
|> Repo.one()
end
- def update(%ScheduledActivity{} = scheduled_activity, attrs) do
- scheduled_activity
- |> update_changeset(attrs)
- |> Repo.update()
+ @spec update(ScheduledActivity.t(), map()) ::
+ {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
+ def update(%ScheduledActivity{id: id} = scheduled_activity, attrs) do
+ with {:error, %Ecto.Changeset{valid?: true} = changeset} <-
+ {:error, update_changeset(scheduled_activity, attrs)} do
+ Multi.new()
+ |> Multi.update(:scheduled_activity, changeset)
+ |> Multi.update_all(:scheduled_job, job_query(id),
+ set: [scheduled_at: changeset.changes[:scheduled_at]]
+ )
+ |> Repo.transaction()
+ |> case do
+ {:ok, %{scheduled_activity: scheduled_activity}} ->
+ {:ok, scheduled_activity}
+
+ {:error, _, changeset, _} ->
+ {:error, changeset}
+ end
+ end
+ end
+
+ def delete_job(%ScheduledActivity{id: id} = _scheduled_activity) do
+ id
+ |> job_query
+ |> Repo.delete_all()
end
def delete(%ScheduledActivity{} = scheduled_activity) do
- scheduled_activity
- |> Repo.delete()
+ Repo.delete(scheduled_activity)
end
def delete(id) when is_binary(id) or is_integer(id) do
@@ -158,4 +195,11 @@ defmodule Pleroma.ScheduledActivity do
|> where([sa], sa.scheduled_at < ^naive_datetime)
|> Repo.all()
end
+
+ def job_query(scheduled_activity_id) do
+ from(j in Oban.Job,
+ where: j.queue == "scheduled_activities",
+ where: fragment("args ->> 'activity_id' = ?::text", ^to_string(scheduled_activity_id))
+ )
+ end
end