/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network.jms;

import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import org.apache.activemq.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Destination bridge is used to bridge between two different JMS systems.
 * <p>
 * Each message delivered to {@link #onMessage(Message)} is converted with the
 * configured {@link JmsMesageConvertor}, handed to {@link #sendMessage(Message)}
 * and then acknowledged. Failures are retried up to the number of send retries
 * reported by the connector's reconnection policy.
 */
public abstract class DestinationBridge implements Service, MessageListener {

    private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class);

    protected MessageConsumer consumer;
    /** True between a successful {@link #start()} and the next {@link #stop()}. */
    protected AtomicBoolean started = new AtomicBoolean(false);
    protected JmsMesageConvertor jmsMessageConvertor;
    /** When false, the JMSReplyTo header is cleared before the message is converted. */
    protected boolean doHandleReplyTo = true;
    protected JmsConnector jmsConnector;

    /**
     * @return Returns the consumer.
     */
    public MessageConsumer getConsumer() {
        return consumer;
    }

    /**
     * @param consumer The consumer to set.
     */
    public void setConsumer(MessageConsumer consumer) {
        this.consumer = consumer;
    }

    /**
     * @param connector The connector used for reply-to bridging, connection
     *                  failure handling and the reconnection policy.
     */
    public void setJmsConnector(JmsConnector connector) {
        this.jmsConnector = connector;
    }

    /**
     * @return Returns the message convertor applied to every forwarded message.
     */
    public JmsMesageConvertor getJmsMessageConvertor() {
        return jmsMessageConvertor;
    }

    /**
     * @param jmsMessageConvertor The message convertor to set.
     */
    public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
        this.jmsMessageConvertor = jmsMessageConvertor;
    }

    /**
     * Asks the connector to create a reply-to bridge for the given destination.
     *
     * @param destination the JMSReplyTo destination taken from the incoming message.
     * @return the destination returned by the connector's reply-to bridge.
     */
    protected Destination processReplyToDestination(Destination destination) {
        return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
    }

    /**
     * Starts the bridge. The consumer and producer are created only on the
     * transition from stopped to started; repeated calls are no-ops.
     *
     * @throws Exception if creating the consumer or the producer fails.
     */
    public void start() throws Exception {
        if (started.compareAndSet(false, true)) {
            createConsumer();
            createProducer();
        }
    }

    /**
     * Marks the bridge as stopped, which ends any forwarding loop in
     * {@link #onMessage(Message)} at its next iteration. This method itself
     * does not close the consumer or the producer.
     *
     * @throws Exception declared by {@link Service}; not thrown by this implementation.
     */
    public void stop() throws Exception {
        started.set(false);
    }

    /**
     * Forwards the given message: converts it, sends it, and acknowledges it,
     * retrying until it succeeds, the bridge is stopped, the thread is
     * interrupted, or the policy's maximum number of send retries is reached.
     * <p>
     * NOTE(review): when the policy reports a maximum below 1 the loop body
     * never runs, so the message is neither sent nor acknowledged - confirm
     * whether such values are meant to carry a special meaning.
     *
     * @param message the message received from the consumer side; ignored when null.
     */
    public void onMessage(Message message) {

        // 1-based once inside the loop: the condition below pre-increments it.
        int attempt = 0;
        final int maxRetries = jmsConnector.getReconnectionPolicy().getMaxSendRetries();

        while (started.get() && message != null && ++attempt <= maxRetries) {

            try {

                // Back off only before a retry. The first attempt (attempt == 1)
                // must go out immediately; comparing against 0 here would delay
                // every message because attempt is already incremented.
                if (attempt > 1) {
                    try {
                        Thread.sleep(jmsConnector.getReconnectionPolicy().getNextDelay(attempt));
                    } catch (InterruptedException e) {
                        // Restore the interrupt status so the calling thread
                        // can still observe that it was asked to stop.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }

                Message converted;
                if (jmsMessageConvertor != null) {
                    if (doHandleReplyTo) {
                        Destination replyTo = message.getJMSReplyTo();
                        if (replyTo != null) {
                            converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
                        } else {
                            converted = jmsMessageConvertor.convert(message);
                        }
                    } else {
                        message.setJMSReplyTo(null);
                        converted = jmsMessageConvertor.convert(message);
                    }
                } else {
                    // The Producer side is not up or not yet configured, retry.
                    continue;
                }

                try {
                    sendMessage(converted);
                } catch (Exception e) {
                    // Let the connector deal with the producer connection, then retry.
                    jmsConnector.handleConnectionFailure(getConnectionForProducer());
                    continue;
                }

                try {
                    message.acknowledge();
                } catch (Exception e) {
                    // Let the connector deal with the consumer connection, then retry.
                    jmsConnector.handleConnectionFailure(getConnnectionForConsumer());
                    continue;
                }

                // if we got here then it made it out and was ack'd
                return;

            } catch (Exception e) {
                LOG.info("failed to forward message on attempt: " + attempt +
                         " reason: " + e + " message: " + message, e);
            }
        }
    }

    /**
     * @return Returns the doHandleReplyTo.
     */
    protected boolean isDoHandleReplyTo() {
        return doHandleReplyTo;
    }

    /**
     * @param doHandleReplyTo The doHandleReplyTo to set.
     */
    protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
        this.doHandleReplyTo = doHandleReplyTo;
    }

    /**
     * Creates the consumer for this bridge; called from {@link #start()}.
     *
     * @return the created consumer.
     * @throws JMSException if the consumer cannot be created.
     */
    protected abstract MessageConsumer createConsumer() throws JMSException;

    /**
     * Creates the producer for this bridge; called from {@link #start()}.
     *
     * @return the created producer.
     * @throws JMSException if the producer cannot be created.
     */
    protected abstract MessageProducer createProducer() throws JMSException;

    /**
     * Sends an already converted message.
     *
     * @param message the converted message to send.
     * @throws JMSException if the send fails.
     */
    protected abstract void sendMessage(Message message) throws JMSException;

    /**
     * @return the connection associated with the consumer side of the bridge.
     */
    protected abstract Connection getConnnectionForConsumer();

    /**
     * @return the connection associated with the producer side of the bridge.
     */
    protected abstract Connection getConnectionForProducer();

}