diff options
author | Lain Iwakura <lain@soykaf.club> | 2017-12-05 18:21:30 +0100 |
---|---|---|
committer | Lain Iwakura <lain@soykaf.club> | 2017-12-05 18:21:30 +0100 |
commit | 66c3813ea6388e9933af2b15e903f1cf6254cd3a (patch) | |
tree | b77556ae86bf102ef5976e00b63fdb41594f04a0 /lib | |
parent | e7c2472abd470b3b0ba8e9321a378e5a77412e26 (diff) | |
download | pleroma-66c3813ea6388e9933af2b15e903f1cf6254cd3a.tar.gz |
Add basic queue prioritization.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pleroma/web/federator/federator.ex | 21 |
1 files changed, 15 insertions, 6 deletions
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index 9f6f983aa..f384b313c 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -15,8 +15,8 @@ defmodule Pleroma.Web.Federator do enqueue(:refresh_subscriptions, nil) end) GenServer.start_link(__MODULE__, %{ - in: {:sets.new(), :queue.new()}, - out: {:sets.new(), :queue.new()} + in: {:sets.new(), [], + out: {:sets.new(), []} }, name: __MODULE__) end @@ -88,8 +88,8 @@ defmodule Pleroma.Web.Federator do end def maybe_start_job(running_jobs, queue) do - if (:sets.size(running_jobs) < @max_jobs) && !:queue.is_empty(queue) do - {{:value, {type, payload}}, queue} = :queue.out(queue) + if (:sets.size(running_jobs) < @max_jobs) && queue != [] do + {{:value, {type, payload}}, queue} = queue_pop(queue) {:ok, pid} = Task.start(fn -> handle(type, payload) end) mref = Process.monitor(pid) {:sets.add_element(mref, running_jobs), queue} @@ -100,14 +100,14 @@ defmodule Pleroma.Web.Federator do def handle_cast({:enqueue, type, payload}, state) when type in [:incoming_doc] do %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state - i_queue = :queue.in({type, payload}, i_queue) + i_queue = enqueue_sorted(i_queue, {type, payload}, 1) {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue) {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} end def handle_cast({:enqueue, type, payload}, state) do %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state - o_queue = :queue.in({type, payload}, o_queue) + o_queue = enqueue_sorted(o_queue, {type, payload}, 1) {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue) {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} end @@ -126,4 +126,13 @@ defmodule Pleroma.Web.Federator do {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} end + + def enqueue_sorted(queue, element, priority) do + [%{item: element, priority: priority} | queue] + |> Enum.sort_by(fn (%{priority: priority}) -> priority end) + end + + def queue_pop([%{item: element} | queue]) do + {element, queue} + end end |