001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.transport.mqtt; 019 020 import java.io.IOException; 021 import java.util.Timer; 022 import java.util.concurrent.SynchronousQueue; 023 import java.util.concurrent.ThreadFactory; 024 import java.util.concurrent.ThreadPoolExecutor; 025 import java.util.concurrent.TimeUnit; 026 import java.util.concurrent.atomic.AtomicBoolean; 027 import java.util.concurrent.atomic.AtomicInteger; 028 import java.util.concurrent.locks.ReentrantReadWriteLock; 029 030 import org.apache.activemq.command.KeepAliveInfo; 031 import org.apache.activemq.thread.SchedulerTimerTask; 032 import org.apache.activemq.transport.AbstractInactivityMonitor; 033 import org.apache.activemq.transport.InactivityIOException; 034 import org.apache.activemq.transport.Transport; 035 import org.apache.activemq.transport.TransportFilter; 036 import org.apache.activemq.wireformat.WireFormat; 037 import org.slf4j.Logger; 038 import org.slf4j.LoggerFactory; 039 040 public class MQTTInactivityMonitor extends TransportFilter { 041 042 private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class); 043 044 private static ThreadPoolExecutor ASYNC_TASKS; 045 private static int CHECKER_COUNTER; 046 private static long DEFAULT_CHECK_TIME_MILLS = 30000; 047 private static Timer READ_CHECK_TIMER; 048 049 private final AtomicBoolean monitorStarted = new AtomicBoolean(false); 050 051 private final AtomicBoolean commandSent = new AtomicBoolean(false); 052 private final AtomicBoolean inSend = new AtomicBoolean(false); 053 private final AtomicBoolean failed = new AtomicBoolean(false); 054 055 private final AtomicBoolean commandReceived = new AtomicBoolean(true); 056 private final AtomicBoolean inReceive = new AtomicBoolean(false); 057 private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); 058 059 private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock(); 060 private SchedulerTimerTask readCheckerTask; 061 062 private long readCheckTime = DEFAULT_CHECK_TIME_MILLS; 063 private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS; 064 private boolean keepAliveResponseRequired; 065 private MQTTProtocolConverter protocolConverter; 066 067 068 private final Runnable readChecker = new Runnable() { 069 long lastRunTime; 070 071 public void run() { 072 long now = System.currentTimeMillis(); 073 long elapsed = (now - lastRunTime); 074 075 if (lastRunTime != 0 && LOG.isDebugEnabled()) { 076 LOG.debug("" + elapsed + " ms elapsed since last read check."); 077 } 078 079 // Perhaps the timer executed a read check late.. and then executes 080 // the next read check on time which causes the time elapsed between 081 // read checks to be small.. 082 083 // If less than 90% of the read check Time elapsed then abort this readcheck. 084 if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression. 085 LOG.debug("Aborting read check.. Not enough time elapsed since last read check."); 086 return; 087 } 088 089 lastRunTime = now; 090 readCheck(); 091 } 092 }; 093 094 private boolean allowReadCheck(long elapsed) { 095 return elapsed > (readCheckTime * 9 / 10); 096 } 097 098 099 public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) { 100 super(next); 101 } 102 103 public void start() throws Exception { 104 next.start(); 105 startMonitorThread(); 106 } 107 108 public void stop() throws Exception { 109 stopMonitorThread(); 110 next.stop(); 111 } 112 113 114 final void readCheck() { 115 int currentCounter = next.getReceiveCounter(); 116 int previousCounter = lastReceiveCounter.getAndSet(currentCounter); 117 if (inReceive.get() || currentCounter != previousCounter) { 118 if (LOG.isTraceEnabled()) { 119 LOG.trace("A receive is in progress"); 120 } 121 return; 122 } 123 if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) { 124 if (LOG.isDebugEnabled()) { 125 LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); 126 } 127 ASYNC_TASKS.execute(new Runnable() { 128 public void run() { 129 if (protocolConverter != null) { 130 protocolConverter.onTransportError(); 131 } 132 onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress())); 133 } 134 135 ; 136 }); 137 } else { 138 if (LOG.isTraceEnabled()) { 139 LOG.trace("Message received since last read check, resetting flag: "); 140 } 141 } 142 commandReceived.set(false); 143 } 144 145 146 public void onCommand(Object command) { 147 commandReceived.set(true); 148 inReceive.set(true); 149 try { 150 if (command.getClass() == KeepAliveInfo.class) { 151 KeepAliveInfo info = (KeepAliveInfo) command; 152 if (info.isResponseRequired()) { 153 sendLock.readLock().lock(); 154 try { 155 info.setResponseRequired(false); 156 oneway(info); 157 } catch (IOException e) { 158 onException(e); 159 } finally { 160 sendLock.readLock().unlock(); 161 } 162 } 163 } else { 164 transportListener.onCommand(command); 165 } 166 } finally { 167 inReceive.set(false); 168 } 169 } 170 171 public void oneway(Object o) throws IOException { 172 // To prevent the inactivity monitor from sending a message while we 173 // are performing a send we take a read lock. The inactivity monitor 174 // sends its Heart-beat commands under a write lock. This means that 175 // the MutexTransport is still responsible for synchronizing sends 176 this.sendLock.readLock().lock(); 177 inSend.set(true); 178 try { 179 doOnewaySend(o); 180 } finally { 181 commandSent.set(true); 182 inSend.set(false); 183 this.sendLock.readLock().unlock(); 184 } 185 } 186 187 // Must be called under lock, either read or write on sendLock. 188 private void doOnewaySend(Object command) throws IOException { 189 if (failed.get()) { 190 throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress()); 191 } 192 next.oneway(command); 193 } 194 195 public void onException(IOException error) { 196 if (failed.compareAndSet(false, true)) { 197 stopMonitorThread(); 198 transportListener.onException(error); 199 } 200 } 201 202 203 public long getReadCheckTime() { 204 return readCheckTime; 205 } 206 207 public void setReadCheckTime(long readCheckTime) { 208 this.readCheckTime = readCheckTime; 209 } 210 211 212 public long getInitialDelayTime() { 213 return initialDelayTime; 214 } 215 216 public void setInitialDelayTime(long initialDelayTime) { 217 this.initialDelayTime = initialDelayTime; 218 } 219 220 public boolean isKeepAliveResponseRequired() { 221 return this.keepAliveResponseRequired; 222 } 223 224 public void setKeepAliveResponseRequired(boolean value) { 225 this.keepAliveResponseRequired = value; 226 } 227 228 public boolean isMonitorStarted() { 229 return this.monitorStarted.get(); 230 } 231 232 public void setProtocolConverter(MQTTProtocolConverter protocolConverter) { 233 this.protocolConverter = protocolConverter; 234 } 235 236 public MQTTProtocolConverter getProtocolConverter() { 237 return protocolConverter; 238 } 239 240 synchronized void startMonitorThread() { 241 if (monitorStarted.get()) { 242 return; 243 } 244 245 246 if (readCheckTime > 0) { 247 readCheckerTask = new SchedulerTimerTask(readChecker); 248 } 249 250 251 if (readCheckTime > 0) { 252 monitorStarted.set(true); 253 synchronized (AbstractInactivityMonitor.class) { 254 if (CHECKER_COUNTER == 0) { 255 ASYNC_TASKS = createExecutor(); 256 READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true); 257 } 258 CHECKER_COUNTER++; 259 if (readCheckTime > 0) { 260 READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime); 261 } 262 } 263 } 264 } 265 266 267 synchronized void stopMonitorThread() { 268 if (monitorStarted.compareAndSet(true, false)) { 269 if (readCheckerTask != null) { 270 readCheckerTask.cancel(); 271 } 272 273 synchronized (AbstractInactivityMonitor.class) { 274 READ_CHECK_TIMER.purge(); 275 CHECKER_COUNTER--; 276 if (CHECKER_COUNTER == 0) { 277 READ_CHECK_TIMER.cancel(); 278 READ_CHECK_TIMER = null; 279 ASYNC_TASKS.shutdown(); 280 ASYNC_TASKS = null; 281 } 282 } 283 } 284 } 285 286 private ThreadFactory factory = new ThreadFactory() { 287 public Thread newThread(Runnable runnable) { 288 Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable); 289 thread.setDaemon(true); 290 return thread; 291 } 292 }; 293 294 private ThreadPoolExecutor createExecutor() { 295 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory); 296 exec.allowCoreThreadTimeOut(true); 297 return exec; 298 } 299 } 300