Drizzled Public API Documentation

zeromq_log.cc
1 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2011 Marcus Eriksson
5  *
6  * Authors:
7  *
8  * Marcus Eriksson <krummas@gmail.com>
9  *
10  * This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23  */
24 
25 #include <config.h>
26 #include "zeromq_log.h"
27 #include <drizzled/message/transaction.pb.h>
28 #include <google/protobuf/io/coded_stream.h>
29 #include <stdio.h>
30 #include <drizzled/module/registry.h>
31 #include <drizzled/plugin.h>
32 #include <drizzled/item.h>
33 #include <stdint.h>
34 #include <boost/program_options.hpp>
36 #include <zmq.h>
37 
38 namespace po= boost::program_options;
39 
40 using namespace std;
41 using namespace drizzled;
42 using namespace google;
43 
44 namespace drizzle_plugin {
45 namespace zeromq {
46 
47 bool updateEndpoint(Session *, set_var* var);
48 
49 
50 ZeroMQLog::ZeroMQLog(const string &name, const string &endpoint) :
51  plugin::TransactionApplier(name),
52  sysvar_endpoint(endpoint)
53 {
54  void *context= zmq_init(1);
55  _socket= zmq_socket (context, ZMQ_PUB);
56  assert (_socket);
57  int rc= zmq_bind (_socket, endpoint.c_str());
58  assert (rc == 0);
59  pthread_mutex_init(&publishLock, NULL);
60 }
61 
62 ZeroMQLog::~ZeroMQLog()
63 {
64  zmq_close(_socket);
65  pthread_mutex_destroy(&publishLock);
66 }
67 
68 std::string& ZeroMQLog::getEndpoint()
69 {
70  return sysvar_endpoint;
71 }
72 
73 bool ZeroMQLog::setEndpoint(std::string new_endpoint)
74 {
75  void *tmp_context= zmq_init(1);
76  void *tmp_socket= zmq_socket(tmp_context, ZMQ_PUB);
77  if(!tmp_socket)
78  return false;
79  int tmp_rc= zmq_bind(tmp_socket, new_endpoint.c_str());
80  if(tmp_rc!=0)
81  return false;
82  // need a mutex around this since other threads can try to write to _socket while we are changing the endpoint
83  pthread_mutex_lock(&publishLock);
84 
85  zmq_close(_socket);
86  _socket= tmp_socket;
87  sysvar_endpoint= new_endpoint;
88 
89  //Releasing the mutex lock
90  pthread_mutex_unlock(&publishLock);
91  return true;
92 }
93 
94 plugin::ReplicationReturnCode
96 {
97  size_t message_byte_length= to_apply.ByteSize();
98  uint8_t* buffer= new uint8_t[message_byte_length];
99  if(buffer == NULL)
100  {
101  errmsg_printf(error::ERROR, _("Failed to allocate enough memory to transaction message\n"));
102  deactivate();
103  return plugin::UNKNOWN_ERROR;
104  }
105 
106  string schema= getSchemaName(to_apply);
107  zmq_msg_t schemamsg;
108  int rc= zmq_msg_init_size(&schemamsg, schema.length());
109  memcpy(zmq_msg_data(&schemamsg), schema.c_str(), schema.length());
110 
111  to_apply.SerializeWithCachedSizesToArray(buffer);
112  zmq_msg_t msg;
113  rc= zmq_msg_init_size(&msg, message_byte_length);
114  assert (rc == 0);
115  memcpy(zmq_msg_data(&msg), buffer, message_byte_length);
116 
117  // need a mutex around this since several threads can call this method at the same time
118  pthread_mutex_lock(&publishLock);
119  rc= zmq_send(_socket, &schemamsg, ZMQ_SNDMORE);
120  rc= zmq_send(_socket, &msg, 0);
121  pthread_mutex_unlock(&publishLock);
122 
123  zmq_msg_close(&msg);
124  zmq_msg_close(&schemamsg);
125  delete[] buffer;
126  return plugin::SUCCESS;
127 }
128 
129 string ZeroMQLog::getSchemaName(const message::Transaction &txn) {
130  if(txn.statement_size() == 0) return "";
131 
132  const message::Statement &statement= txn.statement(0);
133 
134  switch(statement.type())
135  {
136  case message::Statement::INSERT:
137  return statement.insert_header().table_metadata().schema_name();
138  case message::Statement::UPDATE:
139  return statement.update_header().table_metadata().schema_name();
140  case message::Statement::DELETE:
141  return statement.delete_header().table_metadata().schema_name();
142  case message::Statement::CREATE_TABLE:
143  return statement.create_table_statement().table().schema();
144  case message::Statement::TRUNCATE_TABLE:
145  return statement.truncate_table_statement().table_metadata().schema_name();
146  case message::Statement::DROP_TABLE:
147  return statement.drop_table_statement().table_metadata().schema_name();
148  case message::Statement::CREATE_SCHEMA:
149  return statement.create_schema_statement().schema().name();
150  case message::Statement::DROP_SCHEMA:
151  return statement.drop_schema_statement().schema_name();
152  default:
153  return "";
154  }
155 }
156 
157 static ZeroMQLog *zeromqLogger;
158 
164 bool updateEndpoint(Session *, set_var* var)
165 {
166  if (not var->value->str_value.empty())
167  {
168  std::string new_endpoint(var->value->str_value.data());
169  if (zeromqLogger->setEndpoint(new_endpoint))
170  return false; //success
171  else
172  return true; // error
173  }
174  errmsg_printf(error::ERROR, _("zeromq_endpoint cannot be NULL"));
175  return true; // error
176 }
177 
181 static int init(drizzled::module::Context &context)
182 {
183  const module::option_map &vm= context.getOptions();
184  zeromqLogger= new ZeroMQLog("zeromq_applier", vm["endpoint"].as<string>());
185  context.add(zeromqLogger);
186  ReplicationServices::attachApplier(zeromqLogger, vm["use-replicator"].as<string>());
187  context.registerVariable(new sys_var_std_string("endpoint", zeromqLogger->getEndpoint(), NULL, &updateEndpoint));
188  return 0;
189 }
190 
191 
192 static void init_options(drizzled::module::option_context &context)
193 {
194  context("endpoint",
195  po::value<string>()->default_value("tcp://*:9999"),
196  _("End point to bind to"));
197  context("use-replicator",
198  po::value<string>()->default_value("default_replicator"),
199  _("Name of the replicator plugin to use (default='default_replicator')"));
200 
201 }
202 
203 } /* namespace zeromq */
204 } /* namespace drizzle_plugin */
205 
206 DRIZZLE_DECLARE_PLUGIN
207 {
208  DRIZZLE_VERSION_ID,
209  "zeromq",
210  "0.1",
211  "Marcus Eriksson",
212  N_("Publishes transactions to ZeroMQ"),
213  PLUGIN_LICENSE_GPL,
214  drizzle_plugin::zeromq::init,
215  NULL,
216  drizzle_plugin::zeromq::init_options,
217 }
218 DRIZZLE_DECLARE_PLUGIN_END;