From ae00a5a62e83e585c46ae46a5b110b1474f4ea40 Mon Sep 17 00:00:00 2001
From: "Paul J. Davis"
Date: Tue, 28 Mar 2017 16:21:38 -0500
Subject: [PATCH] Improve compaction task status updates

Previously, the emsort-related operations did not update the compaction
task status. For large databases this leads to some very long waits
while the compaction task stays at 100%.

This change adds progress reports to the steps for sorting and copying
document ids back into the database file.
---
 src/couch_db_updater.erl | 97 ++++++++++++++++++++++++++++++++++++----
 src/couch_emsort.erl     | 71 ++++++++++++++++++-----------
 2 files changed, 134 insertions(+), 34 deletions(-)

diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index 270fffe4..3f3c9198 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -34,6 +34,8 @@
     infos
 }).
 
+-define(COMP_DOCID_BATCH_SIZE, 1000).
+
 init({DbName, Filepath, Fd, Options}) ->
     erlang:put(io_priority, {db_update, DbName}),
     case lists:member(create, Options) of
@@ -1108,10 +1110,10 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
     NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
 
 
-copy_compact(Db, NewDb0, Retry) ->
+copy_compact(Db, NewDb0, Retry, TotalChanges) ->
     Compression = couch_compress:get_compression_method(),
     NewDb = NewDb0#db{compression=Compression},
-    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+
     BufferSize = list_to_integer(
         config:get("database_compaction", "doc_buffer_size", "524288")),
     CheckpointAfter = couch_util:to_integer(
@@ -1147,6 +1149,7 @@ copy_compact(Db, NewDb0, Retry) ->
     TaskProps0 = [
         {type, database_compaction},
         {database, Db#db.name},
+        {phase, seq_tree},
         {progress, 0},
         {changes_done, 0},
         {total_changes, TotalChanges}
@@ -1193,6 +1196,8 @@ start_copy_compact(#db{}=Db) ->
         open_compaction_files(Name, Header, Filepath, Options),
     erlang:monitor(process, MFd),
 
+    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+
     % This is a bit worrisome. init_db/4 will monitor the data fd
     % but it doesn't know about the meta fd. For now I'll maintain
     % that the data fd is the old normal fd and meta fd is special
@@ -1200,10 +1205,10 @@ start_copy_compact(#db{}=Db) ->
     unlink(DFd),
 
     NewDb1 = copy_purge_info(Db, NewDb),
-    NewDb2 = copy_compact(Db, NewDb1, Retry),
-    NewDb3 = sort_meta_data(NewDb2),
+    NewDb2 = copy_compact(Db, NewDb1, Retry, TotalChanges),
+    NewDb3 = sort_meta_data(NewDb2, TotalChanges),
     NewDb4 = commit_compaction_data(NewDb3),
-    NewDb5 = copy_meta_data(NewDb4),
+    NewDb5 = copy_meta_data(NewDb4, TotalChanges),
     NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
     close_db(NewDb6),
 
@@ -1323,12 +1328,84 @@ bind_id_tree(Db, Fd, State) ->
     Db#db{id_tree=IdBtree}.
 
 
-sort_meta_data(Db0) ->
-    {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
-    Db0#db{id_tree=Ems}.
+sort_meta_data(Db0, TotalChanges) ->
+    couch_task_status:update([
+        {phase, sort_ids_init},
+        {total_changes, TotalChanges},
+        {changes_done, 0},
+        {progress, 0}
+    ]),
+    Ems0 = Db0#db.id_tree,
+    Options = [
+        {event_cb, fun emsort_cb/3},
+        {event_st, {init, 0, 0}}
+    ],
+    Ems1 = couch_emsort:set_options(Ems0, Options),
+    {ok, Ems2} = couch_emsort:merge(Ems1),
+    Db0#db{id_tree=Ems2}.
+
+
+emsort_cb(_Ems, {merge, chain}, {init, Copied, Nodes}) ->
+    {init, Copied, Nodes + 1};
+emsort_cb(_Ems, row_copy, {init, Copied, Nodes})
+        when Copied >= ?COMP_DOCID_BATCH_SIZE ->
+    update_compact_task(Copied + 1),
+    {init, 0, Nodes};
+emsort_cb(_Ems, row_copy, {init, Copied, Nodes}) ->
+    {init, Copied + 1, Nodes};
+emsort_cb(Ems, {merge_start, reverse}, {init, Copied, Nodes}) ->
+    BBChunkSize = couch_emsort:get_bb_chunk_size(Ems),
+
+    % Subtract one because we already finished the first
+    % iteration when we were counting the number of nodes
+    % in the backbone.
+    Iters = calculate_sort_iters(Nodes, BBChunkSize, 0) - 1,
+
+    % Compaction retries mean we may have copied more than
+    % doc count rows. This accounts for that by using the
+    % number we've actually copied.
+    [PrevCopied] = couch_task_status:get([changes_done]),
+    TotalCopied = PrevCopied + Copied,
+
+    couch_task_status:update([
+        {phase, sort_ids},
+        {total_changes, Iters * TotalCopied},
+        {changes_done, 0},
+        {progress, 0}
+    ]),
+    0;
+
+emsort_cb(_Ems, row_copy, Copied)
+        when is_integer(Copied), Copied > ?COMP_DOCID_BATCH_SIZE ->
+    update_compact_task(Copied + 1),
+    0;
+
+emsort_cb(_Ems, row_copy, Copied) when is_integer(Copied) ->
+    Copied + 1;
+emsort_cb(_Ems, _Event, St) ->
+    St.
 
 
-copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
+
+calculate_sort_iters(Nodes, BBChunk, Count) when Nodes < BBChunk ->
+    Count;
+calculate_sort_iters(Nodes0, BBChunk, Count) when BBChunk > 1 ->
+    Calc = fun(N0) ->
+        N1 = N0 div BBChunk,
+        N1 + if N1 rem BBChunk == 0 -> 0; true -> 1 end
+    end,
+    Nodes1 = Calc(Nodes0),
+    Nodes2 = Calc(Nodes1),
+    calculate_sort_iters(Nodes2, BBChunk, Count + 2).
+
+
+copy_meta_data(#db{fd=Fd, header=Header}=Db, TotalChanges) ->
+    couch_task_status:update([
+        {phase, copy_ids},
+        {changes_done, 0},
+        {total_changes, TotalChanges},
+        {progress, 0}
+    ]),
     Src = Db#db.id_tree,
     DstState = couch_db_header:id_tree_state(Header),
     {ok, IdTree0} = couch_btree:open(DstState, Fd, [
@@ -1348,6 +1425,7 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
+    update_compact_task(length(Acc#merge_st.infos)),
     Db#db{id_tree=IdTree, seq_tree=SeqTree}.
 
 
@@ -1359,6 +1437,7 @@ merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
     } = Acc,
     {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
     {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
+    update_compact_task(length(Infos)),
     Acc1 = Acc#merge_st{
         id_tree=IdTree1,
         seq_tree=SeqTree1,
diff --git a/src/couch_emsort.erl b/src/couch_emsort.erl
index 2a25a232..d7f1b2be 100644
--- a/src/couch_emsort.erl
+++ b/src/couch_emsort.erl
@@ -129,7 +129,8 @@
 %     CA3    CD3
 %
 
--export([open/1, open/2, get_fd/1, get_state/1]).
+-export([open/1, open/2, set_options/2, get_fd/1, get_state/1]).
+-export([get_bb_chunk_size/1]).
 -export([add/2, merge/1, sort/1, iter/1, next/1]).
 
 
@@ -137,7 +138,9 @@
     fd,
     root,
     bb_chunk = 10,
-    chain_chunk = 100
+    chain_chunk = 100,
+    event_cb,
+    event_st
 }).
 
 
@@ -156,7 +159,11 @@ set_options(Ems, [{root, Root} | Rest]) ->
 set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
     set_options(Ems#ems{chain_chunk=Count}, Rest);
 set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
-    set_options(Ems#ems{bb_chunk=Count}, Rest).
+    set_options(Ems#ems{bb_chunk=Count}, Rest);
+set_options(Ems, [{event_cb, EventCB} | Rest]) when is_function(EventCB, 3) ->
+    set_options(Ems#ems{event_cb=EventCB}, Rest);
+set_options(Ems, [{event_st, EventSt} | Rest]) ->
+    set_options(Ems#ems{event_st=EventSt}, Rest).
 
 
 get_fd(#ems{fd=Fd}) ->
@@ -167,6 +174,10 @@ get_state(#ems{root=Root}) ->
     Root.
 
 
+get_bb_chunk_size(#ems{bb_chunk = Size}) ->
+    Size.
+
+
 add(Ems, []) ->
     {ok, Ems};
 add(Ems, KVs) ->
@@ -224,7 +235,7 @@ decimate(#ems{root={_BB, nil}}=Ems) ->
     % We have less than bb_chunk backbone pointers so we're
     % good to start streaming KV's back to the client.
     Ems;
 
-decimate(#ems{root={BB, NextBB}}=Ems) ->
+decimate(#ems{}=Ems0) ->
     % To make sure we have a bounded amount of data in RAM
     % at any given point we first need to decimate the data
     % by performing the first couple iterations of a merge
@@ -232,43 +243,47 @@
     % The first pass gives us a sort with pointers linked from
     % largest to smallest.
-    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB),
+    {ok, Ems1} = event_notify(Ems0, {merge_start, forward}),
+    {ok, Ems2} = merge_back_bone(Ems1, small),
 
     % We have to run a second pass so that links are pointed
     % back from smallest to largest.
-    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB),
+    {ok, Ems3} = event_notify(Ems2, {merge_start, reverse}),
+    {ok, Ems4} = merge_back_bone(Ems3, big),
 
     % Continue deicmating until we have an acceptable bound on
     % the number of keys to use.
-    decimate(Ems#ems{root={FwdBB, FwdNextBB}}).
+    decimate(Ems4).
 
 
-merge_back_bone(Ems, Choose, BB, NextBB) ->
-    BBPos = merge_chains(Ems, Choose, BB),
-    merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}).
+merge_back_bone(#ems{root={BB, NextBB}}=Ems0, Choose) ->
+    {ok, Ems1, BBPos} = merge_chains(Ems0, Choose, BB),
+    merge_rest_back_bone(Ems1, Choose, NextBB, {[BBPos], nil}).
 
 
-merge_rest_back_bone(_Ems, _Choose, nil, Acc) ->
-    Acc;
-merge_rest_back_bone(Ems, Choose, BBPos, Acc) ->
-    {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos),
-    NewPos = merge_chains(Ems, Choose, BB),
-    {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
-    merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}).
+merge_rest_back_bone(Ems, _Choose, nil, Acc) ->
+    {ok, Ems#ems{root=Acc}};
+merge_rest_back_bone(Ems0, Choose, BBPos, Acc) ->
+    {ok, {BB, NextBB}} = couch_file:pread_term(Ems0#ems.fd, BBPos),
+    {ok, Ems1, NewPos} = merge_chains(Ems0, Choose, BB),
+    {NewBB, NewPrev} = append_item(Ems1, Acc, NewPos, Ems1#ems.bb_chunk),
+    merge_rest_back_bone(Ems1, Choose, NextBB, {NewBB, NewPrev}).
 
 
-merge_chains(Ems, Choose, BB) ->
-    Chains = init_chains(Ems, Choose, BB),
-    merge_chains(Ems, Choose, Chains, {[], nil}).
+merge_chains(Ems0, Choose, BB) ->
+    {ok, Ems1} = event_notify(Ems0, {merge, chain}),
+    Chains = init_chains(Ems1, Choose, BB),
+    merge_chains(Ems1, Choose, Chains, {[], nil}).
 
 
 merge_chains(Ems, _Choose, [], ChainAcc) ->
     {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc),
-    CPos;
-merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc) ->
-    {KV, RestChains} = choose_kv(Choose, Ems, Chains),
-    {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
-    merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}).
+    {ok, Ems, CPos};
+merge_chains(#ems{chain_chunk=CC}=Ems0, Choose, Chains, Acc) ->
+    {KV, RestChains} = choose_kv(Choose, Ems0, Chains),
+    {NewKVs, NewPrev} = append_item(Ems0, Acc, KV, CC),
+    {ok, Ems1} = event_notify(Ems0, row_copy),
+    merge_chains(Ems1, Choose, RestChains, {NewKVs, NewPrev}).
 
 
 init_chains(Ems, Choose, BB) ->
@@ -316,3 +331,9 @@ append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
 
 append_item(_Ems, {List, Prev}, Pos, _Size) ->
     {[Pos | List], Prev}.
+
+event_notify(#ems{event_cb = undefined} = Ems, _) ->
+    {ok, Ems};
+event_notify(#ems{event_cb=EventCB, event_st=EventSt}=Ems, Event) ->
+    NewSt = EventCB(Ems, Event, EventSt),
+    {ok, Ems#ems{event_st=NewSt}}.
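
Reviewer note: with this change a task monitor sees the compaction task
move through four phases: seq_tree while copy_compact/4 copies the
changes feed, sort_ids_init while emsort counts backbone nodes during
the first merge pass, sort_ids for the remaining merge passes, and
copy_ids while copy_meta_data/2 streams document ids back into the
database file; changes_done and total_changes reset at each phase
boundary. The sketch below shows how another caller could hook the new
emsort event API. It is illustrative, not part of the patch: the module
name emsort_progress, log_cb/3, and the log messages are made up, while
set_options/2, the {event_cb, Fun} / {event_st, St} options, the event
terms ({merge, chain}, {merge_start, forward | reverse}, row_copy), and
the rule that the callback's return value becomes the next event_st all
come from the code above.

    -module(emsort_progress).
    -export([merge_with_progress/1]).

    %% Hypothetical consumer of the new emsort event API: counts
    %% backbone nodes and copied rows, logging every 10000 rows.
    merge_with_progress(Ems0) ->
        Ems1 = couch_emsort:set_options(Ems0, [
            {event_cb, fun log_cb/3},
            {event_st, {0, 0}}
        ]),
        couch_emsort:merge(Ems1).

    %% The {Nodes, Rows} tuple is private callback state; emsort only
    %% threads it between invocations via event_notify/2.
    log_cb(_Ems, {merge, chain}, {Nodes, Rows}) ->
        {Nodes + 1, Rows};
    log_cb(_Ems, {merge_start, Dir}, St) ->
        couch_log:info("emsort: starting ~p merge pass", [Dir]),
        St;
    log_cb(_Ems, row_copy, {Nodes, Rows}) when (Rows + 1) rem 10000 == 0 ->
        couch_log:info("emsort: copied ~b rows", [Rows + 1]),
        {Nodes, Rows + 1};
    log_cb(_Ems, row_copy, {Nodes, Rows}) ->
        {Nodes, Rows + 1};
    log_cb(_Ems, _Event, St) ->
        St.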
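A worked example of the pass estimate, under the default bb_chunk of 10
(see couch_emsort.erl above); the starting node count is made up for
illustration. Each merge pass collapses roughly bb_chunk backbone nodes
into one, and decimate/1 always runs passes in forward/reverse pairs,
which is why calculate_sort_iters/3 applies Calc twice and adds 2 per
loop. Starting from 10000 backbone nodes counted during the first pass:
Calc(10000) = 1000 and Calc(1000) = 100 (Count = 2), then
Calc(100) = 10 and Calc(10) = 2 (Count = 4), and 2 < 10 stops the
recursion, so calculate_sort_iters(10000, 10, 0) returns 4. The
{merge_start, reverse} clause of emsort_cb/3 then subtracts the
already-completed counting pass and reports total_changes as
3 * TotalCopied for the sort_ids phase.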