diff --git a/src/couch_work_queue.erl b/src/couch_work_queue.erl index 5d747de8..69c51915 100644 --- a/src/couch_work_queue.erl +++ b/src/couch_work_queue.erl @@ -17,7 +17,7 @@ -include_lib("couch/include/couch_db.hrl"). % public API --export([new/1, queue/2, dequeue/1, dequeue/2, close/1, item_count/1, size/1]). +-export([new/1, queue/2, queue/3, dequeue/1, dequeue/2, close/1, item_count/1, size/1, total_in/1, total_out/1, total_held/1]). % gen_server callbacks -export([init/1, terminate/2]). @@ -30,6 +30,8 @@ max_items, items = 0, size = 0, + total_in = 0, + total_out = 0, work_waiters = [], close_on_dequeue = false, multi_workers = false @@ -40,10 +42,13 @@ new(Options) -> gen_server:start_link(couch_work_queue, Options, []). -queue(Wq, Item) when is_binary(Item) -> - gen_server:call(Wq, {queue, Item, byte_size(Item)}, infinity); queue(Wq, Item) -> - gen_server:call(Wq, {queue, Item, ?term_size(Item)}, infinity). + queue(Wq, Item, 1). + +queue(Wq, Item, CountAs) when is_binary(Item) -> + gen_server:call(Wq, {queue, Item, byte_size(Item), CountAs}, infinity); +queue(Wq, Item, CountAs) -> + gen_server:call(Wq, {queue, Item, ?term_size(Item), CountAs}, infinity). dequeue(Wq) -> @@ -73,6 +78,27 @@ size(Wq) -> _:_ -> closed end. +total_in(Wq) -> + try + gen_server:call(Wq, total_in, infinity) + catch + _:_ -> closed + end. + +total_out(Wq) -> + try + gen_server:call(Wq, total_out, infinity) + catch + _:_ -> closed + end. + +total_held(Wq) -> + try + gen_server:call(Wq, total_held, infinity) + catch + _:_ -> closed + end. + close(Wq) -> gen_server:cast(Wq, close). @@ -90,11 +116,12 @@ init(Options) -> terminate(_Reason, #q{work_waiters=Workers}) -> lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers). - -handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) -> + +handle_call({queue, Item, Size, CountAs}, From, #q{work_waiters = []} = Q0) -> Q = Q0#q{size = Q0#q.size + Size, items = Q0#q.items + 1, - queue = queue:in({Item, Size}, Q0#q.queue)}, + total_in = Q0#q.total_in + CountAs, + queue = queue:in({Item, Size, CountAs}, Q0#q.queue)}, case (Q#q.size >= Q#q.max_size) orelse (Q#q.items >= Q#q.max_items) of true -> @@ -103,9 +130,12 @@ handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) -> {reply, ok, Q, hibernate} end; -handle_call({queue, Item, _}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) -> +handle_call({queue, Item, _, CountAs}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q0) -> gen_server:reply(W, {ok, [Item]}), - {reply, ok, Q#q{work_waiters = Rest}, hibernate}; + Q = Q0#q{work_waiters = Rest, + total_in = Q0#q.total_in + CountAs, + total_out = Q0#q.total_out + CountAs}, + {reply, ok, Q, hibernate}; handle_call({dequeue, Max}, From, Q) -> #q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q, @@ -127,7 +157,16 @@ handle_call(item_count, _From, Q) -> {reply, Q#q.items, Q}; handle_call(size, _From, Q) -> - {reply, Q#q.size, Q}. + {reply, Q#q.size, Q}; + +handle_call(total_in, _From, Q) -> + {reply, Q#q.total_in, Q}; + +handle_call(total_out, _From, Q) -> + {reply, Q#q.total_out, Q}; + +handle_call(total_held, _From, Q) -> + {reply, Q#q.total_in - Q#q.total_out, Q}. deliver_queue_items(Max, Q) -> @@ -135,21 +174,23 @@ deliver_queue_items(Max, Q) -> queue = Queue, items = Count, size = Size, + total_in = TotalIn, + total_out = TotalOut, close_on_dequeue = Close, blocked = Blocked } = Q, case (Max =:= all) orelse (Max >= Count) of false -> - {Items, Size2, Queue2, Blocked2} = dequeue_items( - Max, Size, Queue, Blocked, []), + {Items, Size2, TotalOut2, Queue2, Blocked2} = dequeue_items( + Max, Size, TotalOut, Queue, Blocked, []), Q2 = Q#q{ - items = Count - Max, size = Size2, blocked = Blocked2, queue = Queue2 + items = Count - Max, size = Size2, total_out = TotalOut2, blocked = Blocked2, queue = Queue2 }, {reply, {ok, Items}, Q2}; true -> lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked), - Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()}, - Items = [Item || {Item, _} <- queue:to_list(Queue)], + Q2 = Q#q{items = 0, size = 0, total_out = TotalIn, blocked = [], queue = queue:new()}, + Items = [Item || {Item, _, _} <- queue:to_list(Queue)], case Close of false -> {reply, {ok, Items}, Q2}; @@ -159,11 +200,11 @@ deliver_queue_items(Max, Q) -> end. -dequeue_items(0, Size, Queue, Blocked, DequeuedAcc) -> - {lists:reverse(DequeuedAcc), Size, Queue, Blocked}; +dequeue_items(0, Size, TotalOut, Queue, Blocked, DequeuedAcc) -> + {lists:reverse(DequeuedAcc), Size, TotalOut, Queue, Blocked}; -dequeue_items(NumItems, Size, Queue, Blocked, DequeuedAcc) -> - {{value, {Item, ItemSize}}, Queue2} = queue:out(Queue), +dequeue_items(NumItems, Size, TotalOut, Queue, Blocked, DequeuedAcc) -> + {{value, {Item, ItemSize, CountAs}}, Queue2} = queue:out(Queue), case Blocked of [] -> Blocked2 = Blocked; @@ -171,8 +212,8 @@ dequeue_items(NumItems, Size, Queue, Blocked, DequeuedAcc) -> gen_server:reply(From, ok) end, dequeue_items( - NumItems - 1, Size - ItemSize, Queue2, Blocked2, [Item | DequeuedAcc]). - + NumItems - 1, Size - ItemSize, TotalOut + CountAs, Queue2, Blocked2, [Item | DequeuedAcc]). + handle_cast(close, #q{items = 0} = Q) -> {stop, normal, Q};