#0 group_replication_trans_before_commit (param=0x7ffff0e7b8d0) at /root/softm/percona-server-5.7.22-22/rapid/plugin/group_replication/src/observer_trans.cc:511
#1 0x00000000014e4814 in Trans_delegate::before_commit (this=0x2e44800, thd=0x7fffd8000df0, all=false, trx_cache_log=0x7fffd8907a10, stmt_cache_log=0x7fffd8907858, cache_log_max_size=18446744073709547520) at /root/softm/percona-server-5.7.22-22/sql/rpl_handler.cc:325
#2 0x000000000188a386 in MYSQL_BIN_LOG::commit (this=0x2e7b440, thd=0x7fffd8000df0, all=false) at /root/softm/percona-server-5.7.22-22/sql/binlog.cc:8974
#3 0x0000000000f80623 in ha_commit_trans (thd=0x7fffd8000df0, all=false, ignore_global_read_lock=false) at /root/softm/percona-server-5.7.22-22/sql/handler.cc:1830
#4 0x00000000016ddab9 in trans_commit_stmt (thd=0x7fffd8000df0) at /root/softm/percona-server-5.7.22-22/sql/transaction.cc:458
#5 0x00000000015d1a8d in mysql_execute_command (thd=0x7fffd8000df0, first_level=true) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:5293
#6 0x00000000015d3182 in mysql_parse (thd=0x7fffd8000df0, parser_state=0x7ffff0e7e600) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:5901
#7 0x00000000015c6d16 in dispatch_command (thd=0x7fffd8000df0, com_data=0x7ffff0e7ed70, command=COM_QUERY) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:1490
#8 0x00000000015c5aa3 in do_command (thd=0x7fffd8000df0) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:1021
#9 0x000000000170ebb0 in handle_connection (arg=0x3cd32d0) at /root/softm/percona-server-5.7.22-22/sql/conn_handler/connection_handler_per_thread.cc:312
#10 0x0000000001946140 in pfs_spawn_thread (arg=0x3c71630) at /root/softm/percona-server-5.7.22-22/storage/perfschema/pfs.cc:2190
#11 0x00007ffff7bc7851 in start_thread () from /lib64/libpthread.so.0
#12 0x00007ffff651290d in clone () from /lib64/libc.so.6

三、MGR全局认证发送内容的生成过程

下面是我通过对源码的浅显理解得出的过程:
经过hash的Write set (集合)->拷贝到write_set变量(类数组)->通过base64算法写入到Transaction_context_log_event ->合并其他binlog event到transaction_msg->gcs_module广播transaction_msg到其他节点->等待认证结果

四、相关源码

1、group_replication_trans_before_commit 函数相关内容

/*
  Excerpt (part 1 of group_replication_trans_before_commit): choose the
  binlog cache that holds the transaction, open the per-session group
  replication (GR) cache, and switch the binlog cache to read mode.
  Exactly one of the two binlog caches must contain data; if both or
  neither do, the hook fails.
*/
if (trx_cache_log_position > 0 && stmt_cache_log_position == 0) // only the transactional cache has data
{
  cache_log= param->trx_cache_log; // read the transaction from the transactional binlog cache
  cache_log_position= trx_cache_log_position;
}
else if (trx_cache_log_position == 0 && stmt_cache_log_position > 0) // only the statement cache has data
{
  cache_log= param->stmt_cache_log;
  cache_log_position= stmt_cache_log_position;
  is_dml= false;
  may_have_sbr_stmts= true;
}
else
{
  /* purecov: begin inspected */
  log_message(MY_ERROR_LEVEL, "We can only use one cache type at a "
              "time on session %u", param->thread_id);
  shared_plugin_stop_lock->release_read_lock();
  DBUG_RETURN(1);
  /* purecov: end */
}

applier_module->get_pipeline_stats_member_collector()
  ->increment_transactions_local();

// The selected binlog cache is expected to still be in write mode here.
DBUG_ASSERT(cache_log->type == WRITE_CACHE);
DBUG_PRINT("cache_log", ("thread_id: %u, trx_cache_log_position: %llu,"
                         " stmt_cache_log_position: %llu",
                         param->thread_id, trx_cache_log_position,
                         stmt_cache_log_position));

/*
  Open group replication cache.
  Reuse the same cache on each session for improved performance.
*/
cache= observer_trans_get_io_cache(param->thread_id,
                                   param->cache_log_max_size); // per-session GR IO_CACHE (reused, not created fresh each time)
if (cache == NULL) // the GR cache could not be obtained
{
  /* purecov: begin inspected */
  error= pre_wait_error;
  goto err;
  /* purecov: end */
}

