Skip to content

Commit 6a97d67

Browse files
committed
Support detach on transaction prepare
Split method `transaction::xa_detach()` in `before_xa_detach()` and `after_xa_detach()`. This allows to handle bf abort or replay while transaction is detaching and to distinguish if a transaction is bf aborted before or after the DBMS has detached a transaction from its storage engine.
1 parent f8ff2cf commit 6a97d67

9 files changed

+185
-40
lines changed

dbsim/db_high_priority_service.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ const wsrep::transaction& db::high_priority_service::transaction() const
4545
return client_.client_state().transaction();
4646
}
4747

48+
wsrep::client_state& db::high_priority_service::client_state() const
49+
{
50+
return client_.client_state();
51+
}
52+
4853
int db::high_priority_service::adopt_transaction(const wsrep::transaction&)
4954
{
5055
throw wsrep::not_implemented_error();

dbsim/db_high_priority_service.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ namespace db
3434
const wsrep::ws_meta&) override;
3535
int next_fragment(const wsrep::ws_meta&) override;
3636
const wsrep::transaction& transaction() const override;
37+
wsrep::client_state& client_state() const override;
3738
int adopt_transaction(const wsrep::transaction&) override;
3839
int apply_write_set(const wsrep::ws_meta&,
3940
const wsrep::const_buffer&,

include/wsrep/client_state.hpp

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -628,27 +628,31 @@ namespace wsrep
628628
* transaction disconnects, and the transaction must not rollback.
629629
* After this call, a different client may later attempt to terminate
630630
* the transaction by calling method commit_by_xid() or rollback_by_xid().
631+
*
632+
* @return Zero on success, non-zero if the transaction was BF aborted
631633
*/
632-
void xa_detach()
633-
{
634-
assert(mode_ == m_local);
635-
assert(state_ == s_none || state_ == s_exec || state_ == s_quitting);
636-
transaction_.xa_detach();
637-
}
634+
int before_xa_detach();
635+
636+
/**
637+
* This method should be called to conclude the XA detach operation,
638+
* after the DBMS has detached the transaction.
639+
*
640+
* @return Zero on success, non-zero if transaction was BF aborted
641+
*/
642+
int after_xa_detach();
638643

639644
/**
640645
* Replay a XA transaction
641646
*
642-
* Replay a XA transaction that is in s_idle state.
647+
* Replay a local XA transaction in s_idle state,
648+
* or detached.
643649
* This may happen if the transaction is BF aborted
644650
* between prepare and commit.
645-
* Since the victim is idle, this method can be called
646-
* by the BF aborter or the backround rollbacker.
647651
*/
648652
void xa_replay()
649653
{
650-
assert(mode_ == m_local);
651-
assert(state_ == s_idle);
654+
assert((mode_ == m_local && state_ == s_idle) ||
655+
(mode_ == m_high_priority));
652656
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
653657
transaction_.xa_replay(lock);
654658
}

include/wsrep/high_priority_service.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ namespace wsrep
6363
*/
6464
virtual const wsrep::transaction& transaction() const = 0;
6565

66+
/**
67+
* Return the associated client_state object
68+
*/
69+
virtual wsrep::client_state& client_state() const = 0;
70+
6671
/**
6772
* Adopt a transaction.
6873
*/

include/wsrep/transaction.hpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,11 @@ namespace wsrep
127127
return !xid_.is_null();
128128
}
129129

130+
/**
131+
* Return true if the transaction has completed the prepare step.
132+
*/
133+
bool is_prepared_xa() const;
134+
130135
void assign_xid(const wsrep::xid& xid);
131136

132137
const wsrep::xid& xid() const
@@ -138,7 +143,9 @@ namespace wsrep
138143

139144
int commit_or_rollback_by_xid(const wsrep::xid& xid, bool commit);
140145

141-
void xa_detach();
146+
int before_xa_detach(wsrep::unique_lock<mutex>&);
147+
148+
int after_xa_detach(wsrep::unique_lock<mutex>&);
142149

143150
int xa_replay(wsrep::unique_lock<wsrep::mutex>&);
144151

@@ -214,6 +221,11 @@ namespace wsrep
214221
return bf_aborted_in_total_order_;
215222
}
216223

