Drizzled Public API Documentation

rabbitmq_log.cc
1 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2010 Marcus Eriksson
5  *
6  * Authors:
7  *
8  * Marcus Eriksson <krummas@gmail.com>
9  *
10  * This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23  */
24 
25 #include <config.h>
26 #include "rabbitmq_log.h"
27 #include <drizzled/message/transaction.pb.h>
28 #include <google/protobuf/io/coded_stream.h>
29 #include <stdio.h>
30 #include <drizzled/module/registry.h>
31 #include <drizzled/plugin.h>
32 #include <stdint.h>
33 #include "rabbitmq_handler.h"
34 #include <boost/program_options.hpp>
36 
37 namespace po= boost::program_options;
38 
39 using namespace std;
40 using namespace drizzled;
41 using namespace google;
42 
43 namespace drizzle_plugin
44 {
45 
49 static port_constraint sysvar_rabbitmq_port;
50 
51 
52 RabbitMQLog::RabbitMQLog(const string &name,
53  RabbitMQHandler* mqHandler) :
54  plugin::TransactionApplier(name),
55  _rabbitMQHandler(mqHandler)
56 { }
57 
58 RabbitMQLog::~RabbitMQLog()
59 {
60  _rabbitMQHandler->disconnect();
61  delete _rabbitMQHandler;
62 }
63 
64 plugin::ReplicationReturnCode
66 {
67  size_t message_byte_length= to_apply.ByteSize();
68  uint8_t* buffer= new uint8_t[message_byte_length];
69  if(buffer == NULL)
70  {
71  errmsg_printf(error::ERROR, _("Failed to allocate enough memory to transaction message\n"));
72  deactivate();
73  return plugin::UNKNOWN_ERROR;
74  }
75 
76  to_apply.SerializeWithCachedSizesToArray(buffer);
77  short tries = 3;
78  bool sent = false;
79  while (!sent && tries > 0) {
80  tries--;
81  try
82  {
83  _rabbitMQHandler->publish(buffer, int(message_byte_length));
84  sent = true;
85  }
86  catch(exception& e)
87  {
88  errmsg_printf(error::ERROR, "%s", e.what());
89  try {
90  _rabbitMQHandler->reconnect();
91  } catch(exception &e) {
92  errmsg_printf(error::ERROR, _("Could not reconnect, trying again.. - waiting 10 seconds for server to come back"));
93  sleep(10);
94  } //
95  }
96  }
97 
98  delete[] buffer;
99  if(sent) return plugin::SUCCESS;
100  errmsg_printf(error::ERROR, _("RabbitMQ server has disappeared, failing transaction."));
101  deactivate();
102  return plugin::UNKNOWN_ERROR;
103 }
104 
105 static RabbitMQLog *rabbitmqLogger;
106 static RabbitMQHandler* rabbitmqHandler;
107 
108 
114 static int init(drizzled::module::Context &context)
115 {
116  const module::option_map &vm= context.getOptions();
117 
118  try
119  {
120  rabbitmqHandler= new RabbitMQHandler(vm["host"].as<string>(),
121  sysvar_rabbitmq_port,
122  vm["username"].as<string>(),
123  vm["password"].as<string>(),
124  vm["virtualhost"].as<string>(),
125  vm["exchange"].as<string>(),
126  vm["routingkey"].as<string>());
127  }
128  catch (exception& e)
129  {
130  errmsg_printf(error::ERROR, _("Failed to allocate the RabbitMQHandler. Got error: %s\n"),
131  e.what());
132  return 1;
133  }
134  try
135  {
136  rabbitmqLogger= new RabbitMQLog("rabbitmq_applier", rabbitmqHandler);
137  }
138  catch (exception& e)
139  {
140  errmsg_printf(error::ERROR, _("Failed to allocate the RabbitMQLog instance. Got error: %s\n"),
141  e.what());
142  return 1;
143  }
144 
145  context.add(rabbitmqLogger);
146  ReplicationServices::attachApplier(rabbitmqLogger, vm["use-replicator"].as<string>());
147 
148  context.registerVariable(new sys_var_const_string_val("host", vm["host"].as<string>()));
149  context.registerVariable(new sys_var_constrained_value_readonly<in_port_t>("port", sysvar_rabbitmq_port));
150  context.registerVariable(new sys_var_const_string_val("username", vm["username"].as<string>()));
151  context.registerVariable(new sys_var_const_string_val("password", vm["password"].as<string>()));
152  context.registerVariable(new sys_var_const_string_val("virtualhost", vm["virtualhost"].as<string>()));
153  context.registerVariable(new sys_var_const_string_val("exchange", vm["exchange"].as<string>()));
154  context.registerVariable(new sys_var_const_string_val("routingkey", vm["routingkey"].as<string>()));
155 
156  return 0;
157 }
158 
159 
160 static void init_options(drizzled::module::option_context &context)
161 {
162  context("host",
163  po::value<string>()->default_value("localhost"),
164  _("Host name to connect to"));
165  context("port",
166  po::value<port_constraint>(&sysvar_rabbitmq_port)->default_value(5672),
167  _("Port to connect to"));
168  context("virtualhost",
169  po::value<string>()->default_value("/"),
170  _("RabbitMQ virtualhost"));
171  context("username",
172  po::value<string>()->default_value("guest"),
173  _("RabbitMQ username"));
174  context("password",
175  po::value<string>()->default_value("guest"),
176  _("RabbitMQ password"));
177  context("use-replicator",
178  po::value<string>()->default_value("default_replicator"),
179  _("Name of the replicator plugin to use (default='default_replicator')"));
180  context("exchange",
181  po::value<string>()->default_value("ReplicationExchange"),
182  _("Name of RabbitMQ exchange to publish to"));
183  context("routingkey",
184  po::value<string>()->default_value("ReplicationRoutingKey"),
185  _("Name of RabbitMQ routing key to use"));
186 }
187 
188 } /* namespace drizzle_plugin */
189 
190 DRIZZLE_DECLARE_PLUGIN
191 {
192  DRIZZLE_VERSION_ID,
193  "rabbitmq",
194  "0.1",
195  "Marcus Eriksson",
196  N_("Publishes transactions to RabbitMQ"),
197  PLUGIN_LICENSE_GPL,
198  drizzle_plugin::init,
199  NULL,
200  drizzle_plugin::init_options
201 }
202 DRIZZLE_DECLARE_PLUGIN_END;