// Reinit binlog cache to read.
if (reinit_cache(cache_log, READ_CACHE, 0)) // switch the binlog cache to READ_CACHE; per the author's note this also resets the position (third argument 0)
{
  /* purecov: begin inspected */
  log_message(MY_ERROR_LEVEL, "Failed to reinit binlog cache log for read "
              "on session %u", param->thread_id);
  error= pre_wait_error;
  goto err;
  /* purecov: end */
}

/*
  After this, cache_log should be reinit to old saved value when we
  are going out of the function scope.
*/
reinit_cache_log_required= true;

// Create transaction context.
tcle= new Transaction_context_log_event(param->server_uuid, Rpl_transaction_write_set_ctx is_dml, param->thread_id, is_gtid_specified); //初始化 Transaction_context_log_event if (!tcle->is_valid()) { /* purecov: begin inspected */ log_message(MY_ERROR_LEVEL, "Failed to create the context of the current " "transaction on session %u", param->thread_id); error= pre_wait_error; goto err; /* purecov: end */ } if (is_dml) { Transaction_write_set* write_set= get_transaction_write_set(param->thread_id);// 获取前期得到write set 并且放回到一个临时内存空间 write_set中 /* When GTID is specified we may have empty transactions, that is, a transaction may have not write set at all because it didn't change any data, it will just persist that GTID as applied. */ if ((write_set == NULL) && (!is_gtid_specified)) { log_message(MY_ERROR_LEVEL, "Failed to extract the set of items written " "during the execution of the current " "transaction on session %u", param->thread_id); error= pre_wait_error; goto err; } if (write_set != NULL) { if (add_write_set(tcle, write_set))//将整个wirte_set内容复制到event Transaction_context_log_event中 此时就进入了event了 { /* purecov: begin inspected */ cleanup_transaction_write_set(write_set); //write set已经完成了它的功能需要析构 log_message(MY_ERROR_LEVEL, "Failed to gather the set of items written " "during the execution of the current " "transaction on session %u", param->thread_id); error= pre_wait_error; goto err; /* purecov: end */ } cleanup_transaction_write_set(write_set); //如果add_write_set函数调用出现 有问题 也需要析构掉 DBUG_ASSERT(is_gtid_specified || (tcle->get_write_set()->size() > 0)); } else { /* For empty transactions we should set the GTID may_have_sbr_stmts. See comment at binlog_cache_data::may_have_sbr_stmts(). */ may_have_sbr_stmts= true; } Log_event::write } // Write transaction context to group replication cache. tcle->write(cache); //写入到MGR CACHE 写入 TCLE的header(virtual) body(virtual) footer // Write Gtid log event to group replication cache. 
/*
  Excerpt (part 3 of group_replication_trans_before_commit): write the
  Gtid_log_event into the GR cache, enforce the transaction size limit,
  and copy the GR cache into transaction_msg.
*/
gle= new Gtid_log_event(param->server_id, is_dml, 0, 1,
                        may_have_sbr_stmts,
                        gtid_specification);
gle->write(cache); // write the GTID event into the GR cache (the author's note calls it a placeholder, "占位")

// Size checked against the limit: binlog cache bytes plus what
// my_b_tell() reports for the GR cache (presumably the TCLE + GTID event
// written so far).
transaction_size= cache_log_position + my_b_tell(cache);
// Only DML transactions are checked; a transaction_size_limit of 0 disables the check.
if (is_dml && transaction_size_limit &&
    transaction_size > transaction_size_limit)
{
  log_message(MY_ERROR_LEVEL, "Error on session %u. "
              "Transaction of size %llu exceeds specified limit %lu. "
              "To increase the limit please adjust group_replication_transaction_size_limit option.",
              param->thread_id, transaction_size,
              transaction_size_limit); // limit comes from the group_replication_transaction_size_limit option
  error= pre_wait_error;
  goto err;
}

// Reinit group replication cache to read.
if (reinit_cache(cache, READ_CACHE, 0)) // switch the GR cache to READ_CACHE; per the author's note this also resets the position (third argument 0)
{
  /* purecov: begin inspected */
  log_message(MY_ERROR_LEVEL, "Error while re-initializing an internal "
              "cache, for read operations, on session %u",
              param->thread_id);
  error= pre_wait_error;
  goto err;
  /* purecov: end */
}

// Copy group replication cache to buffer.
if (transaction_msg.append_cache(cache)) // append the GR cache (TCLE + GTID event) to transaction_msg
{
  /* purecov: begin inspected */
  log_message(MY_ERROR_LEVEL, "Error while appending data to an internal "
              "cache on session %u", param->thread_id);
  error= pre_wait_error;
  goto err;
  /* purecov: end */
}

