Drizzled Public API Documentation

filtered_replicator.cc
Go to the documentation of this file.
1 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2009 Sun Microsystems, Inc.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
39 #include <config.h>
40 #include <drizzled/gettext.h>
41 #include <drizzled/plugin/transaction_applier.h>
42 #include <drizzled/message/transaction.pb.h>
43 #include <drizzled/plugin.h>
44 
45 #include <drizzled/item/string.h>
46 #include "filtered_replicator.h"
47 #include <boost/program_options.hpp>
49 #include <vector>
50 #include <string>
51 namespace po= boost::program_options;
52 using namespace std;
53 using namespace drizzled;
54 
55 namespace drizzle_plugin
56 {
57 
/* Backing storage for the "filteredschemas" option and system variable. */
static std::string sysvar_filtered_replicator_sch_filters;
/* Backing storage for the "filteredtables" option and system variable. */
static std::string sysvar_filtered_replicator_tab_filters;
60 
61 FilteredReplicator::FilteredReplicator(string name_arg,
62  const std::string &sch_filter,
63  const std::string &tab_filter,
64  const std::string &sch_regex,
65  const std::string &tab_regex) :
66  plugin::TransactionReplicator(name_arg),
67  schemas_to_filter(),
68  tables_to_filter(),
69  _sch_filter(sch_filter),
70  _tab_filter(tab_filter),
71  _sch_regex(sch_regex),
72  _tab_regex(tab_regex),
73  sch_re(NULL),
74  tab_re(NULL)
75 {
76  /*
77  * Add each of the specified schemas to the vector of schemas
78  * to filter.
79  */
80  if (not _sch_filter.empty())
81  {
82  populateFilter(_sch_filter, schemas_to_filter);
83  }
84 
85  /*
86  * Add each of the specified tables to the vector of tables
87  * to filter.
88  */
89  if (not _tab_filter.empty())
90  {
91  populateFilter(_tab_filter, tables_to_filter);
92  }
93 
94  /*
95  * Compile the regular expression for schema's to filter
96  * if one is specified.
97  */
98  if (not _sch_regex.empty())
99  {
100  const char *error= NULL;
101  int32_t error_offset= 0;
102  sch_re= pcre_compile(_sch_regex.c_str(),
103  0,
104  &error,
105  &error_offset,
106  NULL);
107  }
108 
109  /*
110  * Compile the regular expression for table's to filter
111  * if one is specified.
112  */
113  if (not _tab_regex.empty())
114  {
115  const char *error= NULL;
116  int32_t error_offset= 0;
117  tab_re= pcre_compile(_tab_regex.c_str(),
118  0,
119  &error,
120  &error_offset,
121  NULL);
122  }
123 
124  pthread_mutex_init(&sch_vector_lock, NULL);
125  pthread_mutex_init(&tab_vector_lock, NULL);
126  pthread_mutex_init(&sysvar_sch_lock, NULL);
127  pthread_mutex_init(&sysvar_tab_lock, NULL);
128 }
129 
131 {
132  if (sch_re)
133  {
134  pcre_free(sch_re);
135  }
136  if (tab_re)
137  {
138  pcre_free(tab_re);
139  }
140 
141  pthread_mutex_destroy(&sch_vector_lock);
142  pthread_mutex_destroy(&tab_vector_lock);
143  pthread_mutex_destroy(&sysvar_sch_lock);
144  pthread_mutex_destroy(&sysvar_tab_lock);
145 
146 }
147 
149  string &in_schema_name,
150  string &in_table_name) const
151 {
152  switch (in_statement.type())
153  {
154  case message::Statement::INSERT:
155  {
156  const message::TableMetadata &metadata= in_statement.insert_header().table_metadata();
157  in_schema_name= metadata.schema_name();
158  in_table_name= metadata.table_name();
159  break;
160  }
161  case message::Statement::UPDATE:
162  {
163  const message::TableMetadata &metadata= in_statement.update_header().table_metadata();
164  in_schema_name= metadata.schema_name();
165  in_table_name= metadata.table_name();
166  break;
167  }
168  case message::Statement::DELETE:
169  {
170  const message::TableMetadata &metadata= in_statement.delete_header().table_metadata();
171  in_schema_name= metadata.schema_name();
172  in_table_name= metadata.table_name();
173  break;
174  }
175  case message::Statement::CREATE_SCHEMA:
176  {
177  in_schema_name= in_statement.create_schema_statement().schema().name();
178  in_table_name.clear();
179  break;
180  }
181  case message::Statement::ALTER_SCHEMA:
182  {
183  in_schema_name= in_statement.alter_schema_statement().after().name();
184  in_table_name.clear();
185  break;
186  }
187  case message::Statement::DROP_SCHEMA:
188  {
189  in_schema_name= in_statement.drop_schema_statement().schema_name();
190  in_table_name.clear();
191  break;
192  }
193  case message::Statement::CREATE_TABLE:
194  {
195  in_schema_name= in_statement.create_table_statement().table().schema();
196  in_table_name= in_statement.create_table_statement().table().name();
197  break;
198  }
199  case message::Statement::ALTER_TABLE:
200  {
201  in_schema_name= in_statement.alter_table_statement().after().schema();
202  in_table_name= in_statement.alter_table_statement().after().name();
203  break;
204  }
205  case message::Statement::DROP_TABLE:
206  {
207  const message::TableMetadata &metadata= in_statement.drop_table_statement().table_metadata();
208  in_schema_name= metadata.schema_name();
209  in_table_name= metadata.table_name();
210  break;
211  }
212  default:
213  {
214  /* All other types have no schema and table information */
215  in_schema_name.clear();
216  in_table_name.clear();
217  break;
218  }
219  }
220 }
221 
222 plugin::ReplicationReturnCode
224  Session &in_session,
225  message::Transaction &to_replicate)
226 {
227  string schema_name;
228  string table_name;
229 
230  size_t num_statements= to_replicate.statement_size();
231 
232  /*
233  * We build a new transaction message containing only Statement
234  * messages that have not been filtered.
235  *
236  * @todo A more efficient method would be to rework the pointers
237  * that the to_replicate.statement() vector contains and remove
238  * the statement pointers that are filtered...
239  */
240  message::Transaction filtered_transaction;
241 
242  for (size_t x= 0; x < num_statements; ++x)
243  {
244  schema_name.clear();
245  table_name.clear();
246 
247  const message::Statement &statement= to_replicate.statement(x);
248 
249  /*
250  * First, we check to see if the command consists of raw SQL. If so,
251  * we need to parse this SQL and determine whether to filter the event
252  * based on the information we obtain from the parsed SQL.
253  * If not raw SQL, check if this event should be filtered or not
254  * based on the schema and table names in the command message.
255  *
256  * The schema and table names are stored in TableMetadata headers
257  * for most types of Statement messages.
258  */
259  if (statement.type() == message::Statement::RAW_SQL)
260  {
261  parseQuery(statement.sql(), schema_name, table_name);
262  }
263  else
264  {
265  parseStatementTableMetadata(statement, schema_name, table_name);
266  }
267 
268  /*
269  * Convert the schema name and table name strings to lowercase so that it
270  * does not matter what case the table or schema name was specified in. We
271  * also keep all entries in the vectors of schemas and tables to filter in
272  * lowercase.
273  */
274  boost::to_lower(schema_name);
275  boost::to_lower(table_name);
276 
277  if (! isSchemaFiltered(schema_name) && ! isTableFiltered(table_name))
278  {
279  message::Statement *s= filtered_transaction.add_statement();
280  *s= statement; /* copy contruct */
281  }
282  }
283 
284  if (filtered_transaction.statement_size() > 0)
285  {
286 
287  /*
288  * We can now simply call the applier's apply() method, passing
289  * along the supplied command.
290  */
291  message::TransactionContext *tc= filtered_transaction.mutable_transaction_context();
292  *tc= to_replicate.transaction_context(); /* copy construct */
293  return in_applier->apply(in_session, filtered_transaction);
294  }
295  return plugin::SUCCESS;
296 }
297 
298 void FilteredReplicator::populateFilter(std::string input,
299  std::vector<string> &filter)
300 {
301  /*
302  * Convert the input string to lowercase so that all entries in the vector
303  * will be in lowercase.
304  */
305  boost::to_lower(input);
306  string::size_type last_pos= input.find_first_not_of(',', 0);
307  string::size_type pos= input.find_first_of(',', last_pos);
308 
309  while (pos != string::npos || last_pos != string::npos)
310  {
311  filter.push_back(input.substr(last_pos, pos - last_pos));
312  last_pos= input.find_first_not_of(',', pos);
313  pos= input.find_first_of(',', last_pos);
314  }
315 }
316 
317 bool FilteredReplicator::isSchemaFiltered(const string &schema_name)
318 {
319  pthread_mutex_lock(&sch_vector_lock);
320  std::vector<string>::iterator it= find(schemas_to_filter.begin(), schemas_to_filter.end(), schema_name);
321  if (it != schemas_to_filter.end())
322  {
323  pthread_mutex_unlock(&sch_vector_lock);
324  return true;
325  }
326  pthread_mutex_unlock(&sch_vector_lock);
327 
328  /*
329  * If regular expression matching is enabled for schemas to filter, then
330  * we check to see if this schema name matches the regular expression that
331  * has been specified.
332  */
333  if (not _sch_regex.empty())
334  {
335  int32_t result= pcre_exec(sch_re, NULL, schema_name.c_str(), schema_name.length(), 0, 0, NULL, 0);
336  if (result >= 0)
337  return true;
338  }
339 
340  return false;
341 }
342 
343 bool FilteredReplicator::isTableFiltered(const string &table_name)
344 {
345  pthread_mutex_lock(&tab_vector_lock);
346  std::vector<string>::iterator it= find(tables_to_filter.begin(), tables_to_filter.end(), table_name);
347  if (it != tables_to_filter.end())
348  {
349  pthread_mutex_unlock(&tab_vector_lock);
350  return true;
351  }
352  pthread_mutex_unlock(&tab_vector_lock);
353 
354  /*
355  * If regular expression matching is enabled for tables to filter, then
356  * we check to see if this table name matches the regular expression that
357  * has been specified.
358  */
359  if (not _tab_regex.empty())
360  {
361  int32_t result= pcre_exec(tab_re, NULL, table_name.c_str(), table_name.length(), 0, 0, NULL, 0);
362  if (result >= 0)
363  return true;
364  }
365 
366  return false;
367 }
368 
369 void FilteredReplicator::parseQuery(const string &sql,
370  string &schema_name,
371  string &table_name)
372 {
373  /*
374  * Determine what type of SQL we are dealing with e.g. create table,
375  * drop table, etc.
376  */
377  string::size_type pos= sql.find_first_of(' ', 0);
378  string type= sql.substr(0, pos);
379 
380  /*
381  * Convert the type string to uppercase here so that it doesn't
382  * matter what case the user entered the statement in.
383  */
384  boost::to_upper(type);
385 
386  if (type.compare("DROP") == 0)
387  {
388  /*
389  * The schema and table name can be either the third word
390  * or the fifth word in a DROP TABLE statement...so we extract
391  * the third word from the SQL and see whether it is and IF or
392  * not.
393  */
394  pos= sql.find_first_of(' ', 11);
395  string cmp_str= sql.substr(11, pos - 11);
396  string target_name("");
397  if (cmp_str.compare("IF") == 0)
398  {
399  /* the name must be the fifth word */
400  pos= sql.find_first_of(' ', 21);
401  target_name= sql.substr(21, pos - 21);
402  }
403  else
404  {
405  target_name= cmp_str;
406  }
407  /*
408  * Determine whether the name is a concatenation of the schema
409  * name and table name i.e. schema.table or just the table name
410  * on its own.
411  */
412  pos= target_name.find_first_of('.', 0);
413  if (pos != string::npos)
414  {
415  /*
416  * There is a schema name here...
417  */
418  schema_name= target_name.substr(0, pos);
419  /*
420  * The rest of the name string is the table name.
421  */
422  table_name= target_name.substr(pos + 1);
423  }
424  else
425  {
426  table_name= target_name;
427  }
428  }
429  else if (type.compare("CREATE") == 0)
430  {
431  /*
432  * The schema and table name are always the third word
433  * in a CREATE TABLE statement...always (unless there is
434  * some crazy syntax I am unaware of).
435  */
436  pos= sql.find_first_of(' ', 13);
437  string target_name= sql.substr(13, pos - 13);
438  /*
439  * Determine whether the name is a concatenation of the schema
440  * name and table name i.e. schema.table or just the table name
441  * on its own.
442  */
443  pos= target_name.find_first_of('.', 0);
444  if (pos != string::npos)
445  {
446  /*
447  * There is a schema name here...
448  */
449  schema_name= target_name.substr(0, pos);
450  /*
451  * The rest of the name string is the table name.
452  */
453  table_name= target_name.substr(pos + 1);
454  }
455  else
456  {
457  table_name= target_name;
458  }
459  }
460  else
461  {
462  /* we only deal with DROP and CREATE table for the moment */
463  return;
464  }
465 }
466 
467 void FilteredReplicator::setSchemaFilter(const string &input)
468 {
469  pthread_mutex_lock(&sch_vector_lock);
470  pthread_mutex_lock(&sysvar_sch_lock);
471  _sch_filter= input;
472  schemas_to_filter.clear();
473  populateFilter(_sch_filter, schemas_to_filter);
474  pthread_mutex_unlock(&sysvar_sch_lock);
475  pthread_mutex_unlock(&sch_vector_lock);
476 }
477 
478 void FilteredReplicator::setTableFilter(const string &input)
479 {
480  pthread_mutex_lock(&tab_vector_lock);
481  pthread_mutex_lock(&sysvar_tab_lock);
482  _tab_filter= input;
483  tables_to_filter.clear();
484  populateFilter(_tab_filter, tables_to_filter);
485  pthread_mutex_unlock(&sysvar_tab_lock);
486  pthread_mutex_unlock(&tab_vector_lock);
487 }
488 
489 static FilteredReplicator *filtered_replicator= NULL; /* The singleton replicator */
490 
491 static int filtered_schemas_validate(Session*, set_var *var)
492 {
493  const char *input= var->value->str_value.ptr();
494  if (input && filtered_replicator)
495  {
496  filtered_replicator->setSchemaFilter(input);
497  return 0;
498  }
499  return 1;
500 }
501 
502 
503 static int filtered_tables_validate(Session*, set_var *var)
504 {
505  const char *input= var->value->str_value.ptr();
506  if (input && filtered_replicator)
507  {
508  filtered_replicator->setTableFilter(input);
509  return 0;
510  }
511  return 1;
512 }
513 
514 
515 static int init(module::Context &context)
516 {
517  const module::option_map &vm= context.getOptions();
518 
519  filtered_replicator= new FilteredReplicator("filtered_replicator",
520  vm["filteredschemas"].as<string>(),
521  vm["filteredtables"].as<string>(),
522  vm["schemaregex"].as<string>(),
523  vm["tableregex"].as<string>());
524 
525  context.add(filtered_replicator);
526  context.registerVariable(new sys_var_std_string("filteredschemas",
527  sysvar_filtered_replicator_sch_filters,
528  filtered_schemas_validate));
529  context.registerVariable(new sys_var_std_string("filteredtables",
530  sysvar_filtered_replicator_tab_filters,
531  filtered_tables_validate));
532 
533  context.registerVariable(new sys_var_const_string_val("schemaregex",
534  vm["schemaregex"].as<string>()));
535  context.registerVariable(new sys_var_const_string_val("tableregex",
536  vm["tableregex"].as<string>()));
537 
538  return 0;
539 }
540 
541 static void init_options(drizzled::module::option_context &context)
542 {
543  context("filteredschemas",
544  po::value<string>(&sysvar_filtered_replicator_sch_filters)->default_value(""),
545  N_("Comma-separated list of schemas to exclude"));
546  context("filteredtables",
547  po::value<string>(&sysvar_filtered_replicator_tab_filters)->default_value(""),
548  N_("Comma-separated list of tables to exclude"));
549  context("schemaregex",
550  po::value<string>()->default_value(""),
551  N_("Regular expression to apply to schemas to exclude"));
552  context("tableregex",
553  po::value<string>()->default_value(""),
554  N_("Regular expression to apply to tables to exclude"));
555 }
556 
557 } /* namespace drizzle_plugin */
558 
559 DRIZZLE_DECLARE_PLUGIN
560 {
561  DRIZZLE_VERSION_ID,
562  "filtered_replicator",
563  "0.2",
564  "Padraig O Sullivan",
565  N_("Replicates events filtered by schema or table name"),
566  PLUGIN_LICENSE_GPL,
568  NULL,
570 }
571 DRIZZLE_DECLARE_PLUGIN_END;
bool isTableFiltered(const std::string &table_name)
void parseQuery(const std::string &sql, std::string &schema_name, std::string &table_name)
static void init_options(drizzled::module::option_context &context)
Initialize query-log command line options.
Definition: module.cc:157
void setTableFilter(const std::string &input)
A proxy wrapper around boost::program_options::variables_map.
virtual ReplicationReturnCode apply(Session &in_session, const message::Transaction &to_apply)=0
drizzled::plugin::ReplicationReturnCode replicate(drizzled::plugin::TransactionApplier *in_applier, drizzled::Session &in_session, drizzled::message::Transaction &to_replicate)
void setSchemaFilter(const std::string &input)
static int init(drizzled::module::Context &context)
Add the query_log plugin to Drizzle and initialize the query_log system variables.
Definition: module.cc:228
bool isSchemaFiltered(const std::string &schema_name)
void populateFilter(std::string input, std::vector< std::string > &filter)
void parseStatementTableMetadata(const drizzled::message::Statement &in_statement, std::string &in_schema_name, std::string &in_table_name) const
String str_value
Definition: item.h:107