Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sql/rpl_rli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2169,7 +2169,8 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli)
deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false),
gtid_ev_flags2(0), gtid_ev_flags_extra(0), gtid_ev_sa_seq_no(0),
reserved_start_alter_thread(0), finish_event_group_called(0), rpt(NULL),
start_alter_ev(NULL), direct_commit_alter(false), sa_info(NULL)
start_alter_ev(NULL), direct_commit_alter(false), sa_info(NULL),
is_new_trans(false)
{
reinit(rli);
bzero(&current_gtid, sizeof(current_gtid));
Expand Down
24 changes: 23 additions & 1 deletion sql/rpl_rli.h
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ struct rpl_group_info
Query_log_event *start_alter_ev;
bool direct_commit_alter;
start_alter_info *sa_info;

bool is_new_trans; // marker of start_new_trans context
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
void reinit(Relay_log_info *rli);
Expand Down Expand Up @@ -1055,6 +1055,28 @@ struct rpl_group_info

};

/**
The function prohibits access to THD::rgi_slave, screens it
in the contexts like one with start_new_trans defined.
*/
inline rpl_group_info* get_rgi_slave(rpl_group_info* rgi, bool check_new_trans= false)
{
return (rgi && rgi->is_new_trans && check_new_trans) ? NULL : rgi;
}
/**
The following functions for start_new_trans' ctor and dtor.
*/
inline void hide_rgi_slave(rpl_group_info* rgi)
{
if (rgi)
rgi->is_new_trans= true;
}
inline void unhide_rgi_slave(rpl_group_info* rgi)
{
if (rgi)
rgi->is_new_trans= false;
}


/*
The class rpl_sql_thread_info is the THD::system_thread_info for an SQL
Expand Down
5 changes: 2 additions & 3 deletions sql/sql_class.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6364,8 +6364,7 @@ start_new_trans::start_new_trans(THD *thd)
thd->server_status&= ~(SERVER_STATUS_IN_TRANS |
SERVER_STATUS_IN_TRANS_READONLY);
thd->server_status|= SERVER_STATUS_AUTOCOMMIT;
org_rgi_slave= thd->rgi_slave;
thd->rgi_slave= NULL;
hide_rgi_slave(thd->rgi_slave);
}


Expand All @@ -6382,7 +6381,7 @@ void start_new_trans::restore_old_transaction()
MYSQL_COMMIT_TRANSACTION(org_thd->m_transaction_psi);
org_thd->m_transaction_psi= m_transaction_psi;
org_thd->variables.wsrep_on= wsrep_on;
org_thd->rgi_slave= org_rgi_slave;
unhide_rgi_slave(org_thd->rgi_slave);
org_thd= 0;
}

Expand Down
5 changes: 0 additions & 5 deletions sql/sql_class.h
Original file line number Diff line number Diff line change
Expand Up @@ -5885,11 +5885,6 @@ class start_new_trans
uint in_sub_stmt;
uint server_status;
my_bool wsrep_on;
/*
THD:rgi_slave may hold a part of the replicated "old" transaction's
execution context. Therefore it has to be reset/restored too.
*/
rpl_group_info* org_rgi_slave;

public:
start_new_trans(THD *thd);
Expand Down
34 changes: 21 additions & 13 deletions sql/temporary_tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ bool THD::open_temporary_table(TABLE_LIST *tl)
DBUG_ASSERT(!tl->derived);
DBUG_ASSERT(!tl->schema_table);
DBUG_ASSERT(has_temporary_tables() ||
(rgi_slave && rgi_slave->is_parallel_exec));
(get_rgi_slave(rgi_slave) && rgi_slave->is_parallel_exec));

