23 #include <drizzled/message.h>
24 #include "read_replication.h"
25 #include "create_replication.h"
28 #include "dict0crea.ic"
/* Creates the SYS_REPLICATION_LOG table and its indexes from the embedded
   SQL text below. Only fragments of the body are visible in this listing;
   each note describes just the lines it sits above. */
47 UNIV_INTERN ulint dict_create_sys_replication_log(
void)
/* Store a human-readable description of the current operation on trx. */
70 trx->
op_info=
"creating replication sys table";
/* row_mysql_lock_data_dictionary(trx) is called ahead of the creation SQL. */
72 row_mysql_lock_data_dictionary(trx);
/* SQL text: one table with eight columns, a unique clustered PRIMARY index
   on (ID, SEGID) and a secondary COMMIT_IDX index on (COMMIT_ID, ID). */
78 "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
80 "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), ORIGINATING_SERVER_UUID BLOB, ORIGINATING_COMMIT_ID INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n"
81 "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
82 "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
/* Failure path: print the numeric error code to stderr. */
88 if (error != DB_SUCCESS)
90 fprintf(stderr,
"InnoDB: error %lu in creation.\n", (ulong) error);
/* Only DB_OUT_OF_FILE_SPACE and DB_TOO_MANY_CONCURRENT_TRXS are accepted
   here; any other error code fails the ut_a assertion. */
92 ut_a(error == DB_OUT_OF_FILE_SPACE || error == DB_TOO_MANY_CONCURRENT_TRXS);
/* NOTE(review): the message says the table is being dropped; the drop call
   itself is not visible in this listing -- confirm against the full file. */
95 "InnoDB: creation failed\n"
96 "InnoDB: tablespace is full\n"
97 "InnoDB: dropping incompletely created SYS_REPLICATION_LOG table.\n");
/* The original error code is overwritten with DB_MUST_GET_MORE_FILE_SPACE. */
101 error = DB_MUST_GET_MORE_FILE_SPACE;
/* Builds the drizzled table message that describes SYS_REPLICATION_LOG.
   The enclosing function header is outside this listing. */
/* Lower-case a copy of table_name and compare it with "sys_replication_log";
   the branch taken on a mismatch is not visible here. */
115 std::string search_string(table_name);
116 boost::algorithm::to_lower(search_string);
118 if (search_string.compare(
"sys_replication_log") != 0)
/* Table-level attributes: engine InnoDB, name SYS_REPLICATION_LOG in schema
   DATA_DICTIONARY, type STANDARD, both timestamps set to 0. */
122 engine->set_name(
"InnoDB");
123 table_message->set_name(
"SYS_REPLICATION_LOG");
124 table_message->set_schema(
"DATA_DICTIONARY");
125 table_message->set_type(drizzled::message::Table::STANDARD);
126 table_message->set_creation_timestamp(0);
127 table_message->set_update_timestamp(0);
/* Table options use the binary charset's collation id and name. */
130 options->set_collation_id(drizzled::my_charset_bin.number);
131 options->set_collation(drizzled::my_charset_bin.name);
/* Mark the table message as not replicated. */
132 drizzled::message::set_is_replicated(*table_message,
false);
/* Column definitions. The add_field() call for ID is not visible; the
   others are added in the order shown: SEGID, COMMIT_ID, END_TIMESTAMP,
   ORIGINATING_SERVER_UUID, ORIGINATING_COMMIT_ID, MESSAGE_LEN, MESSAGE. */
