001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network.jms; 018 019 import javax.jms.Connection; 020 import javax.jms.Destination; 021 import javax.jms.ExceptionListener; 022 import javax.jms.JMSException; 023 import javax.jms.Queue; 024 import javax.jms.QueueConnection; 025 import javax.jms.QueueConnectionFactory; 026 import javax.jms.QueueSession; 027 import javax.jms.Session; 028 import javax.naming.NamingException; 029 030 import org.slf4j.Logger; 031 import org.slf4j.LoggerFactory; 032 033 /** 034 * A Bridge to other JMS Queue providers 035 * 036 * @org.apache.xbean.XBean 037 */ 038 public class JmsQueueConnector extends JmsConnector { 039 private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class); 040 private String outboundQueueConnectionFactoryName; 041 private String localConnectionFactoryName; 042 private QueueConnectionFactory outboundQueueConnectionFactory; 043 private QueueConnectionFactory localQueueConnectionFactory; 044 private InboundQueueBridge[] inboundQueueBridges; 045 private OutboundQueueBridge[] outboundQueueBridges; 046 047 /** 048 * @return Returns the inboundQueueBridges. 049 */ 050 public InboundQueueBridge[] getInboundQueueBridges() { 051 return inboundQueueBridges; 052 } 053 054 /** 055 * @param inboundQueueBridges The inboundQueueBridges to set. 056 */ 057 public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) { 058 this.inboundQueueBridges = inboundQueueBridges; 059 } 060 061 /** 062 * @return Returns the outboundQueueBridges. 063 */ 064 public OutboundQueueBridge[] getOutboundQueueBridges() { 065 return outboundQueueBridges; 066 } 067 068 /** 069 * @param outboundQueueBridges The outboundQueueBridges to set. 070 */ 071 public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) { 072 this.outboundQueueBridges = outboundQueueBridges; 073 } 074 075 /** 076 * @return Returns the localQueueConnectionFactory. 077 */ 078 public QueueConnectionFactory getLocalQueueConnectionFactory() { 079 return localQueueConnectionFactory; 080 } 081 082 /** 083 * @param localQueueConnectionFactory The localQueueConnectionFactory to 084 * set. 085 */ 086 public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) { 087 this.localQueueConnectionFactory = localConnectionFactory; 088 } 089 090 /** 091 * @return Returns the outboundQueueConnectionFactory. 092 */ 093 public QueueConnectionFactory getOutboundQueueConnectionFactory() { 094 return outboundQueueConnectionFactory; 095 } 096 097 /** 098 * @return Returns the outboundQueueConnectionFactoryName. 099 */ 100 public String getOutboundQueueConnectionFactoryName() { 101 return outboundQueueConnectionFactoryName; 102 } 103 104 /** 105 * @param outboundQueueConnectionFactoryName The 106 * outboundQueueConnectionFactoryName to set. 107 */ 108 public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) { 109 this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName; 110 } 111 112 /** 113 * @return Returns the localConnectionFactoryName. 114 */ 115 public String getLocalConnectionFactoryName() { 116 return localConnectionFactoryName; 117 } 118 119 /** 120 * @param localConnectionFactoryName The localConnectionFactoryName to set. 121 */ 122 public void setLocalConnectionFactoryName(String localConnectionFactoryName) { 123 this.localConnectionFactoryName = localConnectionFactoryName; 124 } 125 126 /** 127 * @return Returns the localQueueConnection. 128 */ 129 public QueueConnection getLocalQueueConnection() { 130 return (QueueConnection) localConnection.get(); 131 } 132 133 /** 134 * @param localQueueConnection The localQueueConnection to set. 135 */ 136 public void setLocalQueueConnection(QueueConnection localQueueConnection) { 137 this.localConnection.set(localQueueConnection); 138 } 139 140 /** 141 * @return Returns the outboundQueueConnection. 142 */ 143 public QueueConnection getOutboundQueueConnection() { 144 return (QueueConnection) foreignConnection.get(); 145 } 146 147 /** 148 * @param outboundQueueConnection The outboundQueueConnection to set. 149 */ 150 public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) { 151 this.foreignConnection.set(foreignQueueConnection); 152 } 153 154 /** 155 * @param outboundQueueConnectionFactory The outboundQueueConnectionFactory 156 * to set. 157 */ 158 public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) { 159 this.outboundQueueConnectionFactory = foreignQueueConnectionFactory; 160 } 161 162 @Override 163 protected void initializeForeignConnection() throws NamingException, JMSException { 164 165 final QueueConnection newConnection; 166 167 if (foreignConnection.get() == null) { 168 // get the connection factories 169 if (outboundQueueConnectionFactory == null) { 170 // look it up from JNDI 171 if (outboundQueueConnectionFactoryName != null) { 172 outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate 173 .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class); 174 if (outboundUsername != null) { 175 newConnection = outboundQueueConnectionFactory 176 .createQueueConnection(outboundUsername, outboundPassword); 177 } else { 178 newConnection = outboundQueueConnectionFactory.createQueueConnection(); 179 } 180 } else { 181 throw new JMSException("Cannot create foreignConnection - no information"); 182 } 183 } else { 184 if (outboundUsername != null) { 185 newConnection = outboundQueueConnectionFactory 186 .createQueueConnection(outboundUsername, outboundPassword); 187 } else { 188 newConnection = outboundQueueConnectionFactory.createQueueConnection(); 189 } 190 } 191 } else { 192 // Clear if for now in case something goes wrong during the init. 193 newConnection = (QueueConnection) foreignConnection.getAndSet(null); 194 } 195 196 if (outboundClientId != null && outboundClientId.length() > 0) { 197 newConnection.setClientID(getOutboundClientId()); 198 } 199 newConnection.start(); 200 201 outboundMessageConvertor.setConnection(newConnection); 202 203 // Configure the bridges with the new Outbound connection. 204 initializeInboundDestinationBridgesOutboundSide(newConnection); 205 initializeOutboundDestinationBridgesOutboundSide(newConnection); 206 207 // Register for any async error notifications now so we can reset in the 208 // case where there's not a lot of activity and a connection drops. 209 newConnection.setExceptionListener(new ExceptionListener() { 210 @Override 211 public void onException(JMSException exception) { 212 handleConnectionFailure(newConnection); 213 } 214 }); 215 216 // At this point all looks good, so this our current connection now. 217 foreignConnection.set(newConnection); 218 } 219 220 @Override 221 protected void initializeLocalConnection() throws NamingException, JMSException { 222 223 final QueueConnection newConnection; 224 225 if (localConnection.get() == null) { 226 // get the connection factories 227 if (localQueueConnectionFactory == null) { 228 if (embeddedConnectionFactory == null) { 229 // look it up from JNDI 230 if (localConnectionFactoryName != null) { 231 localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate 232 .lookup(localConnectionFactoryName, QueueConnectionFactory.class); 233 if (localUsername != null) { 234 newConnection = localQueueConnectionFactory 235 .createQueueConnection(localUsername, localPassword); 236 } else { 237 newConnection = localQueueConnectionFactory.createQueueConnection(); 238 } 239 } else { 240 throw new JMSException("Cannot create localConnection - no information"); 241 } 242 } else { 243 newConnection = embeddedConnectionFactory.createQueueConnection(); 244 } 245 } else { 246 if (localUsername != null) { 247 newConnection = localQueueConnectionFactory. 248 createQueueConnection(localUsername, localPassword); 249 } else { 250 newConnection = localQueueConnectionFactory.createQueueConnection(); 251 } 252 } 253 254 } else { 255 // Clear if for now in case something goes wrong during the init. 256 newConnection = (QueueConnection) localConnection.getAndSet(null); 257 } 258 259 if (localClientId != null && localClientId.length() > 0) { 260 newConnection.setClientID(getLocalClientId()); 261 } 262 newConnection.start(); 263 264 inboundMessageConvertor.setConnection(newConnection); 265 266 // Configure the bridges with the new Local connection. 267 initializeInboundDestinationBridgesLocalSide(newConnection); 268 initializeOutboundDestinationBridgesLocalSide(newConnection); 269 270 // Register for any async error notifications now so we can reset in the 271 // case where there's not a lot of activity and a connection drops. 272 newConnection.setExceptionListener(new ExceptionListener() { 273 @Override 274 public void onException(JMSException exception) { 275 handleConnectionFailure(newConnection); 276 } 277 }); 278 279 // At this point all looks good, so this our current connection now. 280 localConnection.set(newConnection); 281 } 282 283 protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException { 284 if (inboundQueueBridges != null) { 285 QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 286 287 for (InboundQueueBridge bridge : inboundQueueBridges) { 288 String queueName = bridge.getInboundQueueName(); 289 Queue foreignQueue = createForeignQueue(outboundSession, queueName); 290 bridge.setConsumer(null); 291 bridge.setConsumerQueue(foreignQueue); 292 bridge.setConsumerConnection(connection); 293 bridge.setJmsConnector(this); 294 addInboundBridge(bridge); 295 } 296 outboundSession.close(); 297 } 298 } 299 300 protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException { 301 if (inboundQueueBridges != null) { 302 QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 303 304 for (InboundQueueBridge bridge : inboundQueueBridges) { 305 String localQueueName = bridge.getLocalQueueName(); 306 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); 307 bridge.setProducerQueue(activemqQueue); 308 bridge.setProducerConnection(connection); 309 if (bridge.getJmsMessageConvertor() == null) { 310 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 311 } 312 bridge.setJmsConnector(this); 313 addInboundBridge(bridge); 314 } 315 localSession.close(); 316 } 317 } 318 319 protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException { 320 if (outboundQueueBridges != null) { 321 QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 322 323 for (OutboundQueueBridge bridge : outboundQueueBridges) { 324 String queueName = bridge.getOutboundQueueName(); 325 Queue foreignQueue = createForeignQueue(outboundSession, queueName); 326 bridge.setProducerQueue(foreignQueue); 327 bridge.setProducerConnection(connection); 328 if (bridge.getJmsMessageConvertor() == null) { 329 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 330 } 331 bridge.setJmsConnector(this); 332 addOutboundBridge(bridge); 333 } 334 outboundSession.close(); 335 } 336 } 337 338 protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException { 339 if (outboundQueueBridges != null) { 340 QueueSession localSession = 341 connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 342 343 for (OutboundQueueBridge bridge : outboundQueueBridges) { 344 String localQueueName = bridge.getLocalQueueName(); 345 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); 346 bridge.setConsumer(null); 347 bridge.setConsumerQueue(activemqQueue); 348 bridge.setConsumerConnection(connection); 349 bridge.setJmsConnector(this); 350 addOutboundBridge(bridge); 351 } 352 localSession.close(); 353 } 354 } 355 356 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, 357 Connection replyToConsumerConnection) { 358 Queue replyToProducerQueue = (Queue)destination; 359 boolean isInbound = replyToProducerConnection.equals(localConnection.get()); 360 361 if (isInbound) { 362 InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue); 363 if (bridge == null) { 364 bridge = new InboundQueueBridge() { 365 protected Destination processReplyToDestination(Destination destination) { 366 return null; 367 } 368 }; 369 try { 370 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection) 371 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 372 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); 373 replyToConsumerSession.close(); 374 bridge.setConsumerQueue(replyToConsumerQueue); 375 bridge.setProducerQueue(replyToProducerQueue); 376 bridge.setProducerConnection((QueueConnection)replyToProducerConnection); 377 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); 378 bridge.setDoHandleReplyTo(false); 379 if (bridge.getJmsMessageConvertor() == null) { 380 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 381 } 382 bridge.setJmsConnector(this); 383 bridge.start(); 384 LOG.info("Created replyTo bridge for " + replyToProducerQueue); 385 } catch (Exception e) { 386 LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e); 387 return null; 388 } 389 replyToBridges.put(replyToProducerQueue, bridge); 390 } 391 return bridge.getConsumerQueue(); 392 } else { 393 OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue); 394 if (bridge == null) { 395 bridge = new OutboundQueueBridge() { 396 protected Destination processReplyToDestination(Destination destination) { 397 return null; 398 } 399 }; 400 try { 401 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection) 402 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 403 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); 404 replyToConsumerSession.close(); 405 bridge.setConsumerQueue(replyToConsumerQueue); 406 bridge.setProducerQueue(replyToProducerQueue); 407 bridge.setProducerConnection((QueueConnection)replyToProducerConnection); 408 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); 409 bridge.setDoHandleReplyTo(false); 410 if (bridge.getJmsMessageConvertor() == null) { 411 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 412 } 413 bridge.setJmsConnector(this); 414 bridge.start(); 415 LOG.info("Created replyTo bridge for " + replyToProducerQueue); 416 } catch (Exception e) { 417 LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e); 418 return null; 419 } 420 replyToBridges.put(replyToProducerQueue, bridge); 421 } 422 return bridge.getConsumerQueue(); 423 } 424 } 425 426 protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException { 427 return session.createQueue(queueName); 428 } 429 430 protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException { 431 Queue result = null; 432 try { 433 result = session.createQueue(queueName); 434 } catch (JMSException e) { 435 // look-up the Queue 436 try { 437 result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class); 438 } catch (NamingException e1) { 439 String errStr = "Failed to look-up Queue for name: " + queueName; 440 LOG.error(errStr, e); 441 JMSException jmsEx = new JMSException(errStr); 442 jmsEx.setLinkedException(e1); 443 throw jmsEx; 444 } 445 } 446 return result; 447 } 448 449 }