aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/application.ex1
-rw-r--r--lib/pleroma/emails/mailer.ex2
-rw-r--r--lib/pleroma/jobs.ex152
-rw-r--r--lib/pleroma/web/federator/federator.ex19
4 files changed, 10 insertions, 164 deletions
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index cc81e1805..782d1d589 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -110,7 +110,6 @@ defmodule Pleroma.Application do
worker(Pleroma.Web.Federator.RetryQueue, []),
worker(Pleroma.Stats, []),
worker(Pleroma.Web.Push, []),
- worker(Pleroma.Jobs, []),
worker(Task, [&Pleroma.Web.Federator.init/0], restart: :temporary)
] ++
streamer_child() ++
diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex
index f7e3aa78b..b384e6fec 100644
--- a/lib/pleroma/emails/mailer.ex
+++ b/lib/pleroma/emails/mailer.ex
@@ -6,7 +6,7 @@ defmodule Pleroma.Mailer do
use Swoosh.Mailer, otp_app: :pleroma
def deliver_async(email, config \\ []) do
- Pleroma.Jobs.enqueue(:mailer, __MODULE__, [:deliver_async, email, config])
+ PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config])
end
def perform(:deliver_async, email, config), do: deliver(email, config)
diff --git a/lib/pleroma/jobs.ex b/lib/pleroma/jobs.ex
deleted file mode 100644
index 24b7e5e46..000000000
--- a/lib/pleroma/jobs.ex
+++ /dev/null
@@ -1,152 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Jobs do
- @moduledoc """
- A basic job queue
- """
- use GenServer
-
- require Logger
-
- def init(args) do
- {:ok, args}
- end
-
- def start_link do
- queues =
- Pleroma.Config.get(Pleroma.Jobs)
- |> Enum.map(fn {name, _} -> create_queue(name) end)
- |> Enum.into(%{})
-
- state = %{
- queues: queues,
- refs: %{}
- }
-
- GenServer.start_link(__MODULE__, state, name: __MODULE__)
- end
-
- def create_queue(name) do
- {name, {:sets.new(), []}}
- end
-
- @doc """
- Enqueues a job.
-
- Returns `:ok`.
-
- ## Arguments
-
- - `queue_name` - a queue name(must be specified in the config).
- - `mod` - a worker module (must have `perform` function).
- - `args` - a list of arguments for the `perform` function of the worker module.
- - `priority` - a job priority (`0` by default).
-
- ## Examples
-
- Enqueue `Module.perform/0` with `priority=1`:
-
- iex> Pleroma.Jobs.enqueue(:example_queue, Module, [])
- :ok
-
- Enqueue `Module.perform(:job_name)` with `priority=5`:
-
- iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:job_name], 5)
- :ok
-
- Enqueue `Module.perform(:another_job, data)` with `priority=1`:
-
- iex> data = "foobar"
- iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:another_job, data])
- :ok
-
- Enqueue `Module.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`:
-
- iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:foobar_job, :foo, :bar, 42])
- :ok
-
- """
-
- def enqueue(queue_name, mod, args, priority \\ 1)
-
- if Mix.env() == :test do
- def enqueue(_queue_name, mod, args, _priority) do
- apply(mod, :perform, args)
- end
- else
- @spec enqueue(atom(), atom(), [any()], integer()) :: :ok
- def enqueue(queue_name, mod, args, priority) do
- GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority})
- end
- end
-
- def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do
- {running_jobs, queue} = state[:queues][queue_name]
-
- queue = enqueue_sorted(queue, {mod, args}, priority)
-
- state =
- state
- |> update_queue(queue_name, {running_jobs, queue})
- |> maybe_start_job(queue_name, running_jobs, queue)
-
- {:noreply, state}
- end
-
- def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
- queue_name = state.refs[ref]
-
- {running_jobs, queue} = state[:queues][queue_name]
-
- running_jobs = :sets.del_element(ref, running_jobs)
-
- state =
- state
- |> remove_ref(ref)
- |> update_queue(queue_name, {running_jobs, queue})
- |> maybe_start_job(queue_name, running_jobs, queue)
-
- {:noreply, state}
- end
-
- def maybe_start_job(state, queue_name, running_jobs, queue) do
- if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, queue_name, :max_jobs]) &&
- queue != [] do
- {{mod, args}, queue} = queue_pop(queue)
- {:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end)
- mref = Process.monitor(pid)
-
- state
- |> add_ref(queue_name, mref)
- |> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue})
- else
- state
- end
- end
-
- def enqueue_sorted(queue, element, priority) do
- [%{item: element, priority: priority} | queue]
- |> Enum.sort_by(fn %{priority: priority} -> priority end)
- end
-
- def queue_pop([%{item: element} | queue]) do
- {element, queue}
- end
-
- defp add_ref(state, queue_name, ref) do
- refs = Map.put(state[:refs], ref, queue_name)
- Map.put(state, :refs, refs)
- end
-
- defp remove_ref(state, ref) do
- refs = Map.delete(state[:refs], ref)
- Map.put(state, :refs, refs)
- end
-
- defp update_queue(state, queue_name, data) do
- queues = Map.put(state[:queues], queue_name, data)
- Map.put(state, :queues, queues)
- end
-end
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex
index 5e690ddb8..c47328e13 100644
--- a/lib/pleroma/web/federator/federator.ex
+++ b/lib/pleroma/web/federator/federator.ex
@@ -4,7 +4,6 @@
defmodule Pleroma.Web.Federator do
alias Pleroma.Activity
- alias Pleroma.Jobs
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Relay
@@ -31,39 +30,39 @@ defmodule Pleroma.Web.Federator do
# Client API
def incoming_doc(doc) do
- Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
+ PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
end
def incoming_ap_doc(params) do
- Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
+ PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
end
def publish(activity, priority \\ 1) do
- Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
+ PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
end
def publish_single_ap(params) do
- Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params])
+ PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params])
end
def publish_single_websub(websub) do
- Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub])
+ PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub])
end
def verify_websub(websub) do
- Jobs.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
+ PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
end
def request_subscription(sub) do
- Jobs.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
+ PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
end
def refresh_subscriptions do
- Jobs.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
+ PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
end
def publish_single_salmon(params) do
- Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params])
+ PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params])
end
# Job Worker Callbacks