/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.ra;

import java.net.URI;
import java.util.HashMap;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.xa.XAResource;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.ServiceSupport;

/**
 * Knows how to connect to one ActiveMQ server. It can then activate endpoints
 * and deliver messages to those end points using the connection configured in
 * the resource adapter.
 * <p/>Must override equals and hashCode (JCA spec 16.4)
 *
 * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
 *                         description="The JCA Resource Adaptor for ActiveMQ"
 *
 */
public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter {

    /** How long {@link #start(BootstrapContext)} waits for the embedded broker thread, in milliseconds. */
    private static final long BROKER_START_WAIT_MILLIS = 1000 * 5;

    /** Workers for the currently activated endpoints, keyed by (endpoint factory, activation spec). */
    private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();

    private BootstrapContext bootstrapContext;
    private String brokerXmlConfig;
    /** Embedded broker; assigned by the start thread and cleared by {@link #stop()}, both while holding this object's monitor. */
    private BrokerService broker;
    private Thread brokerStartThread;
    /** Optional externally supplied connection factory; {@code null} unless {@link #setConnectionFactory} was called. */
    private ActiveMQConnectionFactory connectionFactory;

    /**
     * Creates an adapter with no embedded broker configuration and no
     * explicit connection factory.
     */
    public ActiveMQResourceAdapter() {
        super();
    }

    /**
     * Records the bootstrap context and, when a non-blank
     * {@code brokerXmlConfig} has been set, creates and starts an embedded
     * broker on a daemon thread. The calling thread waits up to
     * {@link #BROKER_START_WAIT_MILLIS} for that thread to finish and then
     * carries on regardless; a broker start-up failure is logged, not thrown.
     *
     * @param bootstrapContext the context handed over by the container
     * @throws ResourceAdapterInternalException declared by the SPI; not
     *         thrown directly by this implementation
     * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
     */
    public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
        this.bootstrapContext = bootstrapContext;
        if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
            brokerStartThread = new Thread("Starting ActiveMQ Broker") {
                @Override
                public void run () {
                    try {
                        // ensure RAR resources are available to xbean (needed for weblogic)
                        log.debug("original thread context classLoader: " + Thread.currentThread().getContextClassLoader());
                        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
                        log.debug("current (from getClass()) thread context classLoader: " + Thread.currentThread().getContextClassLoader());

                        // Publish the broker under the same monitor that stop() uses to read and clear it.
                        synchronized( ActiveMQResourceAdapter.this ) {
                            broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
                        }
                        broker.start();
                    } catch (Throwable e) {
                        // Deliberately non-fatal: the adapter keeps going without an embedded broker.
                        log.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage());
                        log.debug("Reason for: "+e.getMessage(), e);
                    }
                }
            };
            brokerStartThread.setDaemon(true);
            brokerStartThread.start();

            // Wait up to 5 seconds for the broker to start up in the async thread.. otherwise keep going without it..
            try {
                brokerStartThread.join(BROKER_START_WAIT_MILLIS);
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates a connection from this adapter's connection info, using the
     * explicitly configured connection factory when one has been set.
     *
     * @return a new physical connection
     * @throws JMSException if the connection cannot be created
     * @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection()
     */
    public ActiveMQConnection makeConnection() throws JMSException {
        if( connectionFactory == null ) {
            return makeConnection(getInfo());
        } else {
            return makeConnection(getInfo(), connectionFactory);
        }
    }

    /**
     * Creates a connection for the given activation spec. User name and
     * password come from the spec and fall back to this adapter's connection
     * info; the spec's client ID and redelivery policy are applied when
     * present.
     *
     * @param activationSpec the spec describing the endpoint being activated
     * @return a new physical connection
     * @throws JMSException if the connection cannot be created
     */
    public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
        ActiveMQConnectionFactory cf = getConnectionFactory();
        if (cf == null) {
            cf = createConnectionFactory(getInfo());
        }
        String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName());
        String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword());
        String clientId = activationSpec.getClientId();
        if (clientId != null) {
            // NOTE(review): when a factory was supplied via setConnectionFactory this
            // mutates that shared instance, so the client ID appears to carry over to
            // later connections made from it -- confirm this is intended.
            cf.setClientID(clientId);
        } else {
            if (activationSpec.isDurableSubscription()) {
                log.warn("No clientID specified for durable subscription: " + activationSpec);
            }
        }
        ActiveMQConnection physicalConnection = (ActiveMQConnection) cf.createConnection(userName, password);

        // have we configured a redelivery policy
        RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
        if (redeliveryPolicy != null) {
            physicalConnection.setRedeliveryPolicy(redeliveryPolicy);
        }
        return physicalConnection;
    }

    /**
     * Deactivates every endpoint that is still active, disposes the embedded
     * broker if one was created, and drops the bootstrap context.
     *
     * @see javax.resource.spi.ResourceAdapter#stop()
     */
    public void stop() {
        // endpointDeactivation removes the key from the map, so this loop
        // drains it one entry at a time.
        while (!endpointWorkers.isEmpty()) {
            ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
            endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
        }

        synchronized( this ) {
            if (broker != null) {
                // The start thread may still be inside broker.start(); interrupt it before disposing.
                if( brokerStartThread != null && brokerStartThread.isAlive() ) {
                    brokerStartThread.interrupt();
                }
                ServiceSupport.dispose(broker);
                broker = null;
            }
        }

        this.bootstrapContext = null;
    }

    /**
     * @return the bootstrap context passed to {@link #start(BootstrapContext)},
     *         or {@code null} before start and after {@link #stop()}
     * @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext()
     */
    public BootstrapContext getBootstrapContext() {
        return bootstrapContext;
    }

    /**
     * Registers and starts a worker for the given endpoint.
     *
     * @param endpointFactory factory for the message endpoints to deliver to
     * @param activationSpec  must be a {@link MessageActivationSpec} whose
     *                        resource adapter equals this instance
     * @throws ResourceException     if the spec was initialized with a
     *                               different resource adapter
     * @throws NotSupportedException if the spec is not a
     *                               {@link MessageActivationSpec}
     * @throws IllegalStateException if the same endpoint is already active
     * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
     *      javax.resource.spi.ActivationSpec)
     */
    public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) throws ResourceException {

        // spec section 5.3.3
        if (!equals(activationSpec.getResourceAdapter())) {
            throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")");
        }

        if (!(activationSpec instanceof MessageActivationSpec)) {
            throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
        }

        ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
        // This is weird.. the same endpoint activated twice.. must be a
        // container error.
        if (endpointWorkers.containsKey(key)) {
            throw new IllegalStateException("Endpoint previously activated");
        }

        ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key);

        // Register before starting so the worker is tracked even if start() fails part-way.
        endpointWorkers.put(key, worker);
        worker.start();
    }

    /**
     * Unregisters and stops the worker for the given endpoint. Specs that are
     * not {@link MessageActivationSpec}s and endpoints that were never
     * activated are ignored silently.
     *
     * @param endpointFactory the factory the endpoint was activated with
     * @param activationSpec  the spec the endpoint was activated with
     * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
     *      javax.resource.spi.ActivationSpec)
     */
    public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {

        if (activationSpec instanceof MessageActivationSpec) {
            ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
            ActiveMQEndpointWorker worker = endpointWorkers.remove(key);
            if (worker == null) {
                // This is weird.. that endpoint was not activated.. oh well..
                // this method does not throw exceptions so just return.
                return;
            }
            try {
                worker.stop();
            } catch (InterruptedException e) {
                // We were interrupted.. we won't throw an exception but will
                // stop waiting for the worker to stop.. we tried our best.
                // Keep the interrupt status set for the caller.
                Thread.currentThread().interrupt();
            }

        }

    }

    /**
     * We only connect to one resource manager per ResourceAdapter instance, so
     * any ActivationSpec will return the same XAResource.
     *
     * @param activationSpecs not consulted by this implementation
     * @return a single-element array holding the XA resource when the
     *         connection is an {@link XAConnection}, otherwise an empty array
     * @throws ResourceException wrapping any {@link JMSException} raised while
     *         connecting or creating the XA session
     * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
     */
    public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
        Connection connection = null;
        try {
            connection = makeConnection();
            if (connection instanceof XAConnection) {
                XASession session = ((XAConnection)connection).createXASession();
                XAResource xaResource = session.getXAResource();
                // NOTE(review): the connection backing this XAResource is closed in the
                // finally block below before the caller receives it -- confirm the
                // resource remains usable for recovery.
                return new XAResource[] {
                    xaResource
                };
            }
            return new XAResource[] {};
        } catch (JMSException e) {
            throw new ResourceException(e);
        } finally {
            // makeConnection() may have thrown before anything was assigned;
            // guard explicitly instead of relying on a swallowed NullPointerException.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Throwable ignore) {
                    // Best-effort close: a failure here must not mask the result or the original exception.
                }
            }
        }
    }

    // ///////////////////////////////////////////////////////////////////////
    //
    // Java Bean getters and setters for this ResourceAdapter class.
    //
    // ///////////////////////////////////////////////////////////////////////

    /**
     * @return the configured embedded-broker configuration location, or
     *         {@code null} if none was set
     * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig()
     */
    public String getBrokerXmlConfig() {
        return brokerXmlConfig;
    }

    /**
     * Sets the <a href="http://activemq.org/Xml+Configuration">XML
     * configuration file </a> used to configure the ActiveMQ broker via Spring
     * if using embedded mode.
     *
     * @param brokerXmlConfig is the filename which is assumed to be on the
     *                classpath unless a URL is specified. So a value of
     *                <code>foo/bar.xml</code> would be assumed to be on the
     *                classpath whereas <code>file:dir/file.xml</code> would
     *                use the file system. Any valid URL string is supported.
     */
    public void setBrokerXmlConfig(String brokerXmlConfig) {
        this.brokerXmlConfig = brokerXmlConfig;
    }

    /**
     * Two adapters are equal when their connection info and their
     * {@code brokerXmlConfig} values are equal.
     *
     * @see java.lang.Object#equals(java.lang.Object)
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MessageResourceAdapter)) {
            return false;
        }

        final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter)o;

        if (!getInfo().equals(activeMQResourceAdapter.getInfo())) {
            return false;
        }
        if (notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig())) {
            return false;
        }

        return true;
    }

    /**
     * Combines the hash of the connection info with the hash of
     * {@code brokerXmlConfig} (when set), matching the fields compared by
     * {@link #equals(Object)}.
     *
     * @see java.lang.Object#hashCode()
     */
    @Override
    public int hashCode() {
        int result;
        result = getInfo().hashCode();
        if (brokerXmlConfig != null) {
            result ^= brokerXmlConfig.hashCode();
        }
        return result;
    }

    /**
     * @return the explicitly configured connection factory, or {@code null}
     *         if none was set
     */
    public ActiveMQConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    /**
     * Supplies the connection factory to use instead of one built from this
     * adapter's connection info.
     *
     * @param aConnectionFactory the factory to use; {@code null} restores the
     *                           default behaviour
     */
    public void setConnectionFactory(ActiveMQConnectionFactory aConnectionFactory) {
        this.connectionFactory = aConnectionFactory;
    }


}