30 #include <drizzled/gettext.h>
32 #include "rabbitmq_handler.h"
36 namespace drizzle_plugin {
39 extern bool sysvar_logging_enable;
41 RabbitMQHandler::RabbitMQHandler(
const std::string &rabbitMQHost,
42 const in_port_t rabbitMQPort,
43 const std::string &rabbitMQUsername,
44 const std::string &rabbitMQPassword,
45 const std::string &rabbitMQVirtualhost,
46 const std::string &rabbitMQExchange,
47 const std::string &rabbitMQRoutingKey)
49 rabbitmqConnection(amqp_new_connection()),
50 hostname(rabbitMQHost),
52 username(rabbitMQUsername),
53 password(rabbitMQPassword),
54 virtualhost(rabbitMQVirtualhost),
55 exchange(rabbitMQExchange),
56 routingKey(rabbitMQRoutingKey),
57 rabbitmq_connection_established(
false)
59 pthread_mutex_init(&publishLock, NULL);
63 RabbitMQHandler::~RabbitMQHandler()
65 pthread_mutex_destroy(&publishLock);
69 void RabbitMQHandler::publish(
void *message,
77 pthread_mutex_lock(&publishLock);
82 if (amqp_basic_publish(rabbitmqConnection,
84 amqp_cstring_bytes(exchange.c_str()),
85 amqp_cstring_bytes(routingKey.c_str()),
91 pthread_mutex_unlock(&publishLock);
94 pthread_mutex_unlock(&publishLock);
104 void RabbitMQHandler::disconnect() throw(rabbitmq_handler_exception)
108 handleAMQPError(amqp_channel_close(rabbitmqConnection,
112 handleAMQPError(amqp_connection_close(rabbitmqConnection,
115 amqp_destroy_connection(rabbitmqConnection);
117 catch(exception& e) {}
121 void RabbitMQHandler::connect() throw(rabbitmq_handler_exception) {
122 sockfd = amqp_open_socket(hostname.c_str(), port);
125 rabbitmq_connection_established=
false;
130 amqp_set_sockfd(rabbitmqConnection, sockfd);
132 handleAMQPError(amqp_login(rabbitmqConnection,
137 AMQP_SASL_METHOD_PLAIN,
142 amqp_channel_open(rabbitmqConnection, 1);
143 handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection),
"RPC Reply");
144 amqp_table_t empty_table = { 0, NULL };
145 amqp_exchange_declare(rabbitmqConnection, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes(
"fanout"), 0, 0, empty_table);
146 handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection),
"RPC Reply");
147 rabbitmq_connection_established=
true;
151 rabbitmq_connection_established=
false;
155 void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x,
string context)
throw(rabbitmq_handler_exception)
157 string errorMessage(
"");
158 switch (x.reply_type) {
159 case AMQP_RESPONSE_NORMAL:
161 case AMQP_RESPONSE_NONE:
162 errorMessage.assign(
"No response in ");
163 errorMessage.append(context);
164 throw rabbitmq_handler_exception(errorMessage);
165 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
166 case AMQP_RESPONSE_SERVER_EXCEPTION:
167 switch (x.reply.id) {
168 case AMQP_CONNECTION_CLOSE_METHOD:
169 errorMessage.assign(
"Connection closed in ");
170 errorMessage.append(context);
171 throw rabbitmq_handler_exception(errorMessage);
172 case AMQP_CHANNEL_CLOSE_METHOD:
173 errorMessage.assign(
"Channel closed in ");
174 errorMessage.append(context);
175 throw rabbitmq_handler_exception(errorMessage);
177 errorMessage.assign(
"Unknown error in ");
178 errorMessage.append(context);
179 throw rabbitmq_handler_exception(errorMessage);