001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.vm;
018    
019    import java.io.IOException;
020    import java.io.InterruptedIOException;
021    import java.net.URI;
022    import java.util.concurrent.BlockingQueue;
023    import java.util.concurrent.LinkedBlockingQueue;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicBoolean;
026    import java.util.concurrent.atomic.AtomicLong;
027    
028    import org.apache.activemq.command.ShutdownInfo;
029    import org.apache.activemq.thread.DefaultThreadPools;
030    import org.apache.activemq.thread.Task;
031    import org.apache.activemq.thread.TaskRunner;
032    import org.apache.activemq.transport.FutureResponse;
033    import org.apache.activemq.transport.ResponseCallback;
034    import org.apache.activemq.transport.Transport;
035    import org.apache.activemq.transport.TransportDisposedIOException;
036    import org.apache.activemq.transport.TransportListener;
037    
038    /**
039     * A Transport implementation that uses direct method invocations.
040     */
041    public class VMTransport implements Transport, Task {
042    
043        private static final Object DISCONNECT = new Object();
044        private static final AtomicLong NEXT_ID = new AtomicLong(0);
045    
046        // Transport Configuration
047        protected VMTransport peer;
048        protected TransportListener transportListener;
049        protected boolean marshal;
050        protected boolean network;
051        protected boolean async = true;
052        protected int asyncQueueDepth = 2000;
053        protected final URI location;
054        protected final long id;
055    
056        // Implementation
057        private LinkedBlockingQueue<Object> messageQueue;
058        private TaskRunner taskRunner;
059    
060        // Transport State
061        protected final AtomicBoolean started = new AtomicBoolean();
062        protected final AtomicBoolean disposed = new AtomicBoolean();
063    
064        private volatile int receiveCounter;
065    
066        public VMTransport(URI location) {
067            this.location = location;
068            this.id = NEXT_ID.getAndIncrement();
069        }
070    
071        public void setPeer(VMTransport peer) {
072            this.peer = peer;
073        }
074    
075        public void oneway(Object command) throws IOException {
076    
077            if (disposed.get()) {
078                throw new TransportDisposedIOException("Transport disposed.");
079            }
080    
081            if (peer == null) {
082                throw new IOException("Peer not connected.");
083            }
084    
085            try {
086    
087                if (peer.disposed.get()) {
088                    throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
089                }
090    
091                if (peer.async || !peer.started.get()) {
092                    peer.getMessageQueue().put(command);
093                    peer.wakeup();
094                    return;
095                }
096    
097            } catch (InterruptedException e) {
098                InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
099                iioe.initCause(e);
100                throw iioe;
101            }
102    
103            dispatch(peer, peer.messageQueue, command);
104        }
105    
106        public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
107            TransportListener transportListener = transport.getTransportListener();
108            if (transportListener != null) {
109                synchronized (started) {
110    
111                    // Ensure that no additional commands entered the queue in the small time window
112                    // before the start method locks the dispatch lock and the oneway method was in
113                    // an put operation.
114                    while(pending != null && !pending.isEmpty() && !transport.isDisposed()) {
115                        doDispatch(transport, transportListener, pending.poll());
116                    }
117    
118                    // We are now in sync mode and won't enqueue any more commands to the target
119                    // transport so lets clean up its resources.
120                    transport.messageQueue = null;
121    
122                    // Don't dispatch if either end was disposed already.
123                    if (command != null && !this.disposed.get() && !transport.isDisposed()) {
124                        doDispatch(transport, transportListener, command);
125                    }
126                }
127            }
128        }
129    
130        public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
131            if (command == DISCONNECT) {
132                transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
133            } else {
134                transport.receiveCounter++;
135                transportListener.onCommand(command);
136            }
137        }
138    
139        public void start() throws Exception {
140    
141            if (transportListener == null) {
142                throw new IOException("TransportListener not set.");
143            }
144    
145            // If we are not in async mode we lock the dispatch lock here and then start to
146            // prevent any sync dispatches from occurring until we dispatch the pending messages
147            // to maintain delivery order.  When async this happens automatically so just set
148            // started and wakeup the task runner.
149            if (!async) {
150                synchronized (started) {
151                    if (started.compareAndSet(false, true)) {
152                        LinkedBlockingQueue<Object> mq = getMessageQueue();
153                        Object command;
154                        while ((command = mq.poll()) != null && !disposed.get() ) {
155                            receiveCounter++;
156                            doDispatch(this, transportListener, command);
157                        }
158                    }
159                }
160            } else {
161                if (started.compareAndSet(false, true)) {
162                    wakeup();
163                }
164            }
165        }
166    
167        public void stop() throws Exception {
168            // Only need to do this once, all future oneway calls will now
169            // fail as will any asnyc jobs in the task runner.
170            if (disposed.compareAndSet(false, true)) {
171    
172                TaskRunner tr = taskRunner;
173                LinkedBlockingQueue<Object> mq = this.messageQueue;
174    
175                taskRunner = null;
176                messageQueue = null;
177    
178                if (mq != null) {
179                    mq.clear();
180                }
181    
182                // Allow pending deliveries to finish up, but don't wait
183                // forever in case of an stalled onCommand.
184                if (tr != null) {
185                    try {
186                        tr.shutdown(TimeUnit.SECONDS.toMillis(1));
187                    } catch(Exception e) {
188                    }
189                }
190    
191                // let the peer know that we are disconnecting after attempting
192                // to cleanly shutdown the async tasks so that this is the last
193                // command it see's.
194                try {
195                    peer.transportListener.onCommand(new ShutdownInfo());
196                } catch (Exception ignore) {
197                }
198            }
199        }
200    
201        protected void wakeup() {
202            if (async && started.get()) {
203                try {
204                    getTaskRunner().wakeup();
205                } catch (InterruptedException e) {
206                    Thread.currentThread().interrupt();
207                } catch (TransportDisposedIOException e) {
208                }
209            }
210        }
211    
212        /**
213         * @see org.apache.activemq.thread.Task#iterate()
214         */
215        public boolean iterate() {
216    
217            final TransportListener tl = transportListener;
218    
219            LinkedBlockingQueue<Object> mq;
220            try {
221                mq = getMessageQueue();
222            } catch (TransportDisposedIOException e) {
223                return false;
224            }
225    
226            Object command = mq.poll();
227            if (command != null && !disposed.get()) {
228                if( command == DISCONNECT ) {
229                    tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
230                } else {
231                    tl.onCommand(command);
232                }
233                return !mq.isEmpty() && !disposed.get();
234            } else {
235                if(disposed.get()) {
236                    mq.clear();
237                }
238                return false;
239            }
240        }
241    
242        public void setTransportListener(TransportListener commandListener) {
243            this.transportListener = commandListener;
244        }
245    
246        public void setMessageQueue(LinkedBlockingQueue<Object> asyncQueue) {
247            synchronized (this) {
248                if (messageQueue == null) {
249                    messageQueue = asyncQueue;
250                }
251            }
252        }
253    
254        public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException {
255            LinkedBlockingQueue<Object> result = messageQueue;
256            if (result == null) {
257                synchronized (this) {
258                    result = messageQueue;
259                    if (result == null) {
260                        if (disposed.get()) {
261                            throw new TransportDisposedIOException("The Transport has been disposed");
262                        }
263    
264                        messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
265                    }
266                }
267            }
268            return result;
269        }
270    
271        protected TaskRunner getTaskRunner() throws TransportDisposedIOException {
272            TaskRunner result = taskRunner;
273            if (result == null) {
274                synchronized (this) {
275                    result = taskRunner;
276                    if (result == null) {
277                        if (disposed.get()) {
278                            throw new TransportDisposedIOException("The Transport has been disposed");
279                        }
280    
281                        taskRunner = result = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
282                    }
283                }
284            }
285            return result;
286        }
287    
288        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
289            throw new AssertionError("Unsupported Method");
290        }
291    
292        public Object request(Object command) throws IOException {
293            throw new AssertionError("Unsupported Method");
294        }
295    
296        public Object request(Object command, int timeout) throws IOException {
297            throw new AssertionError("Unsupported Method");
298        }
299    
300        public TransportListener getTransportListener() {
301            return transportListener;
302        }
303    
304        public <T> T narrow(Class<T> target) {
305            if (target.isAssignableFrom(getClass())) {
306                return target.cast(this);
307            }
308            return null;
309        }
310    
311        public boolean isMarshal() {
312            return marshal;
313        }
314    
315        public void setMarshal(boolean marshal) {
316            this.marshal = marshal;
317        }
318    
319        public boolean isNetwork() {
320            return network;
321        }
322    
323        public void setNetwork(boolean network) {
324            this.network = network;
325        }
326    
327        @Override
328        public String toString() {
329            return location + "#" + id;
330        }
331    
332        public String getRemoteAddress() {
333            if (peer != null) {
334                return peer.toString();
335            }
336            return null;
337        }
338    
339        /**
340         * @return the async
341         */
342        public boolean isAsync() {
343            return async;
344        }
345    
346        /**
347         * @param async the async to set
348         */
349        public void setAsync(boolean async) {
350            this.async = async;
351        }
352    
353        /**
354         * @return the asyncQueueDepth
355         */
356        public int getAsyncQueueDepth() {
357            return asyncQueueDepth;
358        }
359    
360        /**
361         * @param asyncQueueDepth the asyncQueueDepth to set
362         */
363        public void setAsyncQueueDepth(int asyncQueueDepth) {
364            this.asyncQueueDepth = asyncQueueDepth;
365        }
366    
367        public boolean isFaultTolerant() {
368            return false;
369        }
370    
371        public boolean isDisposed() {
372            return disposed.get();
373        }
374    
375        public boolean isConnected() {
376            return !disposed.get();
377        }
378    
379        public void reconnect(URI uri) throws IOException {
380            throw new IOException("Transport reconnect is not supported");
381        }
382    
383        public boolean isReconnectSupported() {
384            return false;
385        }
386    
387        public boolean isUpdateURIsSupported() {
388            return false;
389        }
390    
391        public void updateURIs(boolean reblance,URI[] uris) throws IOException {
392            throw new IOException("URI update feature not supported");
393        }
394    
395        public int getReceiveCounter() {
396            return receiveCounter;
397        }
398    }