Skip to content

Remove log_position #223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions dbsim/db_client_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@ namespace db
{
return true;
}
int prepare_fragment_for_replication(wsrep::mutable_buffer&,
size_t& position) override
int prepare_fragment_for_replication(wsrep::mutable_buffer&) override
{
position = 0;
return 0;
}
int remove_fragments() override { return 0; }
Expand Down
8 changes: 2 additions & 6 deletions include/wsrep/client_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,13 @@ namespace wsrep

/**
* Prepare a buffer containing data for the next fragment to replicate.
* The caller may set log_position to record the database specific
* position corresponding to changes contained in the buffer.
* When the call returns, the log_position will be available to read
* from streaming_context::log_position().
*
* @return Zero in case of success, non-zero on failure.
* If there is no data to replicate, the method shall return
* zero and leave the buffer empty.
*/
virtual int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer,
size_t& log_position) = 0;
virtual int
prepare_fragment_for_replication(wsrep::mutable_buffer& buffer) = 0;

/**
* Remove fragments from the storage within current transaction.
Expand Down
8 changes: 8 additions & 0 deletions include/wsrep/client_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,14 @@ namespace wsrep
*/
int after_row();

/**
* Force a streaming step
*
* This method can be used to replicate a fragment in the
* current context.
*/
int stream();

/**
* Set streaming parameters.
*
Expand Down
30 changes: 14 additions & 16 deletions include/wsrep/streaming_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ namespace wsrep

streaming_context()
: fragments_certified_()
, bytes_certified_()
, fragments_()
, rollback_replicated_for_()
, fragment_unit_()
, fragment_size_()
, unit_counter_()
, log_position_()
{ }

/**
Expand Down Expand Up @@ -78,10 +78,14 @@ namespace wsrep
/** Disable streaming replication. */
void disable();

/** Increment counter for certified fragments. */
void certified()
/**
* Increment counter for certified fragments and total
* number of bytes.
*/
void certified(size_t bytes)
{
++fragments_certified_;
bytes_certified_ += bytes;
}

/** Return number of certified fragments. */
Expand All @@ -90,6 +94,12 @@ namespace wsrep
return fragments_certified_;
}

/** Return total number of bytes replicated. */
size_t bytes_certified() const
{
return bytes_certified_;
}

/** Mark fragment with seqno as stored in fragment store. */
void stored(wsrep::seqno seqno);

Expand Down Expand Up @@ -137,18 +147,6 @@ namespace wsrep
unit_counter_ = 0;
}

/** Return current log position. */
size_t log_position() const
{
return log_position_;
}

/** Set log position. */
void set_log_position(size_t position)
{
log_position_ = position;
}

/** Return vector of stored fragments. */
const std::vector<wsrep::seqno>& fragments() const
{
Expand All @@ -168,12 +166,12 @@ namespace wsrep
void check_fragment_seqno(wsrep::seqno seqno);

size_t fragments_certified_;
size_t bytes_certified_;
std::vector<wsrep::seqno> fragments_;
wsrep::transaction_id rollback_replicated_for_;
enum fragment_unit fragment_unit_;
size_t fragment_size_;
size_t unit_counter_;
size_t log_position_;
};
}

Expand Down
2 changes: 2 additions & 0 deletions include/wsrep/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ namespace wsrep

int after_row();

int stream();

int before_prepare(wsrep::unique_lock<wsrep::mutex>&);

int after_prepare(wsrep::unique_lock<wsrep::mutex>&);
Expand Down
7 changes: 7 additions & 0 deletions src/client_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,13 @@ int wsrep::client_state::after_row()
: 0);
}

int wsrep::client_state::stream()
{
assert(mode_ == m_local);
assert(state_ == s_exec);
return transaction_.stream();
}

