001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network.jms; 018 019import javax.jms.Connection; 020import javax.jms.Destination; 021import javax.jms.ExceptionListener; 022import javax.jms.JMSException; 023import javax.jms.Topic; 024import javax.jms.TopicConnection; 025import javax.jms.TopicConnectionFactory; 026import javax.jms.TopicSession; 027import javax.jms.Session; 028import javax.naming.NamingException; 029 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * A Bridge to other JMS Topic providers 035 * 036 * @org.apache.xbean.XBean 037 */ 038public class JmsTopicConnector extends JmsConnector { 039 private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class); 040 private String outboundTopicConnectionFactoryName; 041 private String localConnectionFactoryName; 042 private TopicConnectionFactory outboundTopicConnectionFactory; 043 private TopicConnectionFactory localTopicConnectionFactory; 044 private InboundTopicBridge[] inboundTopicBridges; 045 private OutboundTopicBridge[] outboundTopicBridges; 046 047 /** 048 * @return Returns the inboundTopicBridges. 049 */ 050 public InboundTopicBridge[] getInboundTopicBridges() { 051 return inboundTopicBridges; 052 } 053 054 /** 055 * @param inboundTopicBridges The inboundTopicBridges to set. 056 */ 057 public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) { 058 this.inboundTopicBridges = inboundTopicBridges; 059 } 060 061 /** 062 * @return Returns the outboundTopicBridges. 063 */ 064 public OutboundTopicBridge[] getOutboundTopicBridges() { 065 return outboundTopicBridges; 066 } 067 068 /** 069 * @param outboundTopicBridges The outboundTopicBridges to set. 070 */ 071 public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) { 072 this.outboundTopicBridges = outboundTopicBridges; 073 } 074 075 /** 076 * @return Returns the localTopicConnectionFactory. 077 */ 078 public TopicConnectionFactory getLocalTopicConnectionFactory() { 079 return localTopicConnectionFactory; 080 } 081 082 /** 083 * @param localTopicConnectionFactory The localTopicConnectionFactory to set. 084 */ 085 public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) { 086 this.localTopicConnectionFactory = localConnectionFactory; 087 } 088 089 /** 090 * @return Returns the outboundTopicConnectionFactory. 091 */ 092 public TopicConnectionFactory getOutboundTopicConnectionFactory() { 093 return outboundTopicConnectionFactory; 094 } 095 096 /** 097 * @return Returns the outboundTopicConnectionFactoryName. 098 */ 099 public String getOutboundTopicConnectionFactoryName() { 100 return outboundTopicConnectionFactoryName; 101 } 102 103 /** 104 * @param outboundTopicConnectionFactoryName The outboundTopicConnectionFactoryName to set. 105 */ 106 public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) { 107 this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName; 108 } 109 110 /** 111 * @return Returns the localConnectionFactoryName. 112 */ 113 public String getLocalConnectionFactoryName() { 114 return localConnectionFactoryName; 115 } 116 117 /** 118 * @param localConnectionFactoryName The localConnectionFactoryName to set. 119 */ 120 public void setLocalConnectionFactoryName(String localConnectionFactoryName) { 121 this.localConnectionFactoryName = localConnectionFactoryName; 122 } 123 124 /** 125 * @return Returns the localTopicConnection. 126 */ 127 public TopicConnection getLocalTopicConnection() { 128 return (TopicConnection) localConnection.get(); 129 } 130 131 /** 132 * @param localTopicConnection The localTopicConnection to set. 133 */ 134 public void setLocalTopicConnection(TopicConnection localTopicConnection) { 135 this.localConnection.set(localTopicConnection); 136 } 137 138 /** 139 * @return Returns the outboundTopicConnection. 140 */ 141 public TopicConnection getOutboundTopicConnection() { 142 return (TopicConnection) foreignConnection.get(); 143 } 144 145 /** 146 * @param outboundTopicConnection The outboundTopicConnection to set. 147 */ 148 public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) { 149 this.foreignConnection.set(foreignTopicConnection); 150 } 151 152 /** 153 * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory to set. 154 */ 155 public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) { 156 this.outboundTopicConnectionFactory = foreignTopicConnectionFactory; 157 } 158 159 @Override 160 protected void initializeForeignConnection() throws NamingException, JMSException { 161 162 final TopicConnection newConnection; 163 164 if (foreignConnection.get() == null) { 165 // get the connection factories 166 if (outboundTopicConnectionFactory == null) { 167 // look it up from JNDI 168 if (outboundTopicConnectionFactoryName != null) { 169 outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate 170 .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class); 171 if (outboundUsername != null) { 172 newConnection = outboundTopicConnectionFactory 173 .createTopicConnection(outboundUsername, outboundPassword); 174 } else { 175 newConnection = outboundTopicConnectionFactory.createTopicConnection(); 176 } 177 } else { 178 throw new JMSException("Cannot create foreignConnection - no information"); 179 } 180 } else { 181 if (outboundUsername != null) { 182 newConnection = outboundTopicConnectionFactory 183 .createTopicConnection(outboundUsername, outboundPassword); 184 } else { 185 newConnection = outboundTopicConnectionFactory.createTopicConnection(); 186 } 187 } 188 } else { 189 // Clear if for now in case something goes wrong during the init. 190 newConnection = (TopicConnection) foreignConnection.getAndSet(null); 191 } 192 193 if (outboundClientId != null && outboundClientId.length() > 0) { 194 newConnection.setClientID(getOutboundClientId()); 195 } 196 newConnection.start(); 197 198 outboundMessageConvertor.setConnection(newConnection); 199 200 // Configure the bridges with the new Outbound connection. 201 initializeInboundDestinationBridgesOutboundSide(newConnection); 202 initializeOutboundDestinationBridgesOutboundSide(newConnection); 203 204 // Register for any async error notifications now so we can reset in the 205 // case where there's not a lot of activity and a connection drops. 206 newConnection.setExceptionListener(new ExceptionListener() { 207 @Override 208 public void onException(JMSException exception) { 209 handleConnectionFailure(newConnection); 210 } 211 }); 212 213 // At this point all looks good, so this our current connection now. 214 foreignConnection.set(newConnection); 215 } 216 217 @Override 218 protected void initializeLocalConnection() throws NamingException, JMSException { 219 220 final TopicConnection newConnection; 221 222 if (localConnection.get() == null) { 223 // get the connection factories 224 if (localTopicConnectionFactory == null) { 225 if (embeddedConnectionFactory == null) { 226 // look it up from JNDI 227 if (localConnectionFactoryName != null) { 228 localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate 229 .lookup(localConnectionFactoryName, TopicConnectionFactory.class); 230 if (localUsername != null) { 231 newConnection = localTopicConnectionFactory 232 .createTopicConnection(localUsername, localPassword); 233 } else { 234 newConnection = localTopicConnectionFactory.createTopicConnection(); 235 } 236 } else { 237 throw new JMSException("Cannot create localConnection - no information"); 238 } 239 } else { 240 newConnection = embeddedConnectionFactory.createTopicConnection(); 241 } 242 } else { 243 if (localUsername != null) { 244 newConnection = localTopicConnectionFactory. 245 createTopicConnection(localUsername, localPassword); 246 } else { 247 newConnection = localTopicConnectionFactory.createTopicConnection(); 248 } 249 } 250 251 } else { 252 // Clear if for now in case something goes wrong during the init. 253 newConnection = (TopicConnection) localConnection.getAndSet(null); 254 } 255 256 if (localClientId != null && localClientId.length() > 0) { 257 newConnection.setClientID(getLocalClientId()); 258 } 259 newConnection.start(); 260 261 inboundMessageConvertor.setConnection(newConnection); 262 263 // Configure the bridges with the new Local connection. 264 initializeInboundDestinationBridgesLocalSide(newConnection); 265 initializeOutboundDestinationBridgesLocalSide(newConnection); 266 267 // Register for any async error notifications now so we can reset in the 268 // case where there's not a lot of activity and a connection drops. 269 newConnection.setExceptionListener(new ExceptionListener() { 270 @Override 271 public void onException(JMSException exception) { 272 handleConnectionFailure(newConnection); 273 } 274 }); 275 276 // At this point all looks good, so this our current connection now. 277 localConnection.set(newConnection); 278 } 279 280 protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException { 281 if (inboundTopicBridges != null) { 282 TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 283 284 for (InboundTopicBridge bridge : inboundTopicBridges) { 285 String TopicName = bridge.getInboundTopicName(); 286 Topic foreignTopic = createForeignTopic(outboundSession, TopicName); 287 bridge.setConsumer(null); 288 bridge.setConsumerTopic(foreignTopic); 289 bridge.setConsumerConnection(connection); 290 bridge.setJmsConnector(this); 291 addInboundBridge(bridge); 292 } 293 outboundSession.close(); 294 } 295 } 296 297 protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException { 298 if (inboundTopicBridges != null) { 299 TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); 300 301 for (InboundTopicBridge bridge : inboundTopicBridges) { 302 String localTopicName = bridge.getLocalTopicName(); 303 Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); 304 bridge.setProducerTopic(activemqTopic); 305 bridge.setProducerConnection(connection); 306 if (bridge.getJmsMessageConvertor() == null) { 307 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 308 } 309 bridge.setJmsConnector(this); 310 addInboundBridge(bridge); 311 } 312 localSession.close(); 313 } 314 } 315 316 protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException { 317 if (outboundTopicBridges != null) { 318 TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 319 320 for (OutboundTopicBridge bridge : outboundTopicBridges) { 321 String topicName = bridge.getOutboundTopicName(); 322 Topic foreignTopic = createForeignTopic(outboundSession, topicName); 323 bridge.setProducerTopic(foreignTopic); 324 bridge.setProducerConnection(connection); 325 if (bridge.getJmsMessageConvertor() == null) { 326 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 327 } 328 bridge.setJmsConnector(this); 329 addOutboundBridge(bridge); 330 } 331 outboundSession.close(); 332 } 333 } 334 335 protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException { 336 if (outboundTopicBridges != null) { 337 TopicSession localSession = 338 connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 339 340 for (OutboundTopicBridge bridge : outboundTopicBridges) { 341 String localTopicName = bridge.getLocalTopicName(); 342 Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); 343 bridge.setConsumer(null); 344 bridge.setConsumerTopic(activemqTopic); 345 bridge.setConsumerConnection(connection); 346 bridge.setJmsConnector(this); 347 addOutboundBridge(bridge); 348 } 349 localSession.close(); 350 } 351 } 352 353 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, 354 Connection replyToConsumerConnection) { 355 Topic replyToProducerTopic = (Topic)destination; 356 boolean isInbound = replyToProducerConnection.equals(localConnection.get()); 357 358 if (isInbound) { 359 InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic); 360 if (bridge == null) { 361 bridge = new InboundTopicBridge() { 362 protected Destination processReplyToDestination(Destination destination) { 363 return null; 364 } 365 }; 366 try { 367 TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection) 368 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 369 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); 370 replyToConsumerSession.close(); 371 bridge.setConsumerTopic(replyToConsumerTopic); 372 bridge.setProducerTopic(replyToProducerTopic); 373 bridge.setProducerConnection((TopicConnection)replyToProducerConnection); 374 bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection); 375 bridge.setDoHandleReplyTo(false); 376 if (bridge.getJmsMessageConvertor() == null) { 377 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 378 } 379 bridge.setJmsConnector(this); 380 bridge.start(); 381 LOG.info("Created replyTo bridge for " + replyToProducerTopic); 382 } catch (Exception e) { 383 LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e); 384 return null; 385 } 386 replyToBridges.put(replyToProducerTopic, bridge); 387 } 388 return bridge.getConsumerTopic(); 389 } else { 390 OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic); 391 if (bridge == null) { 392 bridge = new OutboundTopicBridge() { 393 protected Destination processReplyToDestination(Destination destination) { 394 return null; 395 } 396 }; 397 try { 398 TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection) 399 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 400 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); 401 replyToConsumerSession.close(); 402 bridge.setConsumerTopic(replyToConsumerTopic); 403 bridge.setProducerTopic(replyToProducerTopic); 404 bridge.setProducerConnection((TopicConnection)replyToProducerConnection); 405 bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection); 406 bridge.setDoHandleReplyTo(false); 407 if (bridge.getJmsMessageConvertor() == null) { 408 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 409 } 410 bridge.setJmsConnector(this); 411 bridge.start(); 412 LOG.info("Created replyTo bridge for " + replyToProducerTopic); 413 } catch (Exception e) { 414 LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e); 415 return null; 416 } 417 replyToBridges.put(replyToProducerTopic, bridge); 418 } 419 return bridge.getConsumerTopic(); 420 } 421 } 422 423 protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException { 424 return session.createTopic(topicName); 425 } 426 427 protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException { 428 Topic result = null; 429 try { 430 result = session.createTopic(topicName); 431 } catch (JMSException e) { 432 // look-up the Topic 433 try { 434 result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class); 435 } catch (NamingException e1) { 436 String errStr = "Failed to look-up Topic for name: " + topicName; 437 LOG.error(errStr, e); 438 JMSException jmsEx = new JMSException(errStr); 439 jmsEx.setLinkedException(e1); 440 throw jmsEx; 441 } 442 } 443 return result; 444 } 445 446}