30 #include <drizzled/gettext.h>
32 #include "rabbitmq_handler.h"
36 namespace drizzle_plugin
/**
 * Builds a handler from the RabbitMQ connection settings.
 *
 * The connection member is initialised with the result of
 * amqp_new_connection(); host, credentials, virtual host, exchange and
 * routing key members are initialised from the matching arguments. The
 * body then initialises publishLock with default (NULL) attributes.
 *
 * No network activity is visible here; opening the socket and logging in
 * is done by connect().
 *
 * NOTE(review): no initialiser for the port is visible in this excerpt,
 * although connect() reads a `port` member -- confirm it is initialised
 * from rabbitMQPort.
 */
39 RabbitMQHandler::RabbitMQHandler(
const std::string &rabbitMQHost,
40 const in_port_t rabbitMQPort,
41 const std::string &rabbitMQUsername,
42 const std::string &rabbitMQPassword,
43 const std::string &rabbitMQVirtualhost,
44 const std::string &rabbitMQExchange,
45 const std::string &rabbitMQRoutingKey)
// Member initialisers: connection state object first, then the settings.
47 rabbitmqConnection(amqp_new_connection()),
48 hostname(rabbitMQHost),
50 username(rabbitMQUsername),
51 password(rabbitMQPassword),
52 virtualhost(rabbitMQVirtualhost),
53 exchange(rabbitMQExchange),
54 routingKey(rabbitMQRoutingKey)
// Mutex used by publish() around its call into the AMQP library.
56 pthread_mutex_init(&publishLock, NULL);
/**
 * Destroys publishLock, the mutex initialised in the constructor.
 *
 * NOTE(review): no AMQP connection teardown is visible in this excerpt;
 * presumably callers are expected to invoke disconnect() first -- confirm.
 */
60 RabbitMQHandler::~RabbitMQHandler()
62 pthread_mutex_destroy(&publishLock);
/**
 * Publishes a message through amqp_basic_publish() on rabbitmqConnection,
 * using the configured exchange and routing key.
 *
 * The call into the AMQP library is made while holding publishLock.
 *
 * @param message pointer to the payload (the remaining parameters and the
 *                way the payload is handed to the library are not visible
 *                in this excerpt).
 */
66 void RabbitMQHandler::publish(
void *message,
// Take publishLock before touching rabbitmqConnection.
70 pthread_mutex_lock(&publishLock);
// The result of amqp_basic_publish() is tested by the enclosing `if`.
75 if (amqp_basic_publish(rabbitmqConnection,
77 amqp_cstring_bytes(exchange.c_str()),
78 amqp_cstring_bytes(routingKey.c_str()),
// publishLock is released at two separate points; presumably one sits on
// the failure path of the publish call and the other on the normal path
// (the surrounding branch structure is not visible here -- TODO confirm).
84 pthread_mutex_unlock(&publishLock);
87 pthread_mutex_unlock(&publishLock);
/**
 * Tears down the AMQP session: closes the channel, closes the connection
 * and destroys the connection state object, in that order.
 *
 * The replies of both close calls are passed to handleAMQPError(), which
 * throws rabbitmq_handler_exception on a non-normal reply.
 *
 * NOTE(review): the catch handler below has an empty body, so any
 * exception it catches is silently discarded. If the close calls are inside
 * the corresponding try block (not visible in this excerpt), a failed close
 * would also skip amqp_destroy_connection() -- confirm this is intended.
 */
97 void RabbitMQHandler::disconnect() throw(rabbitmq_handler_exception)
// Close the channel first, then the connection itself.
101 handleAMQPError(amqp_channel_close(rabbitmqConnection,
105 handleAMQPError(amqp_connection_close(rabbitmqConnection,
// Destroy the connection state object created by amqp_new_connection().
108 amqp_destroy_connection(rabbitmqConnection);
// Deliberately empty handler: the caught exception is ignored.
110 catch(exception& e) {}
/**
 * Opens a socket to the configured host/port, attaches it to
 * rabbitmqConnection, logs in, opens channel 1 and declares the configured
 * exchange with type "fanout".
 *
 * @throws rabbitmq_handler_exception with a translated "Could not open
 *         socket" message on the socket failure path, or from
 *         handleAMQPError() when the login or an RPC reply is not normal.
 */
114 void RabbitMQHandler::connect() throw(rabbitmq_handler_exception) {
115 sockfd = amqp_open_socket(hostname.c_str(), port);
// Failure path for the socket open (the condition guarding this throw is
// not visible in this excerpt).
118 throw rabbitmq_handler_exception(_(
"Could not open socket, is rabbitmq running?"));
// Hand the opened socket to the AMQP connection object.
120 amqp_set_sockfd(rabbitmqConnection, sockfd);
// Log in using the PLAIN SASL method; the reply is checked by
// handleAMQPError().
122 handleAMQPError(amqp_login(rabbitmqConnection,
127 AMQP_SASL_METHOD_PLAIN,
// Open channel number 1; the call's own return value is not used, the
// outcome is read back via amqp_get_rpc_reply().
132 amqp_channel_open(rabbitmqConnection, 1);
133 handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection),
"RPC Reply");
// Empty argument table for the exchange declaration.
134 amqp_table_t empty_table = { 0, NULL };
// Declare the exchange on channel 1 as a "fanout" exchange; both flag
// arguments are 0. The same "RPC Reply" context string is used as for the
// channel open, so the two failures are not distinguishable by message.
135 amqp_exchange_declare(rabbitmqConnection, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes(
"fanout"), 0, 0, empty_table);
136 handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection),
"RPC Reply");
/**
 * Translates an AMQP RPC reply into a rabbitmq_handler_exception.
 *
 * @param x       reply returned by an AMQP library call.
 * @param context text appended to the error message to say where the
 *                failure happened.
 * @throws rabbitmq_handler_exception with one of the messages
 *         "No response in ", "Connection closed in ", "Channel closed in "
 *         or "Unknown error in ", each followed by `context`.
 */
139 void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x,
string context)
throw(rabbitmq_handler_exception)
141 string errorMessage(
"");
142 switch (x.reply_type) {
// Normal reply: no exception is raised in the visible code for this label
// (presumably followed by a break that is not visible here).
143 case AMQP_RESPONSE_NORMAL:
145 case AMQP_RESPONSE_NONE:
146 errorMessage.assign(
"No response in ");
147 errorMessage.append(context);
148 throw rabbitmq_handler_exception(errorMessage);
// Library and server exceptions share the handling below; there are no
// statements between the two labels.
// NOTE(review): x.reply.id is inspected for both reply types -- confirm
// against the rabbitmq-c documentation that the reply field is meaningful
// for AMQP_RESPONSE_LIBRARY_EXCEPTION.
149 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
150 case AMQP_RESPONSE_SERVER_EXCEPTION:
151 switch (x.reply.id) {
152 case AMQP_CONNECTION_CLOSE_METHOD:
153 errorMessage.assign(
"Connection closed in ");
154 errorMessage.append(context);
155 throw rabbitmq_handler_exception(errorMessage);
156 case AMQP_CHANNEL_CLOSE_METHOD:
157 errorMessage.assign(
"Channel closed in ");
158 errorMessage.append(context);
159 throw rabbitmq_handler_exception(errorMessage);
// Presumably the default branch for any other method id (its label is not
// visible in this excerpt).
161 errorMessage.assign(
"Unknown error in ");
162 errorMessage.append(context);
163 throw rabbitmq_handler_exception(errorMessage);