26 #include "rabbitmq_log.h"
27 #include <drizzled/message/transaction.pb.h>
28 #include <google/protobuf/io/coded_stream.h>
30 #include <drizzled/module/registry.h>
31 #include <drizzled/plugin.h>
33 #include "rabbitmq_handler.h"
34 #include <boost/program_options.hpp>
36 #include <drizzled/item.h>
38 namespace po= boost::program_options;
41 using namespace drizzled;
42 using namespace google;
44 namespace drizzle_plugin {
// Plugin-wide configuration values. They are bound to program options and
// registered as system variables further down in this file.
// logging_enable gates RabbitMQLog::apply(): when false, apply() returns
// plugin::SUCCESS without publishing anything.
51 bool sysvar_logging_enable=
true;
// Connection parameters handed to the RabbitMQHandler constructor.
// NOTE(review): sysvar_rabbitmq_port is used below but its declaration is not
// visible in this excerpt.
52 string sysvar_rabbitmq_host;
53 string sysvar_rabbitmq_username;
54 string sysvar_rabbitmq_password;
55 string sysvar_rabbitmq_virtualhost;
56 string sysvar_rabbitmq_exchange;
57 string sysvar_rabbitmq_routingkey;
// Forward declaration; the definition appears later in this file.
58 void updateSysvarLoggingEnable(
Session *, sql_var_t);
// Constructor: forwards `name` to the plugin::TransactionApplier base and
// stores `mqHandler` in _rabbitMQHandler.
// NOTE(review): the line declaring the mqHandler parameter and the body braces
// are not visible in this excerpt.
67 RabbitMQLog::RabbitMQLog(
const string &name,
69 plugin::TransactionApplier(name),
70 _rabbitMQHandler(mqHandler)
// Destructor: disconnects the handler and then deletes it.
// NOTE(review): the same pointer is also kept in the file-level
// `rabbitmqHandler`, which updateSysvarLoggingEnable() deletes when logging is
// switched off — confirm this cannot lead to a double delete.
73 RabbitMQLog::~RabbitMQLog()
75 _rabbitMQHandler->disconnect();
76 delete _rabbitMQHandler;
// RabbitMQLog::apply — serializes the transaction message `to_apply` into a
// freshly allocated byte buffer and publishes it through _rabbitMQHandler.
// Returns plugin::SUCCESS when logging is disabled or the message was sent,
// plugin::UNKNOWN_ERROR otherwise.
// NOTE(review): the signature line, the try/catch lines and the loop
// bookkeeping are not visible in this excerpt.
79 plugin::ReplicationReturnCode
// Logging switched off: report success without publishing.
82 if(not sysvar_logging_enable)
83 return plugin::SUCCESS;
85 size_t message_byte_length= to_apply.ByteSize();
// NOTE(review): plain new[] reports failure by throwing std::bad_alloc rather
// than returning a null pointer, so the allocation-failure branch below is
// presumably unreachable — confirm against the missing condition line.
86 uint8_t* buffer=
new uint8_t[message_byte_length];
89 errmsg_printf(error::ERROR, _(
"Failed to allocate enough memory to transaction message\n"));
91 return plugin::UNKNOWN_ERROR;
// Relies on the sizes cached by the ByteSize() call above.
94 to_apply.SerializeWithCachedSizesToArray(buffer);
// Retry loop: runs until the message is sent or the tries are used up.
97 while (!sent && tries > 0) {
// publish() takes the length as int, hence the narrowing conversion.
101 _rabbitMQHandler->
publish(buffer,
int(message_byte_length));
// Publishing failed (presumably inside a catch handler): log the exception
// text, then try to re-establish the connection.
106 errmsg_printf(error::ERROR,
"%s", e.what());
108 _rabbitMQHandler->reconnect();
109 }
catch(exception &e) {
// NOTE(review): the message announces a 10 second wait; the waiting call itself
// is not visible in this excerpt.
110 errmsg_printf(error::ERROR, _(
"Could not reconnect, trying again.. - waiting 10 seconds for server to come back"));
// NOTE(review): no release of `buffer` is visible in this excerpt — verify it
// is freed with delete[] on every return path.
117 if(sent)
return plugin::SUCCESS;
118 errmsg_printf(error::ERROR, _(
"RabbitMQ server has disappeared, failing transaction."));
120 return plugin::UNKNOWN_ERROR;
// Replaces the handler used for publishing with `new_rabbitMQHandler`.
// As far as this excerpt shows, the previously stored pointer is neither
// disconnected nor deleted here.
123 void RabbitMQLog::setRabbitMQHandler(
RabbitMQHandler* new_rabbitMQHandler)
125 _rabbitMQHandler= new_rabbitMQHandler;
// File-local plugin instances; both are created with `new` in the plugin
// initialization code further down in this file.
128 static RabbitMQLog *rabbitmqLogger;
129 static RabbitMQHandler* rabbitmqHandler;
// Update hook for the logging_enable system variable (both parameters are
// unused). When logging is now off, the current handler is deleted; otherwise
// (presumably the else branch — the braces/else lines are not visible in this
// excerpt) a new RabbitMQHandler is built from the current sysvar values and
// installed in rabbitmqLogger if its connection was established.
132 void updateSysvarLoggingEnable(
Session *, sql_var_t)
134 if(not sysvar_logging_enable)
136 sysvar_logging_enable =
false;
// NOTE(review): rabbitmqLogger appears to keep its own copy of this pointer
// (set via the constructor / setRabbitMQHandler) — confirm it is not used or
// deleted again after this point.
137 delete rabbitmqHandler;
141 rabbitmqHandler=
new RabbitMQHandler(sysvar_rabbitmq_host,
142 sysvar_rabbitmq_port,
143 sysvar_rabbitmq_username,
144 sysvar_rabbitmq_password,
145 sysvar_rabbitmq_virtualhost,
146 sysvar_rabbitmq_exchange,
147 sysvar_rabbitmq_routingkey);
// Only hand the new handler to the applier when the connection succeeded.
148 if(rabbitmqHandler->rabbitmq_connection_established)
150 rabbitmqLogger->setRabbitMQHandler(rabbitmqHandler);
151 sysvar_logging_enable=
true;
// Connection failed: log it and switch logging back off.
// NOTE(review): the handler that failed to connect is not released in the
// lines visible here — confirm.
155 errmsg_printf(error::ERROR, _(
"Could not open socket, is rabbitmq running?"));
156 sysvar_logging_enable=
false;
// Body of the rabbitmq_host update hook (function header, braces and the
// NULL-check condition are not visible in this excerpt).
// The value may only change while logging is disabled.
163 if(sysvar_logging_enable)
165 errmsg_printf(error::ERROR, _(
"Value of rabbitmq_host cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
// Copy the new string value out of the SET statement's value item.
170 sysvar_rabbitmq_host = var->value->
str_value.data();
175 errmsg_printf(error::ERROR, _(
"rabbitmq_host cannot be NULL"));
// Body of the rabbitmq_port update hook (function header, braces and the
// NULL-check condition are not visible in this excerpt).
// The value may only change while logging is disabled.
183 if(sysvar_logging_enable)
185 errmsg_printf(error::ERROR, _(
"Value of rabbitmq_port cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
// Unlike the string settings, the port is read as an integer via val_int().
190 sysvar_rabbitmq_port = var->value->
val_int();
195 errmsg_printf(error::ERROR, _(
"rabbitmq_port cannot be NULL"));
// Body of the rabbitmq_username update hook (function header, braces and the
// NULL-check condition are not visible in this excerpt).
// The value may only change while logging is disabled.
203 if(sysvar_logging_enable)
205 errmsg_printf(error::ERROR, _(
"Value of rabbitmq_username cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
// Copy the new string value out of the SET statement's value item.
210 sysvar_rabbitmq_username = var->value->
str_value.data();
215 errmsg_printf(error::ERROR, _(
"rabbitmq_username cannot be NULL"));
// Body of the rabbitmq_password update hook (function header, braces and the
// NULL-check condition are not visible in this excerpt).
// The value may only change while logging is disabled.
223 if(sysvar_logging_enable)
225 errmsg_printf(error::ERROR, _(
"Value of rabbitmq_password cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
// Copy the new string value out of the SET statement's value item.
230 sysvar_rabbitmq_password = var->value->
str_value.data();
235 errmsg_printf(error::ERROR, _(
"rabbitmq_password cannot be NULL"));
// Body of the rabbitmq_virtualhost update hook (function header, braces and
// the NULL-check condition are not visible in this excerpt).
// The value may only change while logging is disabled.
243 if(sysvar_logging_enable)
245 errmsg_printf(error::ERROR, _(
"Value of rabbitmq_virtualhost cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
// Copy the new string value out of the SET statement's value item.
250 sysvar_rabbitmq_virtualhost = var->value->
str_value.data();
255 errmsg_printf(error::ERROR, _(
"rabbitmq_virtualhost cannot be NULL"));
// Body of the rabbitmq_exchange update hook (function header, braces and the
// NULL-check condition are not visible in this excerpt).
// The value may only change while logging is disabled.
263 if(sysvar_logging_enable)
265 errmsg_printf(error::ERROR, _(
"Value of rabbitmq_exchange cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
// Copy the new string value out of the SET statement's value item.
270 sysvar_rabbitmq_exchange = var->value->
str_value.data();
275 errmsg_printf(error::ERROR, _(
"rabbitmq_exchange cannot be NULL"));
// Body of the rabbitmq_routingkey update hook (function header, braces and the
// NULL-check condition are not visible in this excerpt).
// The value may only change while logging is disabled.
283 if(sysvar_logging_enable)
285 errmsg_printf(error::ERROR, _(
"Value of rabbitmq_routingkey cannot be changed as rabbitmq plugin is enabled. You need to disable the plugin first."));
// Copy the new string value out of the SET statement's value item.
290 sysvar_rabbitmq_routingkey = var->value->
str_value.data();
295 errmsg_printf(error::ERROR, _(
"rabbitmq_routingkey cannot be NULL"));
// Plugin initialization body (function header, try/catch lines and return
// statements are not visible in this excerpt).
// Step 1: build the handler from the parsed option values in `vm`; the port
// comes from sysvar_rabbitmq_port rather than from `vm`.
312 rabbitmqHandler=
new RabbitMQHandler(vm[
"host"].as<string>(),
313 sysvar_rabbitmq_port,
314 vm[
"username"].as<string>(),
315 vm[
"password"].as<string>(),
316 vm[
"virtualhost"].as<string>(),
317 vm[
"exchange"].as<string>(),
318 vm[
"routingkey"].as<string>());
// A handler that could not connect is reported by throwing.
319 if(not rabbitmqHandler->rabbitmq_connection_established)
321 throw rabbitmq_handler_exception(_(
"Could not open socket, is rabbitmq running?"));
326 errmsg_printf(error::ERROR, _(
"Failed to allocate the RabbitMQHandler. Got error: %s\n"),
// Step 2: create the applier that publishes through the handler.
332 rabbitmqLogger=
new RabbitMQLog(
"rabbitmq_applier", rabbitmqHandler);
336 errmsg_printf(error::ERROR, _(
"Failed to allocate the RabbitMQLog instance. Got error: %s\n"),
// Step 3: register the applier with the plugin context.
341 context.add(rabbitmqLogger);
// Step 4: expose the settings as system variables, each with its update hook.
// NOTE(review): no registration of a "port" variable is visible in this
// excerpt — presumably on a line that was dropped; confirm.
344 context.registerVariable(
new sys_var_bool_ptr(
"logging_enable", &sysvar_logging_enable, &updateSysvarLoggingEnable));
345 context.registerVariable(
new sys_var_std_string(
"host", sysvar_rabbitmq_host, NULL, &updateSysvarRabbitMQHost));
347 context.registerVariable(
new sys_var_std_string(
"username", sysvar_rabbitmq_username, NULL, &updateSysvarRabbitMQUserName));
348 context.registerVariable(
new sys_var_std_string(
"password", sysvar_rabbitmq_password, NULL, &updateSysvarRabbitMQPassword));
349 context.registerVariable(
new sys_var_std_string(
"virtualhost", sysvar_rabbitmq_virtualhost, NULL, &updateSysvarRabbitMQVirtualHost));
350 context.registerVariable(
new sys_var_std_string(
"exchange", sysvar_rabbitmq_exchange, NULL, &updateSysvarRabbitMQExchange));
351 context.registerVariable(
new sys_var_std_string(
"routingkey", sysvar_rabbitmq_routingkey, NULL, &updateSysvarRabbitMQRoutingKey));
// Option declarations: each entry binds a program option to one of the sysvar
// globals and supplies its default value and help text.
// NOTE(review): the lines naming several options (presumably "host", "port",
// "username", "password" and "exchange") and the function header are not
// visible in this excerpt.
359 context(
"logging-enable",
360 po::value<bool>(&sysvar_logging_enable)->default_value(
true)->zero_tokens(),
361 _(
"Enable logging to rabbitmq server"));
363 po::value<string>(&sysvar_rabbitmq_host)->default_value(
"localhost"),
364 _(
"Host name to connect to"));
366 po::value<port_constraint>(&sysvar_rabbitmq_port)->default_value(5672),
367 _(
"Port to connect to"));
368 context(
"virtualhost",
369 po::value<string>(&sysvar_rabbitmq_virtualhost)->default_value(
"/"),
370 _(
"RabbitMQ virtualhost"));
372 po::value<string>(&sysvar_rabbitmq_username)->default_value(
"guest"),
373 _(
"RabbitMQ username"));
375 po::value<string>(&sysvar_rabbitmq_password)->default_value(
"guest"),
376 _(
"RabbitMQ password"));
// use-replicator is not bound to a sysvar global; it is only stored in the
// parsed option map.
377 context(
"use-replicator",
378 po::value<string>()->default_value(
"default_replicator"),
379 _(
"Name of the replicator plugin to use (default='default_replicator')"));
381 po::value<string>(&sysvar_rabbitmq_exchange)->default_value(
"ReplicationExchange"),
382 _(
"Name of RabbitMQ exchange to publish to"));
383 context(
"routingkey",
384 po::value<string>(&sysvar_rabbitmq_routingkey)->default_value(
"ReplicationRoutingKey"),
385 _(
"Name of RabbitMQ routing key to use"));
// Plugin descriptor: names drizzle_plugin::rabbitmq::init and
// drizzle_plugin::rabbitmq::init_options as the plugin's entry points.
// NOTE(review): the remaining descriptor fields are not visible in this excerpt.
391 DRIZZLE_DECLARE_PLUGIN
397 N_(
"Publishes transactions to RabbitMQ"),
399 drizzle_plugin::rabbitmq::init,
401 drizzle_plugin::rabbitmq::init_options
403 DRIZZLE_DECLARE_PLUGIN_END;
virtual int64_t val_int()=0
A proxy wrapper around boost::program_options::variables_map.
Wrapper around librabbitmq; hides error handling, reconnections, etc. TODO: add reconnection handling.
drizzled::plugin::ReplicationReturnCode apply(drizzled::Session &session, const drizzled::message::Transaction &to_apply)
Serializes the transaction and uses a RabbitMQHandler to publish the message.
static void attachApplier(plugin::TransactionApplier *in_applier, const std::string &requested_replicator)
void publish(void *message, const int length)
Publishes the message to the server.