001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.vm; 018 019 import java.io.IOException; 020 import java.io.InterruptedIOException; 021 import java.net.URI; 022 import java.util.concurrent.BlockingQueue; 023 import java.util.concurrent.LinkedBlockingQueue; 024 import java.util.concurrent.TimeUnit; 025 import java.util.concurrent.atomic.AtomicBoolean; 026 import java.util.concurrent.atomic.AtomicLong; 027 028 import org.apache.activemq.command.ShutdownInfo; 029 import org.apache.activemq.thread.DefaultThreadPools; 030 import org.apache.activemq.thread.Task; 031 import org.apache.activemq.thread.TaskRunner; 032 import org.apache.activemq.transport.FutureResponse; 033 import org.apache.activemq.transport.ResponseCallback; 034 import org.apache.activemq.transport.Transport; 035 import org.apache.activemq.transport.TransportDisposedIOException; 036 import org.apache.activemq.transport.TransportListener; 037 038 /** 039 * A Transport implementation that uses direct method invocations. 040 */ 041 public class VMTransport implements Transport, Task { 042 043 private static final Object DISCONNECT = new Object(); 044 private static final AtomicLong NEXT_ID = new AtomicLong(0); 045 046 // Transport Configuration 047 protected VMTransport peer; 048 protected TransportListener transportListener; 049 protected boolean marshal; 050 protected boolean network; 051 protected boolean async = true; 052 protected int asyncQueueDepth = 2000; 053 protected final URI location; 054 protected final long id; 055 056 // Implementation 057 private LinkedBlockingQueue<Object> messageQueue; 058 private TaskRunner taskRunner; 059 060 // Transport State 061 protected final AtomicBoolean started = new AtomicBoolean(); 062 protected final AtomicBoolean disposed = new AtomicBoolean(); 063 064 private volatile int receiveCounter; 065 066 public VMTransport(URI location) { 067 this.location = location; 068 this.id = NEXT_ID.getAndIncrement(); 069 } 070 071 public void setPeer(VMTransport peer) { 072 this.peer = peer; 073 } 074 075 public void oneway(Object command) throws IOException { 076 077 if (disposed.get()) { 078 throw new TransportDisposedIOException("Transport disposed."); 079 } 080 081 if (peer == null) { 082 throw new IOException("Peer not connected."); 083 } 084 085 try { 086 087 if (peer.disposed.get()) { 088 throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."); 089 } 090 091 if (peer.async || !peer.started.get()) { 092 peer.getMessageQueue().put(command); 093 peer.wakeup(); 094 return; 095 } 096 097 } catch (InterruptedException e) { 098 InterruptedIOException iioe = new InterruptedIOException(e.getMessage()); 099 iioe.initCause(e); 100 throw iioe; 101 } 102 103 dispatch(peer, peer.messageQueue, command); 104 } 105 106 public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) { 107 TransportListener transportListener = transport.getTransportListener(); 108 if (transportListener != null) { 109 synchronized (started) { 110 111 // Ensure that no additional commands entered the queue in the small time window 112 // before the start method locks the dispatch lock and the oneway method was in 113 // an put operation. 114 while(pending != null && !pending.isEmpty() && !transport.isDisposed()) { 115 doDispatch(transport, transportListener, pending.poll()); 116 } 117 118 // We are now in sync mode and won't enqueue any more commands to the target 119 // transport so lets clean up its resources. 120 transport.messageQueue = null; 121 122 // Don't dispatch if either end was disposed already. 123 if (command != null && !this.disposed.get() && !transport.isDisposed()) { 124 doDispatch(transport, transportListener, command); 125 } 126 } 127 } 128 } 129 130 public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) { 131 if (command == DISCONNECT) { 132 transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); 133 } else { 134 transport.receiveCounter++; 135 transportListener.onCommand(command); 136 } 137 } 138 139 public void start() throws Exception { 140 141 if (transportListener == null) { 142 throw new IOException("TransportListener not set."); 143 } 144 145 // If we are not in async mode we lock the dispatch lock here and then start to 146 // prevent any sync dispatches from occurring until we dispatch the pending messages 147 // to maintain delivery order. When async this happens automatically so just set 148 // started and wakeup the task runner. 149 if (!async) { 150 synchronized (started) { 151 if (started.compareAndSet(false, true)) { 152 LinkedBlockingQueue<Object> mq = getMessageQueue(); 153 Object command; 154 while ((command = mq.poll()) != null && !disposed.get() ) { 155 receiveCounter++; 156 doDispatch(this, transportListener, command); 157 } 158 } 159 } 160 } else { 161 if (started.compareAndSet(false, true)) { 162 wakeup(); 163 } 164 } 165 } 166 167 public void stop() throws Exception { 168 // Only need to do this once, all future oneway calls will now 169 // fail as will any asnyc jobs in the task runner. 170 if (disposed.compareAndSet(false, true)) { 171 172 TaskRunner tr = taskRunner; 173 LinkedBlockingQueue<Object> mq = this.messageQueue; 174 175 taskRunner = null; 176 messageQueue = null; 177 178 if (mq != null) { 179 mq.clear(); 180 } 181 182 // Allow pending deliveries to finish up, but don't wait 183 // forever in case of an stalled onCommand. 184 if (tr != null) { 185 try { 186 tr.shutdown(TimeUnit.SECONDS.toMillis(1)); 187 } catch(Exception e) { 188 } 189 } 190 191 // let the peer know that we are disconnecting after attempting 192 // to cleanly shutdown the async tasks so that this is the last 193 // command it see's. 194 try { 195 peer.transportListener.onCommand(new ShutdownInfo()); 196 } catch (Exception ignore) { 197 } 198 } 199 } 200 201 protected void wakeup() { 202 if (async && started.get()) { 203 try { 204 getTaskRunner().wakeup(); 205 } catch (InterruptedException e) { 206 Thread.currentThread().interrupt(); 207 } catch (TransportDisposedIOException e) { 208 } 209 } 210 } 211 212 /** 213 * @see org.apache.activemq.thread.Task#iterate() 214 */ 215 public boolean iterate() { 216 217 final TransportListener tl = transportListener; 218 219 LinkedBlockingQueue<Object> mq; 220 try { 221 mq = getMessageQueue(); 222 } catch (TransportDisposedIOException e) { 223 return false; 224 } 225 226 Object command = mq.poll(); 227 if (command != null && !disposed.get()) { 228 if( command == DISCONNECT ) { 229 tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); 230 } else { 231 tl.onCommand(command); 232 } 233 return !mq.isEmpty() && !disposed.get(); 234 } else { 235 if(disposed.get()) { 236 mq.clear(); 237 } 238 return false; 239 } 240 } 241 242 public void setTransportListener(TransportListener commandListener) { 243 this.transportListener = commandListener; 244 } 245 246 public void setMessageQueue(LinkedBlockingQueue<Object> asyncQueue) { 247 synchronized (this) { 248 if (messageQueue == null) { 249 messageQueue = asyncQueue; 250 } 251 } 252 } 253 254 public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException { 255 LinkedBlockingQueue<Object> result = messageQueue; 256 if (result == null) { 257 synchronized (this) { 258 result = messageQueue; 259 if (result == null) { 260 if (disposed.get()) { 261 throw new TransportDisposedIOException("The Transport has been disposed"); 262 } 263 264 messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth); 265 } 266 } 267 } 268 return result; 269 } 270 271 protected TaskRunner getTaskRunner() throws TransportDisposedIOException { 272 TaskRunner result = taskRunner; 273 if (result == null) { 274 synchronized (this) { 275 result = taskRunner; 276 if (result == null) { 277 if (disposed.get()) { 278 throw new TransportDisposedIOException("The Transport has been disposed"); 279 } 280 281 taskRunner = result = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString()); 282 } 283 } 284 } 285 return result; 286 } 287 288 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 289 throw new AssertionError("Unsupported Method"); 290 } 291 292 public Object request(Object command) throws IOException { 293 throw new AssertionError("Unsupported Method"); 294 } 295 296 public Object request(Object command, int timeout) throws IOException { 297 throw new AssertionError("Unsupported Method"); 298 } 299 300 public TransportListener getTransportListener() { 301 return transportListener; 302 } 303 304 public <T> T narrow(Class<T> target) { 305 if (target.isAssignableFrom(getClass())) { 306 return target.cast(this); 307 } 308 return null; 309 } 310 311 public boolean isMarshal() { 312 return marshal; 313 } 314 315 public void setMarshal(boolean marshal) { 316 this.marshal = marshal; 317 } 318 319 public boolean isNetwork() { 320 return network; 321 } 322 323 public void setNetwork(boolean network) { 324 this.network = network; 325 } 326 327 @Override 328 public String toString() { 329 return location + "#" + id; 330 } 331 332 public String getRemoteAddress() { 333 if (peer != null) { 334 return peer.toString(); 335 } 336 return null; 337 } 338 339 /** 340 * @return the async 341 */ 342 public boolean isAsync() { 343 return async; 344 } 345 346 /** 347 * @param async the async to set 348 */ 349 public void setAsync(boolean async) { 350 this.async = async; 351 } 352 353 /** 354 * @return the asyncQueueDepth 355 */ 356 public int getAsyncQueueDepth() { 357 return asyncQueueDepth; 358 } 359 360 /** 361 * @param asyncQueueDepth the asyncQueueDepth to set 362 */ 363 public void setAsyncQueueDepth(int asyncQueueDepth) { 364 this.asyncQueueDepth = asyncQueueDepth; 365 } 366 367 public boolean isFaultTolerant() { 368 return false; 369 } 370 371 public boolean isDisposed() { 372 return disposed.get(); 373 } 374 375 public boolean isConnected() { 376 return !disposed.get(); 377 } 378 379 public void reconnect(URI uri) throws IOException { 380 throw new IOException("Transport reconnect is not supported"); 381 } 382 383 public boolean isReconnectSupported() { 384 return false; 385 } 386 387 public boolean isUpdateURIsSupported() { 388 return false; 389 } 390 391 public void updateURIs(boolean reblance,URI[] uris) throws IOException { 392 throw new IOException("URI update feature not supported"); 393 } 394 395 public int getReceiveCounter() { 396 return receiveCounter; 397 } 398 }