// Copy binlog cache content to buffer.
if (transaction_msg.append_cache(cache_log))//加入到transaction_msg { /* purecov: begin inspected */ log_message(MY_ERROR_LEVEL, "Error while writing binary log cache on " "session %u", param->thread_id); error= pre_wait_error; goto err; /* purecov: end */ } DBUG_ASSERT(certification_latch != NULL); if (certification_latch->registerTicket(param->thread_id)) { /* purecov: begin inspected */ log_message(MY_ERROR_LEVEL, "Unable to register for getting notifications " "regarding the outcome of the transaction on " "session %u", param->thread_id); error= pre_wait_error; goto err; /* purecov: end */ }#ifndef DBUG_OFF DBUG_EXECUTE_IF("test_basic_CRUD_operations_sql_service_interface", { DBUG_SET("-d,test_basic_CRUD_operations_sql_service_interface"); DBUG_ASSERT(!sql_command_check()); };); DBUG_EXECUTE_IF("group_replication_before_message_broadcast", { const char act[]= "now wait_for waiting"; DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); });#endif /* Check if member needs to throttle its transactions to avoid cause starvation on the group. */ applier_module->get_flow_control_module()->do_wait(); //流控相关 //Broadcast the Transaction Message send_error= gcs_module->send_message(transaction_msg); //gcs广播 if (send_error == GCS_MESSAGE_TOO_BIG) { /* purecov: begin inspected */ log_message(MY_ERROR_LEVEL, "Error broadcasting transaction to the group " "on session %u. 
Message is too big.", param->thread_id); error= pre_wait_error; goto err; /* purecov: end */ } else if (send_error == GCS_NOK) { /* purecov: begin inspected */ log_message(MY_ERROR_LEVEL, "Error while broadcasting the transaction to " "the group on session %u", param->thread_id); error= pre_wait_error; goto err; /* purecov: end */ } shared_plugin_stop_lock->release_read_lock(); DBUG_ASSERT(certification_latch != NULL); if (certification_latch->waitTicket(param->thread_id)) //等待认证结果 { /* purecov: begin inspected */ log_message(MY_ERROR_LEVEL, "Error while waiting for conflict detection " "procedure to finish on session %u", param->thread_id); error= post_wait_error; goto err; /* purecov: end */ } 2、add_write_set函数 int add_write_set(Transaction_context_log_event *tcle, Transaction_write_set *set){ DBUG_ENTER("add_write_set"); int iterator= set->write_set_size; //将循环次数设置为 set的长度 也就是有多少个write sets for (int i = 0; i < iterator; i++) { uchar buff[BUFFER_READ_PKE]; int8store(buff, set->write_set[i]); //逐字节复制到buffer中 uint64 const tmp_str_sz= base64_needed_encoded_length((uint64) BUFFER_READ_PKE); char *write_set_value= (char *) my_malloc(PSI_NOT_INSTRUMENTED, static_cast<size_t>(tmp_str_sz), MYF(MY_WME)); //13bytes (gdb) p tmp_str_sz $2 = 13 if (!write_set_value)//分配内存错误 { /* purecov: begin inspected */ log_message(MY_ERROR_LEVEL, "No memory to generate write identification hash"); DBUG_RETURN(1); /* purecov: end */ } if (base64_encode(buff, (size_t) BUFFER_READ_PKE, write_set_value)) //做base64算法 { /* purecov: begin inspected */ log_message(MY_ERROR_LEVEL, "Base 64 encoding of the write identification hash failed"); DBUG_RETURN(1); /* purecov: end */ } tcle->add_write_set(write_set_value); //最终将base64格式的write set写入到event中 } DBUG_RETURN(0); } 3、get_transaction_write_set函数 Transaction_write_set* get_transaction_write_set(unsigned long m_thread_id){ DBUG_ENTER("get_transaction_write_set"); THD *thd= NULL; Transaction_write_set *result_set= NULL; Find_thd_with_id 
find_thd_with_id(m_thread_id, false); thd= Global_THD_manager::get_instance()->find_thd(&find_thd_with_id); if (thd) { std::set<uint64> *write_set= thd->get_transaction() ->get_transaction_write_set_ctx()->get_write_set(); //Rpl_transaction_write_set_ctx std::set<uint64> *get_write_set(); unsigned long write_set_size= write_set->size(); //返回集合大小 if (write_set_size == 0) { mysql_mutex_unlock(&thd->LOCK_thd_data); DBUG_RETURN(NULL); } result_set= (Transaction_write_set*)my_malloc(key_memory_write_set_extraction, sizeof(Transaction_write_set), MYF(0));//这里为其Transaction_write_set分配内存空间 result_set->write_set_size= write_set_size; //获取size result_set->write_set= (unsigned long long*)my_malloc(key_memory_write_set_extraction, write_set_size * sizeof(unsigned long long), MYF(0));//分配内存 int result_set_index= 0; for (std::set<uint64>::iterator it= write_set->begin();//完成复制注意是从set中复制到简单的内存中 it != write_set->end(); ++it) { uint64 temp= *it; result_set->write_set[result_set_index++]=temp; } mysql_mutex_unlock(&thd->LOCK_thd_data); } DBUG_RETURN(result_set); }