Skip to content

Interrupt production on irreversible block #1569

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 26, 2025
Merged
90 changes: 56 additions & 34 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,7 @@ struct controller_impl {

// When in IRREVERSIBLE mode fork_db blocks are applied and marked valid when they become irreversible
template<typename ForkDB, typename BSP>
controller::apply_blocks_result apply_irreversible_block(ForkDB& fork_db, const BSP& bsp) {
controller::apply_blocks_result_t::status_t apply_irreversible_block(ForkDB& fork_db, const BSP& bsp) {
if constexpr (std::is_same_v<block_state_legacy_ptr, std::decay_t<decltype(bsp)>>) {
// before transition to savanna
return apply_block(bsp, controller::block_status::complete, trx_meta_cache_lookup{});
Expand All @@ -1426,13 +1426,13 @@ struct controller_impl {
return apply_block(bsp, controller::block_status::complete, trx_meta_cache_lookup{});
}
// only called during transition when not a proper savanna block
return fork_db_.apply_l<controller::apply_blocks_result>([&](const auto& fork_db_l) {
return fork_db_.apply_l<controller::apply_blocks_result_t::status_t>([&](const auto& fork_db_l) {
block_state_legacy_ptr legacy = fork_db_l.get_block(bsp->id());
fork_db_.switch_to(fork_database::in_use_t::legacy); // apply block uses to know what types to create
block_state_ptr prev = fork_db.get_block(legacy->previous(), include_root_t::yes);
assert(prev);
controller::apply_blocks_result r = apply_block(legacy, controller::block_status::complete, trx_meta_cache_lookup{});
if( r == controller::apply_blocks_result::complete) {
controller::apply_blocks_result_t::status_t r = apply_block(legacy, controller::block_status::complete, trx_meta_cache_lookup{});
if (r == controller::apply_blocks_result_t::status_t::complete) {
fc::scoped_exit<std::function<void()>> e([&]{fork_db_.switch_to(fork_database::in_use_t::both);});
// irreversible apply was just done, calculate new_valid here instead of in transition_to_savanna()
assert(legacy->action_mroot_savanna);
Expand Down Expand Up @@ -1550,7 +1550,7 @@ struct controller_impl {
}
}

controller::apply_blocks_result log_irreversible() {
controller::apply_blocks_result_t log_irreversible() {
EOS_ASSERT( fork_db_has_root(), fork_database_exception, "fork database not properly initialized" );

const std::optional<block_id_type> log_head_id = blog.head_id();
Expand Down Expand Up @@ -1590,13 +1590,13 @@ struct controller_impl {

const block_id_type new_lib_id = pending_lib_id();
const block_num_type new_lib_num = block_header::num_from_id(new_lib_id);
controller::apply_blocks_result result = controller::apply_blocks_result::complete;

if( new_lib_num <= lib_num )
return result;
return controller::apply_blocks_result_t{};

const fc::time_point start = fc::time_point::now();

controller::apply_blocks_result_t result;
auto mark_branch_irreversible = [&, this](auto& fork_db) {
assert(!irreversible_mode() || fork_db.head());
const auto& head_id = irreversible_mode() ? fork_db.head()->id() : chain_head.id();
Expand All @@ -1621,9 +1621,12 @@ struct controller_impl {

for( auto bitr = branch.rbegin(); bitr != branch.rend() && should_process(*bitr); ++bitr ) {
if (irreversible_mode()) {
result = apply_irreversible_block(fork_db, *bitr);
if (result != controller::apply_blocks_result::complete)
controller::apply_blocks_result_t::status_t r = apply_irreversible_block(fork_db, *bitr);
if (r != controller::apply_blocks_result_t::status_t::complete) {
result.status = r;
break;
}
++result.num_blocks_applied;
}

emit( irreversible_block, std::tie((*bitr)->block, (*bitr)->id()), __FILE__, __LINE__ );
Expand All @@ -1643,7 +1646,7 @@ struct controller_impl {
// In irreversible mode, break every ~500ms to allow other tasks (e.g. get_info, SHiP) opportunity to run
const bool more_blocks_to_process = bitr + 1 != branch.rend();
if (!replaying && more_blocks_to_process && fc::time_point::now() - start > fc::milliseconds(500)) {
result = controller::apply_blocks_result::incomplete;
result.status = controller::apply_blocks_result_t::status_t::incomplete;
break;
}
}
Expand Down Expand Up @@ -3793,16 +3796,16 @@ struct controller_impl {
}

template<class BSP>
controller::apply_blocks_result apply_block( const BSP& bsp, controller::block_status s,
const trx_meta_cache_lookup& trx_lookup ) {
controller::apply_blocks_result_t::status_t apply_block( const BSP& bsp, controller::block_status s,
const trx_meta_cache_lookup& trx_lookup ) {
try {
try {
if (should_terminate()) {
shutdown();
return controller::apply_blocks_result::incomplete;
return controller::apply_blocks_result_t::status_t::incomplete;
}
if (should_pause()) {
return controller::apply_blocks_result::paused;
return controller::apply_blocks_result_t::status_t::paused;
}

auto start = fc::time_point::now(); // want to report total time of applying a block
Expand Down Expand Up @@ -3959,7 +3962,7 @@ struct controller_impl {

commit_block(s);

return controller::apply_blocks_result::complete;
return controller::apply_blocks_result_t::status_t::complete;
} catch ( const std::bad_alloc& ) {
throw;
} catch ( const boost::interprocess::bad_alloc& ) {
Expand Down Expand Up @@ -4448,7 +4451,7 @@ struct controller_impl {

BSP bsp = std::make_shared<typename BSP::element_type>(*head, b, protocol_features.get_protocol_feature_set(), validator, skip_validate_signee);

if (apply_block(bsp, controller::block_status::irreversible, trx_meta_cache_lookup{}) == controller::apply_blocks_result::complete) {
if (apply_block(bsp, controller::block_status::irreversible, trx_meta_cache_lookup{}) == controller::apply_blocks_result_t::status_t::complete) {
// On replay, log_irreversible is not called and so no irreversible_block signal is emitted.
// So emit it explicitly here.
emit( irreversible_block, std::tie(bsp->block, bsp->id()), __FILE__, __LINE__ );
Expand All @@ -4464,7 +4467,7 @@ struct controller_impl {
} FC_LOG_AND_RETHROW( )
}

controller::apply_blocks_result apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) {
controller::apply_blocks_result_t apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) {
try {
if( !irreversible_mode() ) {
return maybe_apply_blocks( cb, trx_lookup );
Expand All @@ -4484,13 +4487,13 @@ struct controller_impl {
}
}

controller::apply_blocks_result maybe_apply_blocks( const forked_callback_t& forked_cb, const trx_meta_cache_lookup& trx_lookup )
controller::apply_blocks_result_t maybe_apply_blocks( const forked_callback_t& forked_cb, const trx_meta_cache_lookup& trx_lookup )
{
controller::apply_blocks_result result = controller::apply_blocks_result::complete;
auto do_apply_blocks = [&](auto& fork_db) {
auto do_apply_blocks = [&](auto& fork_db) -> controller::apply_blocks_result_t {
controller::apply_blocks_result_t result;
auto new_head = fork_db.head(); // use best head
if (!new_head)
return;// nothing to do, fork_db at root
return result;// nothing to do, fork_db at root
auto [new_head_branch, old_head_branch] = fork_db.fetch_branch_from( new_head->id(), chain_head.id() );

bool switch_fork = !old_head_branch.empty();
Expand Down Expand Up @@ -4538,25 +4541,29 @@ struct controller_impl {
auto except = std::exception_ptr{};
const auto& bsp = *ritr;
try {
result = apply_block( bsp, bsp->is_valid() ? controller::block_status::validated
: controller::block_status::complete, trx_lookup );
controller::apply_blocks_result_t::status_t r =
apply_block( bsp, bsp->is_valid() ? controller::block_status::validated
: controller::block_status::complete, trx_lookup );
if (r == controller::apply_blocks_result_t::status_t::complete)
++result.num_blocks_applied;

if (!switch_fork) {
if (check_shutdown()) {
shutdown();
result = controller::apply_blocks_result::incomplete; // doesn't really matter since we are shutting down
result.status = controller::apply_blocks_result_t::status_t::incomplete; // doesn't really matter since we are shutting down
break;
}
if (result == controller::apply_blocks_result::complete) {
if (r == controller::apply_blocks_result_t::status_t::complete) {
// Break every ~500ms to allow other tasks (e.g. get_info, SHiP) opportunity to run. User expected
// to call apply_blocks again if this returns incomplete.
const bool more_blocks_to_process = ritr + 1 != new_head_branch.rend();
if (!replaying && more_blocks_to_process && fc::time_point::now() - start_apply_blocks_loop > fc::milliseconds(500)) {
result = controller::apply_blocks_result::incomplete;
result.status = controller::apply_blocks_result_t::status_t::incomplete;
break;
}
}
}
if (result != controller::apply_blocks_result::complete) {
if (r != controller::apply_blocks_result_t::status_t::complete) {
break;
}
} catch ( const std::bad_alloc& ) {
Expand Down Expand Up @@ -4617,15 +4624,13 @@ struct controller_impl {
}

// irreversible can change even if block not applied to head, integrated qc can move LIB
auto log_result = log_irreversible();
log_irreversible();
transition_to_savanna_if_needed();
if (log_result != controller::apply_blocks_result::complete)
result = log_result;
};

fork_db_.apply<void>(do_apply_blocks);
return result;
};

return result;
return fork_db_.apply<controller::apply_blocks_result_t>(do_apply_blocks);
}

deque<transaction_metadata_ptr> abort_block() {
Expand Down Expand Up @@ -4892,6 +4897,18 @@ struct controller_impl {
return is_trx_transient ? nullptr : deep_mind_logger;
}

bool is_head_descendant_of_pending_lib() const {
return fork_db_.apply<bool>(
[&](const fork_database_legacy_t& fork_db) -> bool {
// there is no pending lib in legacy
return true;
},
[&](const fork_database_if_t& fork_db) -> bool {
return fork_db.is_descendant_of_pending_savanna_lib(chain_head.id());
}
);
}

void set_savanna_lib_id(const block_id_type& id) {
fork_db_.apply_s<void>([&](auto& fork_db) {
fork_db.set_pending_savanna_lib_id(id);
Expand Down Expand Up @@ -5417,7 +5434,7 @@ void controller::set_async_aggregation(async_t val) {
my->async_aggregation = val;
}

controller::apply_blocks_result controller::apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) {
controller::apply_blocks_result_t controller::apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) {
validate_db_available_size();
return my->apply_blocks(cb, trx_lookup);
}
Expand Down Expand Up @@ -5581,6 +5598,11 @@ std::optional<block_id_type> controller::pending_producer_block_id()const {
return my->pending_producer_block_id();
}

bool controller::is_head_descendant_of_pending_lib() const {
return my->is_head_descendant_of_pending_lib();
}


void controller::set_savanna_lib_id(const block_id_type& id) {
my->set_savanna_lib_id(id);
}
Expand Down
41 changes: 41 additions & 0 deletions libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ namespace eosio::chain {
void remove_impl( block_num_type block_num );
bsp_t head_impl(include_root_t include_root) const;
bool set_pending_savanna_lib_id_impl(const block_id_type& id);
bool is_descendant_of_pending_savanna_lib_impl(const block_id_type& id) const;
bool is_descendant_of_impl(const block_id_type& ancestor, const block_id_type& descendant) const;
branch_t fetch_branch_impl( const block_id_type& h, uint32_t trim_after_block_num ) const;
block_branch_t fetch_block_branch_impl( const block_id_type& h, uint32_t trim_after_block_num ) const;
branch_t fetch_branch_impl( const block_id_type& h, const block_id_type& b ) const;
Expand Down Expand Up @@ -375,6 +377,45 @@ namespace eosio::chain {
return false;
}

template<class BSP>
bool fork_database_t<BSP>::is_descendant_of_pending_savanna_lib( const block_id_type& id ) const {
std::lock_guard g( my->mtx );
return my->is_descendant_of_pending_savanna_lib_impl(id);
}

template<class BSP>
bool fork_database_impl<BSP>::is_descendant_of_pending_savanna_lib_impl(const block_id_type& id) const {
if (pending_savanna_lib_id == id)
return true;
return is_descendant_of_impl(pending_savanna_lib_id, id);
}

template<class BSP>
bool fork_database_t<BSP>::is_descendant_of(const block_id_type& ancestor, const block_id_type& descendant) const {
std::lock_guard g( my->mtx );
return my->is_descendant_of_impl(ancestor, descendant);
}

template<class BSP>
bool fork_database_impl<BSP>::is_descendant_of_impl(const block_id_type& ancestor, const block_id_type& descendant) const {
block_num_type ancestor_block_num = block_header::num_from_id(ancestor);
if (ancestor_block_num >= block_header::num_from_id(descendant))
return false;

auto i = index.find(descendant);
for (; i != index.end(); i = index.find((*i)->previous())) {
if ((*i)->previous() == ancestor)
return true;
if ((*i)->block_num() <= ancestor_block_num+1) // +1 since comparison of previous() already done
return false;
}

// At this point descendant is not found in ancestor, but root has not been checked.
// However, root is either the ancestor or we can't make determination if descendant is a child because
// ancestor < root. Therefore, no reason to check root.
return false;
}

template <class BSP>
eosio::chain::fork_database_t<BSP>::branch_t
fork_database_t<BSP>::fetch_branch(const block_id_type& h, uint32_t trim_after_block_num) const {
Expand Down
18 changes: 13 additions & 5 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,16 @@ namespace eosio::chain {
accepted_block_result accept_block( const block_id_type& id, const signed_block_ptr& b ) const;

/// Apply any blocks that are ready from the fork_db
enum class apply_blocks_result {
complete, // all ready blocks in forkdb have been applied
incomplete, // time limit reached, additional blocks may be available in forkdb to process
paused // apply blocks currently paused
struct apply_blocks_result_t {
enum class status_t {
complete, // all ready blocks in forkdb have been applied
incomplete, // time limit reached, additional blocks may be available in forkdb to process
paused // apply blocks currently paused
};
status_t status = status_t::complete;
size_t num_blocks_applied = 0;
};
apply_blocks_result apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup);
apply_blocks_result_t apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup);

boost::asio::io_context& get_thread_pool();

Expand Down Expand Up @@ -339,6 +343,10 @@ namespace eosio::chain {
// thread-safe
qc_vote_metrics_t::fin_auth_set_t missing_votes(const block_id_type& id, const qc_t& qc) const;

// not thread-safe
bool is_head_descendant_of_pending_lib() const;

// thread-safe
void set_savanna_lib_id(const block_id_type& id);

// thread-safe
Expand Down
14 changes: 14 additions & 0 deletions libraries/chain/include/eosio/chain/fork_database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ namespace eosio::chain {
block_id_type pending_savanna_lib_id() const;
bool set_pending_savanna_lib_id( const block_id_type& id );

/**
* @return true if id is built on top of pending savanna lib or id == pending_savanna_lib
*/
bool is_descendant_of_pending_savanna_lib( const block_id_type& id ) const;

/**
* @param ancestor the id of a possible ancestor block
* @param descendant the id of a possible descendant block
* @return false if either ancestor or descendant not found.
* true if any descendant->previous.. == ancestor.
* false if unable to find ancestor in any descendant->previous..
*/
bool is_descendant_of(const block_id_type& ancestor, const block_id_type& descendant) const;

/**
* Returns the sequence of block states resulting from trimming the branch from the
* root block (exclusive) to the block with an id of `h` (inclusive) by removing any
Expand Down
2 changes: 1 addition & 1 deletion libraries/testing/tester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ namespace eosio::testing {
}

void base_tester::apply_blocks() {
while (control->apply_blocks( {}, {} ) == controller::apply_blocks_result::incomplete)
while (control->apply_blocks( {}, {} ).status == controller::apply_blocks_result_t::status_t::incomplete)
;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace eosio::chain::plugin_interface {
namespace incoming {
namespace methods {
// synchronously push a block/trx to a single provider, block_state_legacy_ptr may be null
using block_sync = method_decl<chain_plugin_interface, controller::apply_blocks_result(const signed_block_ptr&, const block_id_type&, const block_handle&), first_provider_policy>;
using block_sync = method_decl<chain_plugin_interface, controller::apply_blocks_result_t(const signed_block_ptr&, const block_id_type&, const block_handle&), first_provider_policy>;
using transaction_async = method_decl<chain_plugin_interface, void(const packed_transaction_ptr&, bool, transaction_metadata::trx_type, bool, next_function<transaction_trace_ptr>), first_provider_policy>;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class producer_plugin : public appbase::plugin<producer_plugin> {
virtual void plugin_shutdown();
void handle_sighup() override;

controller::apply_blocks_result on_incoming_block();
controller::apply_blocks_result_t on_incoming_block();

struct pause_at_block_params {
chain::block_num_type block_num{0}; // block height to pause block evaluation/production
Expand Down
Loading
Loading