if (tl->open_type == OT_BASE_ONLY)
{
Expand All @@ -365,9 +365,13 @@ bool THD::open_temporary_table(TABLE_LIST *tl)

So for now, anything that uses temporary tables will be serialised
with anything before it, when using parallel replication.

Because slave applier can also execute an "out-of-band" transaction
within start_new_trans context the slave thread must not see
the replicated temporary table and for that use a rgi_slave getter.
*/

if (rgi_slave &&
if (get_rgi_slave(rgi_slave, true) &&
rgi_slave->is_parallel_exec &&
find_temporary_table(tl) &&
wait_for_prior_commit())
Expand Down Expand Up @@ -397,7 +401,7 @@ bool THD::open_temporary_table(TABLE_LIST *tl)
So for now, anything that uses temporary tables will be serialised
with anything before it, when using parallel replication.
*/
if (table && rgi_slave &&
if (table && get_rgi_slave(rgi_slave, true) &&
rgi_slave->is_parallel_exec &&
wait_for_prior_commit())
DBUG_RETURN(true);
Expand Down Expand Up @@ -524,7 +528,7 @@ bool THD::close_temporary_tables()
DBUG_RETURN(false);
}

DBUG_ASSERT(!rgi_slave);
DBUG_ASSERT(!get_rgi_slave(rgi_slave));

/*
Ensure we don't have open HANDLERs for tables we are about to close.
Expand Down Expand Up @@ -778,7 +782,7 @@ void THD::mark_tmp_tables_as_free_for_reuse()
unlock_temporary_tables();
}

if (rgi_slave)
if (get_rgi_slave(rgi_slave, true))
{
/*
Temporary tables are shared with other by sql execution threads.
Expand Down Expand Up @@ -893,7 +897,7 @@ bool THD::has_temporary_tables()
DBUG_ENTER("THD::has_temporary_tables");
bool result;
#ifdef HAVE_REPLICATION
if (rgi_slave)
if (get_rgi_slave(rgi_slave, true))
{
mysql_mutex_lock(&rgi_slave->rli->data_lock);
result= rgi_slave->rli->save_temporary_tables &&
Expand Down Expand Up @@ -972,7 +976,7 @@ TMP_TABLE_SHARE *THD::create_temporary_table(LEX_CUSTRING *frm,
int res;

/* Temporary tables are not safe for parallel replication. */
if (rgi_slave &&
if (get_rgi_slave(rgi_slave, true) &&
rgi_slave->is_parallel_exec &&
wait_for_prior_commit())
DBUG_RETURN(NULL);
Expand Down Expand Up @@ -1167,7 +1171,7 @@ TABLE *THD::open_temporary_table(TMP_TABLE_SHARE *share,
share->all_tmp_tables.push_front(table);

/* Increment Slave_open_temp_table_definitions status variable count. */
if (rgi_slave)
if (get_rgi_slave(rgi_slave, true))
slave_open_temp_tables++;

DBUG_PRINT("tmptable", ("Opened table: '%s'.'%s table: %p",
Expand Down Expand Up @@ -1235,7 +1239,7 @@ bool THD::use_temporary_table(TABLE *table, TABLE **out_table)
it though, as statement-based replication using temporary tables is
in any case rather fragile.
*/
if (rgi_slave &&
if (get_rgi_slave(rgi_slave, true) &&
rgi_slave->is_parallel_exec &&
wait_for_prior_commit())
DBUG_RETURN(true);
Expand Down Expand Up @@ -1268,7 +1272,7 @@ void THD::close_temporary_table(TABLE *table)
closefrm(table);
my_free(table);

if (rgi_slave)
if (get_rgi_slave(rgi_slave, true))
{
/* Natural invariant of temporary_tables */
DBUG_ASSERT(slave_open_temp_tables || !temporary_tables);
Expand All @@ -1290,7 +1294,11 @@ void THD::close_temporary_table(TABLE *table)
bool THD::log_events_and_free_tmp_shares()
{
DBUG_ENTER("THD::log_events_and_free_tmp_shares");

/*
Inferentially !get_rgi_slave() holds too but is weaker as
the start_new_trans context is not allowed to log anything,
on slave incl.
*/
DBUG_ASSERT(!rgi_slave);

TMP_TABLE_SHARE *share;
Expand Down Expand Up @@ -1540,7 +1548,7 @@ bool THD::lock_temporary_tables()
}

#ifdef HAVE_REPLICATION
if (rgi_slave)
if (get_rgi_slave(rgi_slave, true))
{
mysql_mutex_lock(&rgi_slave->rli->data_lock);
temporary_tables= rgi_slave->rli->save_temporary_tables;
Expand Down Expand Up @@ -1568,7 +1576,7 @@ void THD::unlock_temporary_tables()
}

#ifdef HAVE_REPLICATION
if (rgi_slave)
if (get_rgi_slave(rgi_slave, true))
{
rgi_slave->rli->save_temporary_tables= temporary_tables;
temporary_tables= NULL; /* Safety */
Expand Down