diff --git a/dbsim/db_client_service.hpp b/dbsim/db_client_service.hpp index be6f9ad8..1af29f6b 100644 --- a/dbsim/db_client_service.hpp +++ b/dbsim/db_client_service.hpp @@ -51,10 +51,8 @@ namespace db { return true; } - int prepare_fragment_for_replication(wsrep::mutable_buffer&, - size_t& position) override + int prepare_fragment_for_replication(wsrep::mutable_buffer&) override { - position = 0; return 0; } int remove_fragments() override { return 0; } diff --git a/include/wsrep/client_service.hpp b/include/wsrep/client_service.hpp index d47396df..1f83e4d1 100644 --- a/include/wsrep/client_service.hpp +++ b/include/wsrep/client_service.hpp @@ -88,17 +88,13 @@ namespace wsrep /** * Prepare a buffer containing data for the next fragment to replicate. - * The caller may set log_position to record the database specific - * position corresponding to changes contained in the buffer. - * When the call returns, the log_position will be available to read - * from streaming_context::log_position(). * * @return Zero in case of success, non-zero on failure. * If there is no data to replicate, the method shall return * zero and leave the buffer empty. */ - virtual int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer, - size_t& log_position) = 0; + virtual int + prepare_fragment_for_replication(wsrep::mutable_buffer& buffer) = 0; /** * Remove fragments from the storage within current transaction. diff --git a/include/wsrep/client_state.hpp b/include/wsrep/client_state.hpp index d8449d7a..6d247a78 100644 --- a/include/wsrep/client_state.hpp +++ b/include/wsrep/client_state.hpp @@ -354,6 +354,14 @@ namespace wsrep */ int after_row(); + /** + * Force a streaming step + * + * This method can be used to replicate a fragment in the + * current context. + */ + int stream(); + /** * Set streaming parameters. * diff --git a/include/wsrep/streaming_context.hpp b/include/wsrep/streaming_context.hpp index 2b0ef0f3..7796418b 100644 --- a/include/wsrep/streaming_context.hpp +++ b/include/wsrep/streaming_context.hpp @@ -42,12 +42,12 @@ namespace wsrep streaming_context() : fragments_certified_() + , bytes_certified_() , fragments_() , rollback_replicated_for_() , fragment_unit_() , fragment_size_() , unit_counter_() - , log_position_() { } /** @@ -78,10 +78,14 @@ namespace wsrep /** Disable streaming replication. */ void disable(); - /** Increment counter for certified fragments. */ - void certified() + /** + * Increment counter for certified fragments and total + * number of bytes. + */ + void certified(size_t bytes) { ++fragments_certified_; + bytes_certified_ += bytes; } /** Return number of certified fragments. */ @@ -90,6 +94,12 @@ namespace wsrep return fragments_certified_; } + /** Return total number of bytes replicated. */ + size_t bytes_certified() const + { + return bytes_certified_; + } + /** Mark fragment with seqno as stored in fragment store. */ void stored(wsrep::seqno seqno); @@ -137,18 +147,6 @@ namespace wsrep unit_counter_ = 0; } - /** Return current log position. */ - size_t log_position() const - { - return log_position_; - } - - /** Set log position. */ - void set_log_position(size_t position) - { - log_position_ = position; - } - /** Return vector of stored fragments. */ const std::vector& fragments() const { @@ -168,12 +166,12 @@ namespace wsrep void check_fragment_seqno(wsrep::seqno seqno); size_t fragments_certified_; + size_t bytes_certified_; std::vector fragments_; wsrep::transaction_id rollback_replicated_for_; enum fragment_unit fragment_unit_; size_t fragment_size_; size_t unit_counter_; - size_t log_position_; }; } diff --git a/include/wsrep/transaction.hpp b/include/wsrep/transaction.hpp index 3328c093..0cea1bc9 100644 --- a/include/wsrep/transaction.hpp +++ b/include/wsrep/transaction.hpp @@ -175,6 +175,8 @@ namespace wsrep int after_row(); + int stream(); + int before_prepare(wsrep::unique_lock&); int after_prepare(wsrep::unique_lock&); diff --git a/src/client_state.cpp b/src/client_state.cpp index 99c4222f..68e23e9e 100644 --- a/src/client_state.cpp +++ b/src/client_state.cpp @@ -336,6 +336,13 @@ int wsrep::client_state::after_row() : 0); } +int wsrep::client_state::stream() +{ + assert(mode_ == m_local); + assert(state_ == s_exec); + return transaction_.stream(); +} + void wsrep::client_state::fragment_applied(wsrep::seqno seqno) { assert(mode_ == m_high_priority); diff --git a/src/streaming_context.cpp b/src/streaming_context.cpp index c5423079..2fef179d 100644 --- a/src/streaming_context.cpp +++ b/src/streaming_context.cpp @@ -81,10 +81,10 @@ void wsrep::streaming_context::rolled_back(wsrep::transaction_id id) void wsrep::streaming_context::cleanup() { fragments_certified_ = 0; + bytes_certified_ = 0; fragments_.clear(); rollback_replicated_for_ = wsrep::transaction_id::undefined(); unit_counter_ = 0; - log_position_ = 0; } void wsrep::streaming_context::check_fragment_seqno( diff --git a/src/transaction.cpp b/src/transaction.cpp index 451e94dd..a555de97 100644 --- a/src/transaction.cpp +++ b/src/transaction.cpp @@ -273,6 +273,15 @@ int wsrep::transaction::after_row() return ret; } +int wsrep::transaction::stream() +{ + wsrep::unique_lock lock(client_state_.mutex()); + debug_log_state("stream_enter"); + int ret(streaming_step(lock, true)); + debug_log_state("stream_leave"); + return ret; +} + int wsrep::transaction::before_prepare( wsrep::unique_lock& lock) { @@ -1416,24 +1425,23 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock& lock, bool force) { assert(lock.owns_lock()); - assert(streaming_context_.fragment_size() || is_xa()); + assert(streaming_context_.fragment_size() || force); + + const size_t bytes_generated(client_service_.bytes_generated()); + const size_t bytes_certified(streaming_context_.bytes_certified()); - if (client_service_.bytes_generated() < - streaming_context_.log_position()) + if (bytes_generated < bytes_certified) { /* Something went wrong on DBMS side in keeping track of generated bytes. Return an error to abort the transaction. */ - wsrep::log_warning() << "Bytes generated " - << client_service_.bytes_generated() - << " less than bytes certified " - << streaming_context_.log_position() + wsrep::log_warning() << "Bytes generated " << bytes_generated + << " less than bytes certified " << bytes_certified << ", aborting streaming transaction"; return 1; } - int ret(0); - const size_t bytes_to_replicate(client_service_.bytes_generated() - - streaming_context_.log_position()); + int ret(0); + const size_t bytes_to_replicate(bytes_generated - bytes_certified); switch (streaming_context_.fragment_unit()) { @@ -1500,15 +1508,13 @@ int wsrep::transaction::certify_fragment( } wsrep::mutable_buffer data; - size_t log_position(0); - if (client_service_.prepare_fragment_for_replication(data, log_position)) + if (client_service_.prepare_fragment_for_replication(data)) { lock.lock(); state(lock, s_must_abort); client_state_.override_error(wsrep::e_error_during_commit); return 1; } - streaming_context_.set_log_position(log_position); if (data.size() == 0) { @@ -1616,7 +1622,7 @@ int wsrep::transaction::certify_fragment( case wsrep::provider::success: ++fragments_certified_for_statement_; assert(sr_ws_meta.seqno().is_undefined() == false); - streaming_context_.certified(); + streaming_context_.certified(data.size()); if (storage_service.update_fragment_meta(sr_ws_meta)) { storage_service.rollback(wsrep::ws_handle(), @@ -1656,7 +1662,7 @@ int wsrep::transaction::certify_fragment( // we take a risk of sending one rollback fragment for nothing. storage_service.rollback(wsrep::ws_handle(), wsrep::ws_meta()); - streaming_context_.certified(); + streaming_context_.certified(data.size()); ret = 1; error = wsrep::e_deadlock_error; break; @@ -2147,7 +2153,6 @@ void wsrep::transaction::debug_log_state( << ", unit: " << streaming_context_.fragment_unit() << ", size: " << streaming_context_.fragment_size() << ", counter: " << streaming_context_.unit_counter() - << ", log_pos: " << streaming_context_.log_position() << ", sr_rb: " << streaming_context_.rolled_back() << "\n own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id()) << " thread_id: " << client_state_.owning_thread_id_ diff --git a/test/mock_client_state.hpp b/test/mock_client_state.hpp index 73b27755..af77a854 100644 --- a/test/mock_client_state.hpp +++ b/test/mock_client_state.hpp @@ -149,7 +149,7 @@ namespace wsrep bool statement_allowed_for_streaming() const WSREP_OVERRIDE { return true; } - int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer, size_t& position) + int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer) WSREP_OVERRIDE { if (error_during_prepare_data_) @@ -159,7 +159,6 @@ namespace wsrep static const char buf[1] = { 1 }; buffer.push_back(&buf[0], &buf[1]); wsrep::const_buffer data(buffer.data(), buffer.size()); - position = buffer.size(); return client_state_->append_data(data); }