224+
wsrep::seqno bf_seqno() const
225+
{
226+
return bf_seqno_;
227+
}
228+
217229
int flags() const
218230
{
219231
return flags_;
@@ -269,6 +281,7 @@ namespace wsrep
269281
enum wsrep::provider::status bf_abort_provider_status_;
270282
int bf_abort_client_state_;
271283
bool bf_aborted_in_total_order_;
284+
wsrep::seqno bf_seqno_;
272285
wsrep::ws_handle ws_handle_;
273286
wsrep::ws_meta ws_meta_;
274287
int flags_;

src/client_state.cpp

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "wsrep/server_state.hpp"
2424
#include "wsrep/server_service.hpp"
2525
#include "wsrep/client_service.hpp"
26+
#include "wsrep/high_priority_service.hpp"
2627

2728
#include <unistd.h> // usleep()
2829
#include <sstream>
@@ -304,8 +305,9 @@ void wsrep::client_state::sync_rollback_complete()
304305
{
305306
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
306307
debug_log_state("sync_rollback_complete: enter");
307-
assert(state_ == s_idle && mode_ == m_local &&
308-
transaction_.state() == wsrep::transaction::s_aborted);
308+
assert((state_ == s_idle && mode_ == m_local &&
309+
transaction_.state() == wsrep::transaction::s_aborted) ||
310+
mode_ == m_high_priority);
309311
set_rollbacker_active(false);
310312
cond_.notify_all();
311313
debug_log_state("sync_rollback_complete: leave");
@@ -315,14 +317,74 @@ void wsrep::client_state::wait_rollback_complete_and_acquire_ownership()
315317
{
316318
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
317319
debug_log_state("wait_rollback_complete_and_acquire_ownership: enter");
318-
if (state_ == s_idle)
320+
if (state_ == s_idle || mode_ == m_high_priority)
319321
{
320322
do_wait_rollback_complete_and_acquire_ownership(lock);
321323
}
322324
assert(state_ == s_exec);
323325
debug_log_state("wait_rollback_complete_and_acquire_ownership: leave");
324326
}
325327

328+
//////////////////////////////////////////////////////////////////////////////
329+
// XA //
330+
//////////////////////////////////////////////////////////////////////////////
331+
332+
int wsrep::client_state::before_xa_detach()
333+
{
334+
int ret(0);
335+
client_service_.debug_sync("wsrep_before_xa_detach_enter");
336+
{
337+
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
338+
assert(mode_ == m_local);
339+
assert(state_ == s_none || state_ == s_exec || state_ == s_quitting);
340+
if (transaction_.state() == wsrep::transaction::s_must_abort)
341+
{
342+
transaction_.state(lock, wsrep::transaction::s_must_replay);
343+
lock.unlock();
344+
client_service_.bf_rollback();
345+
lock.lock();
346+
ret = 1;
347+
}
348+
else
349+
{
350+
ret = transaction_.before_xa_detach(lock);
351+
}
352+
}
353+
client_service_.debug_sync("wsrep_before_xa_detach_leave");
354+
return ret;
355+
}
356+
357+
int wsrep::client_state::after_xa_detach()
358+
{
359+
int ret(0);
360+
client_service_.debug_sync("wsrep_after_xa_detach_enter");
361+
{
362+
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
363+
assert(mode_ == m_local);
364+
if (transaction_.state() == wsrep::transaction::s_must_abort)
365+
{
366+
wsrep::high_priority_service* sa(
367+
server_state_.find_streaming_applier(transaction_.server_id(),
368+
transaction_.id()));
369+
assert(sa);
370+
if (sa)
371+
{
372+
wsrep::client_state& cs(sa->client_state());
373+
cs.transaction_.state(lock, wsrep::transaction::s_must_abort);
374+
cs.transaction_.state(lock, wsrep::transaction::s_must_replay);
375+
cs.set_rollbacker_active(true);
376+
lock.unlock();
377+
server_state_.server_service().background_rollback(
378+
sa->client_state());
379+
lock.lock();
380+
}
381+
}
382+
ret = transaction_.after_xa_detach(lock);
383+
}
384+
client_service_.debug_sync("wsrep_after_xa_detach_leave");
385+
return ret;
386+
}
387+
326388
//////////////////////////////////////////////////////////////////////////////
327389
// Streaming //
328390
//////////////////////////////////////////////////////////////////////////////
@@ -786,13 +848,16 @@ void wsrep::client_state::do_wait_rollback_complete_and_acquire_ownership(
786848
wsrep::unique_lock<wsrep::mutex>& lock)
787849
{
788850
assert(lock.owns_lock());
789-
assert(state_ == s_idle);
851+
assert(state_ == s_idle || mode_ == m_high_priority);
790852
while (is_rollbacker_active())
791853
{
792854
cond_.wait(lock);
793855
}
794856
do_acquire_ownership(lock);
795-
state(lock, s_exec);
857+
if (state_ == s_idle)
858+
{
859+
state(lock, s_exec);
860+
}
796861
}
797862

798863
void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid)

0 commit comments

Comments
 (0)