/* ID: BIGINT */
135 field->set_name(
"ID");
136 field->set_type(drizzled::message::Table::Field::BIGINT);
/* SEGID: INTEGER */
138 field= table_message->add_field();
139 field->set_name(
"SEGID");
140 field->set_type(drizzled::message::Table::Field::INTEGER);
/* COMMIT_ID: BIGINT */
142 field= table_message->add_field();
143 field->set_name(
"COMMIT_ID");
144 field->set_type(drizzled::message::Table::Field::BIGINT);
/* END_TIMESTAMP: BIGINT */
146 field= table_message->add_field();
147 field->set_name(
"END_TIMESTAMP");
148 field->set_type(drizzled::message::Table::Field::BIGINT);
/* ORIGINATING_SERVER_UUID: BLOB */
150 field= table_message->add_field();
151 field->set_name(
"ORIGINATING_SERVER_UUID");
152 field->set_type(drizzled::message::Table::Field::BLOB);
/* ORIGINATING_COMMIT_ID: BIGINT */
154 field= table_message->add_field();
155 field->set_name(
"ORIGINATING_COMMIT_ID");
156 field->set_type(drizzled::message::Table::Field::BIGINT);
/* MESSAGE_LEN: INTEGER */
158 field= table_message->add_field();
159 field->set_name(
"MESSAGE_LEN");
160 field->set_type(drizzled::message::Table::Field::INTEGER);
/* MESSAGE: BLOB */
162 field= table_message->add_field();
163 field->set_name(
"MESSAGE");
164 field->set_type(drizzled::message::Table::Field::BLOB);
/* String options set to the binary collation; presumably these belong to
   the MESSAGE field just added -- the line obtaining stropt is not visible. */
166 stropt->set_collation_id(drizzled::my_charset_bin.number);
167 stropt->set_collation(drizzled::my_charset_bin.name);
/* PRIMARY index: primary, unique, BTREE, key length 12, made of field 0
   (compare length 8) and field 1 (compare length 4). */
170 index->set_name(
"PRIMARY");
171 index->set_is_primary(
true);
172 index->set_is_unique(
true);
173 index->set_type(drizzled::message::Table::Index::BTREE);
174 index->set_key_length(12);
176 part->set_fieldnr(0);
177 part->set_compare_length(8);
178 part= index->add_index_part();
179 part->set_fieldnr(1);
180 part->set_compare_length(4);
/* COMMIT_IDX index: non-primary, non-unique, BTREE, key length 16, made of
   field 2 (compare length 8) followed by field 0 (compare length 8). */
182 index= table_message->add_indexes();
183 index->set_name(
"COMMIT_IDX");
184 index->set_is_primary(
false);
185 index->set_is_unique(
false);
186 index->set_type(drizzled::message::Table::Index::BTREE);
187 index->set_key_length(16);
188 part= index->add_index_part();
189 part->set_fieldnr(2);
190 part->set_compare_length(8);
191 part= index->add_index_part();
192 part->set_fieldnr(0);
193 part->set_compare_length(8);
/* Inserts one row built from the given replication message into a table via
   a prebuilt insert tuple of eight fields. Only fragments of the body are
   visible in this listing.

   Visible parameter handling:
   - server_uuid / use_originating_server_uuid / originating_server_uuid /
     originating_commit_id: when use_originating_server_uuid is false, the
     two originating_* values are replaced by server_uuid and the local
     commit_id.
   The remaining parameters are consumed on lines outside this listing. */
200 ulint insert_replication_message(
const char *message,
size_t size,
201 trx_t *trx, uint64_t trx_id,
202 uint64_t end_timestamp,
bool is_end_segment,
203 uint32_t seg_id,
const char *server_uuid,
204 bool use_originating_server_uuid,
205 const char *originating_server_uuid,
206 uint64_t originating_commit_id)
/* Detect a prebuilt struct bound to a different transaction than trx; the
   action taken in that case is not visible here. */
218 if (prebuilt->
trx != trx)
230 bool is_started=
true;
/* Fetch the insert tuple and fill its fields 0..7 one at a time.
   Presumably these follow the table's column order (ID, SEGID, COMMIT_ID,
   END_TIMESTAMP, ORIGINATING_SERVER_UUID, ORIGINATING_COMMIT_ID,
   MESSAGE_LEN, MESSAGE) -- the stores themselves are not visible; confirm. */
