/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.network.jms;
018
019import javax.jms.Connection;
020import javax.jms.Destination;
021import javax.jms.ExceptionListener;
022import javax.jms.JMSException;
023import javax.jms.Topic;
024import javax.jms.TopicConnection;
025import javax.jms.TopicConnectionFactory;
026import javax.jms.TopicSession;
027import javax.jms.Session;
028import javax.naming.NamingException;
029
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * A Bridge to other JMS Topic providers
035 *
036 * @org.apache.xbean.XBean
037 */
038public class JmsTopicConnector extends JmsConnector {
039    private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class);
040    private String outboundTopicConnectionFactoryName;
041    private String localConnectionFactoryName;
042    private TopicConnectionFactory outboundTopicConnectionFactory;
043    private TopicConnectionFactory localTopicConnectionFactory;
044    private InboundTopicBridge[] inboundTopicBridges;
045    private OutboundTopicBridge[] outboundTopicBridges;
046
047    /**
048     * @return Returns the inboundTopicBridges.
049     */
050    public InboundTopicBridge[] getInboundTopicBridges() {
051        return inboundTopicBridges;
052    }
053
054    /**
055     * @param inboundTopicBridges The inboundTopicBridges to set.
056     */
057    public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) {
058        this.inboundTopicBridges = inboundTopicBridges;
059    }
060
061    /**
062     * @return Returns the outboundTopicBridges.
063     */
064    public OutboundTopicBridge[] getOutboundTopicBridges() {
065        return outboundTopicBridges;
066    }
067
068    /**
069     * @param outboundTopicBridges The outboundTopicBridges to set.
070     */
071    public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) {
072        this.outboundTopicBridges = outboundTopicBridges;
073    }
074
075    /**
076     * @return Returns the localTopicConnectionFactory.
077     */
078    public TopicConnectionFactory getLocalTopicConnectionFactory() {
079        return localTopicConnectionFactory;
080    }
081
082    /**
083     * @param localTopicConnectionFactory The localTopicConnectionFactory to set.
084     */
085    public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) {
086        this.localTopicConnectionFactory = localConnectionFactory;
087    }
088
089    /**
090     * @return Returns the outboundTopicConnectionFactory.
091     */
092    public TopicConnectionFactory getOutboundTopicConnectionFactory() {
093        return outboundTopicConnectionFactory;
094    }
095
096    /**
097     * @return Returns the outboundTopicConnectionFactoryName.
098     */
099    public String getOutboundTopicConnectionFactoryName() {
100        return outboundTopicConnectionFactoryName;
101    }
102
103    /**
104     * @param outboundTopicConnectionFactoryName The outboundTopicConnectionFactoryName to set.
105     */
106    public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
107        this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
108    }
109
110    /**
111     * @return Returns the localConnectionFactoryName.
112     */
113    public String getLocalConnectionFactoryName() {
114        return localConnectionFactoryName;
115    }
116
117    /**
118     * @param localConnectionFactoryName The localConnectionFactoryName to set.
119     */
120    public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
121        this.localConnectionFactoryName = localConnectionFactoryName;
122    }
123
124    /**
125     * @return Returns the localTopicConnection.
126     */
127    public TopicConnection getLocalTopicConnection() {
128        return (TopicConnection) localConnection.get();
129    }
130
131    /**
132     * @param localTopicConnection The localTopicConnection to set.
133     */
134    public void setLocalTopicConnection(TopicConnection localTopicConnection) {
135        this.localConnection.set(localTopicConnection);
136    }
137
138    /**
139     * @return Returns the outboundTopicConnection.
140     */
141    public TopicConnection getOutboundTopicConnection() {
142        return (TopicConnection) foreignConnection.get();
143    }
144
145    /**
146     * @param outboundTopicConnection The outboundTopicConnection to set.
147     */
148    public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
149        this.foreignConnection.set(foreignTopicConnection);
150    }
151
152    /**
153     * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory to set.
154     */
155    public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
156        this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
157    }
158
159    @Override
160    protected void initializeForeignConnection() throws NamingException, JMSException {
161
162        final TopicConnection newConnection;
163
164        if (foreignConnection.get() == null) {
165            // get the connection factories
166            if (outboundTopicConnectionFactory == null) {
167                // look it up from JNDI
168                if (outboundTopicConnectionFactoryName != null) {
169                    outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate
170                        .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
171                    if (outboundUsername != null) {
172                        newConnection = outboundTopicConnectionFactory
173                            .createTopicConnection(outboundUsername, outboundPassword);
174                    } else {
175                        newConnection = outboundTopicConnectionFactory.createTopicConnection();
176                    }
177                } else {
178                    throw new JMSException("Cannot create foreignConnection - no information");
179                }
180            } else {
181                if (outboundUsername != null) {
182                    newConnection = outboundTopicConnectionFactory
183                        .createTopicConnection(outboundUsername, outboundPassword);
184                } else {
185                    newConnection = outboundTopicConnectionFactory.createTopicConnection();
186                }
187            }
188        } else {
189            // Clear if for now in case something goes wrong during the init.
190            newConnection = (TopicConnection) foreignConnection.getAndSet(null);
191        }
192
193        if (outboundClientId != null && outboundClientId.length() > 0) {
194            newConnection.setClientID(getOutboundClientId());
195        }
196        newConnection.start();
197
198        outboundMessageConvertor.setConnection(newConnection);
199
200        // Configure the bridges with the new Outbound connection.
201        initializeInboundDestinationBridgesOutboundSide(newConnection);
202        initializeOutboundDestinationBridgesOutboundSide(newConnection);
203
204        // Register for any async error notifications now so we can reset in the
205        // case where there's not a lot of activity and a connection drops.
206        newConnection.setExceptionListener(new ExceptionListener() {
207            @Override
208            public void onException(JMSException exception) {
209                handleConnectionFailure(newConnection);
210            }
211        });
212
213        // At this point all looks good, so this our current connection now.
214        foreignConnection.set(newConnection);
215    }
216
217    @Override
218    protected void initializeLocalConnection() throws NamingException, JMSException {
219
220        final TopicConnection newConnection;
221
222        if (localConnection.get() == null) {
223            // get the connection factories
224            if (localTopicConnectionFactory == null) {
225                if (embeddedConnectionFactory == null) {
226                    // look it up from JNDI
227                    if (localConnectionFactoryName != null) {
228                        localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate
229                            .lookup(localConnectionFactoryName, TopicConnectionFactory.class);
230                        if (localUsername != null) {
231                            newConnection = localTopicConnectionFactory
232                                .createTopicConnection(localUsername, localPassword);
233                        } else {
234                            newConnection = localTopicConnectionFactory.createTopicConnection();
235                        }
236                    } else {
237                        throw new JMSException("Cannot create localConnection - no information");
238                    }
239                } else {
240                    newConnection = embeddedConnectionFactory.createTopicConnection();
241                }
242            } else {
243                if (localUsername != null) {
244                    newConnection = localTopicConnectionFactory.
245                            createTopicConnection(localUsername, localPassword);
246                } else {
247                    newConnection = localTopicConnectionFactory.createTopicConnection();
248                }
249            }
250
251        } else {
252            // Clear if for now in case something goes wrong during the init.
253            newConnection = (TopicConnection) localConnection.getAndSet(null);
254        }
255
256        if (localClientId != null && localClientId.length() > 0) {
257            newConnection.setClientID(getLocalClientId());
258        }
259        newConnection.start();
260
261        inboundMessageConvertor.setConnection(newConnection);
262
263        // Configure the bridges with the new Local connection.
264        initializeInboundDestinationBridgesLocalSide(newConnection);
265        initializeOutboundDestinationBridgesLocalSide(newConnection);
266
267        // Register for any async error notifications now so we can reset in the
268        // case where there's not a lot of activity and a connection drops.
269        newConnection.setExceptionListener(new ExceptionListener() {
270            @Override
271            public void onException(JMSException exception) {
272                handleConnectionFailure(newConnection);
273            }
274        });
275
276        // At this point all looks good, so this our current connection now.
277        localConnection.set(newConnection);
278    }
279
280    protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
281        if (inboundTopicBridges != null) {
282            TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
283
284            for (InboundTopicBridge bridge : inboundTopicBridges) {
285                String TopicName = bridge.getInboundTopicName();
286                Topic foreignTopic = createForeignTopic(outboundSession, TopicName);
287                bridge.setConsumer(null);
288                bridge.setConsumerTopic(foreignTopic);
289                bridge.setConsumerConnection(connection);
290                bridge.setJmsConnector(this);
291                addInboundBridge(bridge);
292            }
293            outboundSession.close();
294        }
295    }
296
297    protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
298        if (inboundTopicBridges != null) {
299            TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
300
301            for (InboundTopicBridge bridge : inboundTopicBridges) {
302                String localTopicName = bridge.getLocalTopicName();
303                Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
304                bridge.setProducerTopic(activemqTopic);
305                bridge.setProducerConnection(connection);
306                if (bridge.getJmsMessageConvertor() == null) {
307                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
308                }
309                bridge.setJmsConnector(this);
310                addInboundBridge(bridge);
311            }
312            localSession.close();
313        }
314    }
315
316    protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
317        if (outboundTopicBridges != null) {
318            TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
319
320            for (OutboundTopicBridge bridge : outboundTopicBridges) {
321                String topicName = bridge.getOutboundTopicName();
322                Topic foreignTopic = createForeignTopic(outboundSession, topicName);
323                bridge.setProducerTopic(foreignTopic);
324                bridge.setProducerConnection(connection);
325                if (bridge.getJmsMessageConvertor() == null) {
326                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
327                }
328                bridge.setJmsConnector(this);
329                addOutboundBridge(bridge);
330            }
331            outboundSession.close();
332        }
333    }
334
335    protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
336        if (outboundTopicBridges != null) {
337            TopicSession localSession =
338                    connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
339
340            for (OutboundTopicBridge bridge : outboundTopicBridges) {
341                String localTopicName = bridge.getLocalTopicName();
342                Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
343                bridge.setConsumer(null);
344                bridge.setConsumerTopic(activemqTopic);
345                bridge.setConsumerConnection(connection);
346                bridge.setJmsConnector(this);
347                addOutboundBridge(bridge);
348            }
349            localSession.close();
350        }
351    }
352
353    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
354                                              Connection replyToConsumerConnection) {
355        Topic replyToProducerTopic = (Topic)destination;
356        boolean isInbound = replyToProducerConnection.equals(localConnection.get());
357
358        if (isInbound) {
359            InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);
360            if (bridge == null) {
361                bridge = new InboundTopicBridge() {
362                    protected Destination processReplyToDestination(Destination destination) {
363                        return null;
364                    }
365                };
366                try {
367                    TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
368                        .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
369                    Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
370                    replyToConsumerSession.close();
371                    bridge.setConsumerTopic(replyToConsumerTopic);
372                    bridge.setProducerTopic(replyToProducerTopic);
373                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
374                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
375                    bridge.setDoHandleReplyTo(false);
376                    if (bridge.getJmsMessageConvertor() == null) {
377                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
378                    }
379                    bridge.setJmsConnector(this);
380                    bridge.start();
381                    LOG.info("Created replyTo bridge for " + replyToProducerTopic);
382                } catch (Exception e) {
383                    LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
384                    return null;
385                }
386                replyToBridges.put(replyToProducerTopic, bridge);
387            }
388            return bridge.getConsumerTopic();
389        } else {
390            OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic);
391            if (bridge == null) {
392                bridge = new OutboundTopicBridge() {
393                    protected Destination processReplyToDestination(Destination destination) {
394                        return null;
395                    }
396                };
397                try {
398                    TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
399                        .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
400                    Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
401                    replyToConsumerSession.close();
402                    bridge.setConsumerTopic(replyToConsumerTopic);
403                    bridge.setProducerTopic(replyToProducerTopic);
404                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
405                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
406                    bridge.setDoHandleReplyTo(false);
407                    if (bridge.getJmsMessageConvertor() == null) {
408                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
409                    }
410                    bridge.setJmsConnector(this);
411                    bridge.start();
412                    LOG.info("Created replyTo bridge for " + replyToProducerTopic);
413                } catch (Exception e) {
414                    LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
415                    return null;
416                }
417                replyToBridges.put(replyToProducerTopic, bridge);
418            }
419            return bridge.getConsumerTopic();
420        }
421    }
422
423    protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException {
424        return session.createTopic(topicName);
425    }
426
427    protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException {
428        Topic result = null;
429        try {
430            result = session.createTopic(topicName);
431        } catch (JMSException e) {
432            // look-up the Topic
433            try {
434                result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
435            } catch (NamingException e1) {
436                String errStr = "Failed to look-up Topic for name: " + topicName;
437                LOG.error(errStr, e);
438                JMSException jmsEx = new JMSException(errStr);
439                jmsEx.setLinkedException(e1);
440                throw jmsEx;
441            }
442        }
443        return result;
444    }
445
446}