40 #include <drizzled/gettext.h>
41 #include <drizzled/plugin/transaction_applier.h>
42 #include <drizzled/message/transaction.pb.h>
43 #include <drizzled/plugin.h>
45 #include <drizzled/item/string.h>
47 #include <boost/program_options.hpp>
51 namespace po= boost::program_options;
53 using namespace drizzled;
55 namespace drizzle_plugin
58 static string sysvar_filtered_replicator_sch_filters;
59 static string sysvar_filtered_replicator_tab_filters;
// Constructor: keeps copies of the two comma-separated filter strings and the
// two regex strings, builds the literal filter vectors, starts compiling the
// optional regexes and initialises the four mutexes used by this class.
// NOTE(review): this listing omits several original lines (braces, further
// member initialisers, the remaining pcre_compile arguments), so only the
// statements that are visible are described here.
61 FilteredReplicator::FilteredReplicator(
string name_arg,
62 const std::string &sch_filter,
63 const std::string &tab_filter,
64 const std::string &sch_regex,
65 const std::string &tab_regex) :
66 plugin::TransactionReplicator(name_arg),
69 _sch_filter(sch_filter),
70 _tab_filter(tab_filter),
71 _sch_regex(sch_regex),
72 _tab_regex(tab_regex),
// Split the schema filter string into schemas_to_filter only when one was given.
80 if (not _sch_filter.empty())
82 populateFilter(_sch_filter, schemas_to_filter);
// Same for the table filter string and tables_to_filter.
89 if (not _tab_filter.empty())
91 populateFilter(_tab_filter, tables_to_filter);
// Compile the schema regex only when one was supplied. error / error_offset
// are presumably the diagnostic out-arguments of pcre_compile; the rest of
// the call is not visible here.
98 if (not _sch_regex.empty())
100 const char *error= NULL;
101 int32_t error_offset= 0;
102 sch_re= pcre_compile(_sch_regex.c_str(),
// Same compilation step for the table regex.
113 if (not _tab_regex.empty())
115 const char *error= NULL;
116 int32_t error_offset= 0;
117 tab_re= pcre_compile(_tab_regex.c_str(),
// Default-attribute mutexes: two for the filter vectors, two for the sysvar
// strings.
124 pthread_mutex_init(&sch_vector_lock, NULL);
125 pthread_mutex_init(&tab_vector_lock, NULL);
126 pthread_mutex_init(&sysvar_sch_lock, NULL);
127 pthread_mutex_init(&sysvar_tab_lock, NULL);
// Destroys the same four mutexes that the constructor initialises.
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is the destructor body — confirm against the full file.
141 pthread_mutex_destroy(&sch_vector_lock);
142 pthread_mutex_destroy(&tab_vector_lock);
143 pthread_mutex_destroy(&sysvar_sch_lock);
144 pthread_mutex_destroy(&sysvar_tab_lock);
// parseStatementTableMetadata (first header line not visible in this listing):
// writes the schema and table name targeted by in_statement into the two
// output references, chosen by the statement type.
// NOTE(review): `metadata` is declared on lines that are missing here, as are
// the braces and any break statements.
149 string &in_schema_name,
150 string &in_table_name)
const
152 switch (in_statement.type())
// Row-level statements: both names come from `metadata`.
154 case message::Statement::INSERT:
157 in_schema_name= metadata.schema_name();
158 in_table_name= metadata.table_name();
161 case message::Statement::UPDATE:
164 in_schema_name= metadata.schema_name();
165 in_table_name= metadata.table_name();
168 case message::Statement::DELETE:
171 in_schema_name= metadata.schema_name();
172 in_table_name= metadata.table_name();
// Schema-level statements: only a schema name applies, so the table name is
// cleared.
175 case message::Statement::CREATE_SCHEMA:
177 in_schema_name= in_statement.create_schema_statement().schema().name();
178 in_table_name.clear();
// For ALTER SCHEMA the name is taken from the "after" definition.
181 case message::Statement::ALTER_SCHEMA:
183 in_schema_name= in_statement.alter_schema_statement().after().name();
184 in_table_name.clear();
187 case message::Statement::DROP_SCHEMA:
189 in_schema_name= in_statement.drop_schema_statement().schema_name();
190 in_table_name.clear();
// Table DDL: both names are read from the table definition in the statement.
193 case message::Statement::CREATE_TABLE:
195 in_schema_name= in_statement.create_table_statement().table().schema();
196 in_table_name= in_statement.create_table_statement().table().name();
// For ALTER TABLE the names are taken from the "after" definition.
199 case message::Statement::ALTER_TABLE:
201 in_schema_name= in_statement.alter_table_statement().after().schema();
202 in_table_name= in_statement.alter_table_statement().after().name();
205 case message::Statement::DROP_TABLE:
208 in_schema_name= metadata.schema_name();
209 in_table_name= metadata.table_name();
// Presumably the default branch (its label is not visible): both outputs are
// emptied.
215 in_schema_name.clear();
216 in_table_name.clear();
// replicate: walks every statement of to_replicate, works out which schema and
// table it targets, and passes a filtered transaction to in_applier.
// NOTE(review): the rest of the signature, the declarations of statement,
// schema_name, table_name, filtered_transaction and tc, and the bodies of the
// if-blocks are not visible in this listing.
222 plugin::ReplicationReturnCode
230 size_t num_statements= to_replicate.statement_size();
242 for (
size_t x= 0; x < num_statements; ++x)
// RAW_SQL statements carry only SQL text, so the names are parsed out of the
// string; the other call presumably sits in the else branch and reads the
// structured statement message.
259 if (statement.type() == message::Statement::RAW_SQL)
261 parseQuery(statement.sql(), schema_name, table_name);
265 parseStatementTableMetadata(statement, schema_name, table_name);
// Lower-case the names before the lookup; populateFilter lower-cases the
// filter lists as well.
274 boost::to_lower(schema_name);
275 boost::to_lower(table_name);
// True only when neither the schema nor the table is filtered; presumably the
// hidden body copies the statement into filtered_transaction.
277 if (! isSchemaFiltered(schema_name) && ! isTableFiltered(table_name))
// Apply only when at least one statement is present in filtered_transaction,
// after copying over the original transaction context.
284 if (filtered_transaction.statement_size() > 0)
292 *tc= to_replicate.transaction_context();
293 return in_applier->
apply(in_session, filtered_transaction);
// Returned when the apply path above was not taken.
295 return plugin::SUCCESS;
// populateFilter (first header line not visible in this listing): splits a
// comma-separated string into tokens and appends each token to `filter`.
299 std::vector<string> &filter)
// Lower-case the input so every stored token is lower-case.
305 boost::to_lower(input);
// last_pos marks the start of the next token (leading commas are skipped);
// pos marks the comma that ends it, or npos for the final token.
306 string::size_type last_pos= input.find_first_not_of(
',', 0);
307 string::size_type pos= input.find_first_of(
',', last_pos);
// Loop until no token start and no delimiter remain. Each pass stores the
// current token, then skips any run of commas to find the next one.
309 while (pos != string::npos || last_pos != string::npos)
311 filter.push_back(input.substr(last_pos, pos - last_pos));
312 last_pos= input.find_first_not_of(
',', pos);
313 pos= input.find_first_of(
',', last_pos);
// isSchemaFiltered (header and return statements not visible in this listing):
// checks schema_name against the literal schema list and then the schema regex.
// The list is searched linearly while sch_vector_lock is held.
319 pthread_mutex_lock(&sch_vector_lock);
320 std::vector<string>::iterator it= find(schemas_to_filter.begin(), schemas_to_filter.end(), schema_name);
321 if (it != schemas_to_filter.end())
// Match found: this unlock sits under the match test; the statement that
// follows it (presumably an early return) is not visible.
323 pthread_mutex_unlock(&sch_vector_lock);
// Unlock for the path where no match was found.
326 pthread_mutex_unlock(&sch_vector_lock);
// The regex is consulted only when one was configured. pcre_exec is called
// with no capture vector (NULL, 0); how `result` is interpreted is on lines
// not shown.
333 if (not _sch_regex.empty())
335 int32_t result= pcre_exec(sch_re, NULL, schema_name.c_str(), schema_name.length(), 0, 0, NULL, 0);
// isTableFiltered (header and return statements not visible in this listing):
// checks table_name against the literal table list and then the table regex.
// The list is searched linearly while tab_vector_lock is held.
345 pthread_mutex_lock(&tab_vector_lock);
346 std::vector<string>::iterator it= find(tables_to_filter.begin(), tables_to_filter.end(), table_name);
347 if (it != tables_to_filter.end())
// Match found: this unlock sits under the match test; the statement that
// follows it (presumably an early return) is not visible.
349 pthread_mutex_unlock(&tab_vector_lock);
// Unlock for the path where no match was found.
352 pthread_mutex_unlock(&tab_vector_lock);
// The regex is consulted only when one was configured. pcre_exec is called
// with no capture vector (NULL, 0); how `result` is interpreted is on lines
// not shown.
359 if (not _tab_regex.empty())
361 int32_t result= pcre_exec(tab_re, NULL, table_name.c_str(), table_name.length(), 0, 0, NULL, 0);
// parseQuery (header not visible in this listing): extracts a schema and table
// name from raw SQL text by fixed-offset string slicing.
// NOTE(review): the offsets 11, 21 and 13 below equal the lengths of
// "DROP TABLE ", "DROP TABLE IF EXISTS " and "CREATE TABLE ". The code
// therefore appears to assume exactly those prefixes with single spaces;
// other spacing or object kinds would be sliced at the wrong position.
// Confirm against callers.
// The first space-delimited word is the statement type, upper-cased so the
// comparisons below do not depend on the case of the SQL text.
377 string::size_type pos= sql.find_first_of(
' ', 0);
378 string type= sql.substr(0, pos);
384 boost::to_upper(type);
386 if (type.compare(
"DROP") == 0)
// Take the word starting at offset 11.
394 pos= sql.find_first_of(
' ', 11);
395 string cmp_str= sql.substr(11, pos - 11);
396 string target_name(
"");
// If that word is "IF", the target is instead the word starting at offset 21.
// The comparison is case-sensitive because only `type` was upper-cased.
397 if (cmp_str.compare(
"IF") == 0)
400 pos= sql.find_first_of(
' ', 21);
401 target_name= sql.substr(21, pos - 21);
// Otherwise (presumably the else branch) the word at offset 11 is the target.
405 target_name= cmp_str;
// A '.' splits the target into schema and table; without one (presumably the
// else branch) the whole target is used as the table name.
412 pos= target_name.find_first_of(
'.', 0);
413 if (pos != string::npos)
418 schema_name= target_name.substr(0, pos);
422 table_name= target_name.substr(pos + 1);
426 table_name= target_name;
429 else if (type.compare(
"CREATE") == 0)
// The target is the word starting at offset 13.
436 pos= sql.find_first_of(
' ', 13);
437 string target_name= sql.substr(13, pos - 13);
// Same '.' split as in the DROP branch.
443 pos= target_name.find_first_of(
'.', 0);
444 if (pos != string::npos)
449 schema_name= target_name.substr(0, pos);
453 table_name= target_name.substr(pos + 1);
457 table_name= target_name;
// Rebuilds schemas_to_filter from _sch_filter while holding both schema locks.
// NOTE(review): the function header and original line 471 are not visible;
// presumably this is setSchemaFilter and the missing line assigns the new
// input to _sch_filter — confirm against the full file.
// Locks are taken vector-first, then sysvar, and released in reverse order.
469 pthread_mutex_lock(&sch_vector_lock);
470 pthread_mutex_lock(&sysvar_sch_lock);
// Empty the list first, because populateFilter only appends.
472 schemas_to_filter.clear();
473 populateFilter(_sch_filter, schemas_to_filter);
474 pthread_mutex_unlock(&sysvar_sch_lock);
475 pthread_mutex_unlock(&sch_vector_lock);
// Rebuilds tables_to_filter from _tab_filter while holding both table locks.
// NOTE(review): the function header and original line 482 are not visible;
// presumably this is setTableFilter and the missing line assigns the new
// input to _tab_filter — confirm against the full file.
// Locks are taken vector-first, then sysvar, and released in reverse order.
480 pthread_mutex_lock(&tab_vector_lock);
481 pthread_mutex_lock(&sysvar_tab_lock);
// Empty the list first, because populateFilter only appends.
483 tables_to_filter.clear();
484 populateFilter(_tab_filter, tables_to_filter);
485 pthread_mutex_unlock(&sysvar_tab_lock);
486 pthread_mutex_unlock(&tab_vector_lock);
// Fragment of a system-variable callback; presumably filtered_schemas_validate
// (header and body not visible in this listing — confirm).
// Reads the new value's string pointer from `var`.
493 const char *input= var->value->
str_value.ptr();
// Continue only when a value is present and the plugin instance exists.
494 if (input && filtered_replicator)
// Fragment of a system-variable callback; presumably filtered_tables_validate
// (header and body not visible in this listing — confirm).
// Reads the new value's string pointer from `var`.
505 const char *input= var->value->
str_value.ptr();
// Continue only when a value is present and the plugin instance exists.
506 if (input && filtered_replicator)
// Plugin initialisation fragment (presumably the body of init(); its header,
// the definition of `vm` and the opening of each registration call below are
// not visible in this listing).
// Creates the replicator from the four parsed option values. The option names
// match those registered in the context(...) calls further down the file.
519 filtered_replicator=
new FilteredReplicator(
"filtered_replicator",
520 vm[
"filteredschemas"].as<string>(),
521 vm[
"filteredtables"].as<string>(),
522 vm[
"schemaregex"].as<string>(),
523 vm[
"tableregex"].as<string>());
// Hand the new instance to the module context.
525 context.add(filtered_replicator);
// Trailing arguments of two calls that pair each filter sysvar string with
// its validate callback — presumably system-variable registrations.
527 sysvar_filtered_replicator_sch_filters,
528 filtered_schemas_validate));
530 sysvar_filtered_replicator_tab_filters,
531 filtered_tables_validate));
// Trailing arguments of two calls that receive the regex option values;
// what those calls register is not visible.
534 vm[
"schemaregex"].as<string>()));
536 vm[
"tableregex"].as<string>()));
// Option registration (presumably the body of init_options(); header not
// visible in this listing). All four options are strings that default to "".
// The two filter lists are bound directly to the file-level sysvar strings.
543 context(
"filteredschemas",
544 po::value<string>(&sysvar_filtered_replicator_sch_filters)->default_value(
""),
545 N_(
"Comma-separated list of schemas to exclude"));
546 context(
"filteredtables",
547 po::value<string>(&sysvar_filtered_replicator_tab_filters)->default_value(
""),
548 N_(
"Comma-separated list of tables to exclude"));
// The regex options have no bound storage variable.
549 context(
"schemaregex",
550 po::value<string>()->default_value(
""),
551 N_(
"Regular expression to apply to schemas to exclude"));
552 context(
"tableregex",
553 po::value<string>()->default_value(
""),
554 N_(
"Regular expression to apply to tables to exclude"));
// Plugin descriptor for the filtered replicator.
// NOTE(review): several descriptor fields are missing from this listing, so
// the meaning of each remaining entry is inferred from its value — presumably
// plugin name, author and description.
559 DRIZZLE_DECLARE_PLUGIN
562 "filtered_replicator",
564 "Padraig O Sullivan",
565 N_(
"Replicates events filtered by schema or table name"),
571 DRIZZLE_DECLARE_PLUGIN_END;
bool isTableFiltered(const std::string &table_name)
void parseQuery(const std::string &sql, std::string &schema_name, std::string &table_name)
static void init_options(drizzled::module::option_context &context)
Initialize query-log command line options.
void setTableFilter(const std::string &input)
A Proxy Wrapper around boost::program_options::variables_map.
virtual ReplicationReturnCode apply(Session &in_session, const message::Transaction &to_apply)=0
drizzled::plugin::ReplicationReturnCode replicate(drizzled::plugin::TransactionApplier *in_applier, drizzled::Session &in_session, drizzled::message::Transaction &to_replicate)
void setSchemaFilter(const std::string &input)
static int init(drizzled::module::Context &context)
Add query_log plugin to Drizzle and initialize query_log system variables.
bool isSchemaFiltered(const std::string &schema_name)
void populateFilter(std::string input, std::vector< std::string > &filter)
void parseStatementTableMetadata(const drizzled::message::Statement &in_statement, std::string &in_schema_name, std::string &in_table_name) const