236 dtuple_t* dtuple= row_get_prebuilt_insert_row(prebuilt);
239 dfield = dtuple_get_nth_field(dtuple, 0);
244 dfield = dtuple_get_nth_field(dtuple, 1);
/* commit_id starts at 0; any later assignment happens on lines that are
   not visible in this listing. */
250 uint64_t commit_id= 0;
256 dfield = dtuple_get_nth_field(dtuple, 2);
261 dfield = dtuple_get_nth_field(dtuple, 3);
/* Without an explicit originating server, fall back to this server's UUID
   and the local commit_id. */
266 if (not use_originating_server_uuid)
270 originating_server_uuid= server_uuid;
271 originating_commit_id= commit_id;
274 dfield = dtuple_get_nth_field(dtuple, 4);
277 dfield = dtuple_get_nth_field(dtuple, 5);
282 dfield = dtuple_get_nth_field(dtuple, 6);
287 dfield = dtuple_get_nth_field(dtuple, 7);
/* The insert node's state is set to one of two starting states; the
   condition choosing between them is not visible -- TODO confirm. */
295 node->
state = INS_NODE_SET_IX_LOCK;
298 node->
state = INS_NODE_ALLOC_ROW_ID;
/* Decodes the record under the persistent cursor into ret. The enclosing
   function header is outside this listing. */
353 rec= btr_pcur_get_rec(&state->pcur);
/* Record fields 0, 1, 4, 5, 6, 7 and 9 are read; 2, 3 and 8 are skipped.
   Presumably 2 and 3 are InnoDB system columns stored after the key and 8
   is MESSAGE_LEN (the length comes from len instead) -- TODO confirm. */
/* NOTE(review): the integer fields are read by casting a byte buffer to
   uint64_t* / uint32_t*; this depends on buffer alignment and on
   convert_to_mysql_format() producing host byte order -- confirm. */
/* Field 0 -> ret.id (8 bytes). */
365 field = rec_get_nth_field_old(rec, 0, &len);
367 convert_to_mysql_format(idbyte, field, 8);
368 ret.id= *(uint64_t *)idbyte;
/* Field 1 -> ret.seg_id (4 bytes). */
371 field = rec_get_nth_field_old(rec, 1, &len);
373 convert_to_mysql_format(segbyte, field, 4);
374 ret.seg_id= *(uint32_t *)segbyte;
/* Field 4 -> ret.commit_id (8 bytes). */
376 field = rec_get_nth_field_old(rec, 4, &len);
378 convert_to_mysql_format(commitbyte, field, 8);
379 ret.commit_id= *(uint64_t *)commitbyte;
/* Field 5 -> ret.end_timestamp (8 bytes). */
381 field = rec_get_nth_field_old(rec, 5, &len);
382 byte timestampbyte[8];
383 convert_to_mysql_format(timestampbyte, field, 8);
384 ret.end_timestamp= *(uint64_t *)timestampbyte;
/* Field 6 -> ret.originating_server_uuid: the pointer to the field bytes is
   stored as-is, no copy is made on this line. */
386 field = rec_get_nth_field_old(rec, 6, &len);
387 ret.originating_server_uuid= (
char *)field;
/* Field 7 -> ret.originating_commit_id (8 bytes). */
389 field = rec_get_nth_field_old(rec, 7, &len);
390 byte originatingcommitbyte[8];
391 convert_to_mysql_format(originatingcommitbyte, field, 8);
392 ret.originating_commit_id= *(uint64_t *)originatingcommitbyte;
/* Field 9 -> ret.message: again a pointer to the field bytes, with the
   field length reported by rec_get_nth_field_old stored alongside it. */
395 field = rec_get_nth_field_old(rec, 9, &len);
396 ret.message= (
char *)field;
397 ret.message_length= len;
/* Zero every byte of ret; the condition under which this runs is not
   visible in this listing. */