void wsrep::client_state::fragment_applied(wsrep::seqno seqno)
{
assert(mode_ == m_high_priority);
Expand Down
2 changes: 1 addition & 1 deletion src/streaming_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ void wsrep::streaming_context::rolled_back(wsrep::transaction_id id)
void wsrep::streaming_context::cleanup()
{
fragments_certified_ = 0;
bytes_certified_ = 0;
fragments_.clear();
rollback_replicated_for_ = wsrep::transaction_id::undefined();
unit_counter_ = 0;
log_position_ = 0;
}

void wsrep::streaming_context::check_fragment_seqno(
Expand Down
37 changes: 21 additions & 16 deletions src/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,15 @@ int wsrep::transaction::after_row()
return ret;
}

int wsrep::transaction::stream()
{
wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
debug_log_state("stream_enter");
int ret(streaming_step(lock, true));
debug_log_state("stream_leave");
return ret;
}

int wsrep::transaction::before_prepare(
wsrep::unique_lock<wsrep::mutex>& lock)
{
Expand Down Expand Up @@ -1416,24 +1425,23 @@ int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock,
bool force)
{
assert(lock.owns_lock());
assert(streaming_context_.fragment_size() || is_xa());
assert(streaming_context_.fragment_size() || force);

const size_t bytes_generated(client_service_.bytes_generated());
const size_t bytes_certified(streaming_context_.bytes_certified());

if (client_service_.bytes_generated() <
streaming_context_.log_position())
if (bytes_generated < bytes_certified)
{
/* Something went wrong on DBMS side in keeping track of
generated bytes. Return an error to abort the transaction. */
wsrep::log_warning() << "Bytes generated "
<< client_service_.bytes_generated()
<< " less than bytes certified "
<< streaming_context_.log_position()
wsrep::log_warning() << "Bytes generated " << bytes_generated
<< " less than bytes certified " << bytes_certified
<< ", aborting streaming transaction";
return 1;
}
int ret(0);

const size_t bytes_to_replicate(client_service_.bytes_generated() -
streaming_context_.log_position());
int ret(0);
const size_t bytes_to_replicate(bytes_generated - bytes_certified);

switch (streaming_context_.fragment_unit())
{
Expand Down Expand Up @@ -1500,15 +1508,13 @@ int wsrep::transaction::certify_fragment(
}

wsrep::mutable_buffer data;
size_t log_position(0);
if (client_service_.prepare_fragment_for_replication(data, log_position))
if (client_service_.prepare_fragment_for_replication(data))
{
lock.lock();
state(lock, s_must_abort);
client_state_.override_error(wsrep::e_error_during_commit);
return 1;
}
streaming_context_.set_log_position(log_position);

if (data.size() == 0)
{
Expand Down Expand Up @@ -1616,7 +1622,7 @@ int wsrep::transaction::certify_fragment(
case wsrep::provider::success:
++fragments_certified_for_statement_;
assert(sr_ws_meta.seqno().is_undefined() == false);
streaming_context_.certified();
streaming_context_.certified(data.size());
if (storage_service.update_fragment_meta(sr_ws_meta))
{
storage_service.rollback(wsrep::ws_handle(),
Expand Down Expand Up @@ -1656,7 +1662,7 @@ int wsrep::transaction::certify_fragment(
// we take a risk of sending one rollback fragment for nothing.
storage_service.rollback(wsrep::ws_handle(),
wsrep::ws_meta());
streaming_context_.certified();
streaming_context_.certified(data.size());
ret = 1;
error = wsrep::e_deadlock_error;
break;
Expand Down Expand Up @@ -2147,7 +2153,6 @@ void wsrep::transaction::debug_log_state(
<< ", unit: " << streaming_context_.fragment_unit()
<< ", size: " << streaming_context_.fragment_size()
<< ", counter: " << streaming_context_.unit_counter()
<< ", log_pos: " << streaming_context_.log_position()
<< ", sr_rb: " << streaming_context_.rolled_back()
<< "\n own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id())
<< " thread_id: " << client_state_.owning_thread_id_
Expand Down
3 changes: 1 addition & 2 deletions test/mock_client_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ namespace wsrep

bool statement_allowed_for_streaming() const WSREP_OVERRIDE
{ return true; }
int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer, size_t& position)
int prepare_fragment_for_replication(wsrep::mutable_buffer& buffer)
WSREP_OVERRIDE
{
if (error_during_prepare_data_)
Expand All @@ -159,7 +159,6 @@ namespace wsrep
static const char buf[1] = { 1 };
buffer.push_back(&buf[0], &buf[1]);
wsrep::const_buffer data(buffer.data(), buffer.size());
position = buffer.size();
return client_state_->append_data(data);
}

Expand Down