412 memset(&ret, 0,
sizeof(ret));
/* Writes a converted form of the len-byte value at in into out. Callers in
   this file pass an 8- or 4-byte buffer and then read out as a native
   uint64_t / uint32_t. Most of the body is outside this listing.
   @param out  destination buffer, at least len bytes
   @param in   source bytes
   @param len  number of bytes to convert */
417 UNIV_INTERN
void convert_to_mysql_format(byte* out,
const byte* in,
int len)
/* Flip the top bit (value 128) of the last output byte. Presumably this
   undoes the sign-bit inversion of InnoDB's stored integer format --
   TODO confirm against the full body. */
431 out[len - 1] = (byte) (out[len - 1] ^ 128);
UNIV_INTERN void que_thr_stop_for_mysql_no_error(que_thr_t *thr, trx_t *trx)
UNIV_INTERN void trx_free_for_mysql(trx_t *trx)
UNIV_INLINE ibool btr_pcur_is_on_user_rec(const btr_pcur_t *cursor)
UNIV_INLINE ibool dict_table_is_comp(const dict_table_t *table)
UNIV_INTERN byte * row_mysql_store_col_in_innobase_format(dfield_t *dfield, byte *buf, ibool row_format_col, const byte *mysql_data, ulint col_len, ulint comp)
UNIV_INTERN ulint trx_commit_for_mysql(trx_t *trx)
UNIV_INLINE dict_table_t * dict_table_get_low(const char *table_name)
UNIV_INLINE void btr_pcur_close(btr_pcur_t *cursor)
UNIV_INTERN void trx_sys_read_commit_id(void)
UNIV_INLINE ulint rec_get_deleted_flag(const rec_t *rec, ulint comp)
UNIV_INTERN void mtr_commit(mtr_t *mtr) __attribute__((nonnull))
UNIV_INLINE void dfield_set_data(dfield_t *field, const void *data, ulint len)
UNIV_INTERN void row_prebuilt_free(row_prebuilt_t *prebuilt, ibool dict_locked)
UNIV_INTERN void btr_pcur_store_position(btr_pcur_t *cursor, mtr_t *mtr)
UNIV_INTERN pars_info_t * pars_info_create(void)
UNIV_INLINE void * mem_heap_alloc(mem_heap_t *heap, ulint n)
UNIV_INTERN ulint que_eval_sql(pars_info_t *info, const char *sql, ibool reserve_dict_mutex, trx_t *trx)
UNIV_INLINE que_thr_t * que_fork_get_first_thr(que_fork_t *fork)
#define UT_LIST_GET_FIRST(BASE)
UNIV_INTERN row_prebuilt_t * row_create_prebuilt(dict_table_t *table)
UNIV_INTERN trx_t * trx_allocate_for_mysql(void)
UNIV_INTERN void que_thr_move_to_run_state_for_mysql(que_thr_t *thr, trx_t *trx)
UNIV_INTERN que_thr_t * row_ins_step(que_thr_t *thr)
drizzled::atomic< uint64_t > trx_sys_commit_id
UNIV_INLINE void mtr_start(mtr_t *mtr) __attribute__((nonnull))
UNIV_INLINE ibool btr_pcur_move_to_next_user_rec(btr_pcur_t *cursor, mtr_t *mtr)
UNIV_INTERN dict_table_t * dict_table_get(const char *table_name, ibool inc_mysql_count)
UNIV_INTERN void row_update_prebuilt_trx(row_prebuilt_t *prebuilt, trx_t *trx)
UNIV_INLINE void btr_pcur_open_at_index_side(ibool from_left, dict_index_t *index, ulint latch_mode, btr_pcur_t *pcur, ibool do_init, mtr_t *mtr)
UNIV_INTERN void row_mysql_unlock_data_dictionary(trx_t *trx)
UNIV_INTERN int row_drop_table_for_mysql(const char *name, trx_t *trx, ibool drop_db)