001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.reliable; 018 019 import java.io.IOException; 020 import java.util.SortedSet; 021 import java.util.TreeSet; 022 023 import org.apache.activemq.command.Command; 024 import org.apache.activemq.command.ReplayCommand; 025 import org.apache.activemq.command.Response; 026 import org.apache.activemq.openwire.CommandIdComparator; 027 import org.apache.activemq.transport.FutureResponse; 028 import org.apache.activemq.transport.ResponseCorrelator; 029 import org.apache.activemq.transport.Transport; 030 import org.apache.activemq.transport.udp.UdpTransport; 031 import org.slf4j.Logger; 032 import org.slf4j.LoggerFactory; 033 034 /** 035 * This interceptor deals with out of order commands together with being able to 036 * handle dropped commands and the re-requesting dropped commands. 037 * 038 * 039 */ 040 public class ReliableTransport extends ResponseCorrelator { 041 private static final Logger LOG = LoggerFactory.getLogger(ReliableTransport.class); 042 043 private ReplayStrategy replayStrategy; 044 private SortedSet<Command> commands = new TreeSet<Command>(new CommandIdComparator()); 045 private int expectedCounter = 1; 046 private int replayBufferCommandCount = 50; 047 private int requestTimeout = 2000; 048 private ReplayBuffer replayBuffer; 049 private Replayer replayer; 050 private UdpTransport udpTransport; 051 052 public ReliableTransport(Transport next, ReplayStrategy replayStrategy) { 053 super(next); 054 this.replayStrategy = replayStrategy; 055 } 056 057 public ReliableTransport(Transport next, UdpTransport udpTransport) throws IOException { 058 super(next, udpTransport.getSequenceGenerator()); 059 this.udpTransport = udpTransport; 060 this.replayer = udpTransport.createReplayer(); 061 } 062 063 /** 064 * Requests that a range of commands be replayed 065 */ 066 public void requestReplay(int fromCommandId, int toCommandId) { 067 ReplayCommand replay = new ReplayCommand(); 068 replay.setFirstNakNumber(fromCommandId); 069 replay.setLastNakNumber(toCommandId); 070 try { 071 oneway(replay); 072 } catch (IOException e) { 073 getTransportListener().onException(e); 074 } 075 } 076 077 public Object request(Object o) throws IOException { 078 final Command command = (Command)o; 079 FutureResponse response = asyncRequest(command, null); 080 while (true) { 081 Response result = response.getResult(requestTimeout); 082 if (result != null) { 083 return result; 084 } 085 onMissingResponse(command, response); 086 } 087 } 088 089 public Object request(Object o, int timeout) throws IOException { 090 final Command command = (Command)o; 091 FutureResponse response = asyncRequest(command, null); 092 while (timeout > 0) { 093 int time = timeout; 094 if (timeout > requestTimeout) { 095 time = requestTimeout; 096 } 097 Response result = response.getResult(time); 098 if (result != null) { 099 return result; 100 } 101 onMissingResponse(command, response); 102 timeout -= time; 103 } 104 return response.getResult(0); 105 } 106 107 public void onCommand(Object o) { 108 Command command = (Command)o; 109 // lets pass wireformat through 110 if (command.isWireFormatInfo()) { 111 super.onCommand(command); 112 return; 113 } else if (command.getDataStructureType() == ReplayCommand.DATA_STRUCTURE_TYPE) { 114 replayCommands((ReplayCommand)command); 115 return; 116 } 117 118 int actualCounter = command.getCommandId(); 119 boolean valid = expectedCounter == actualCounter; 120 121 if (!valid) { 122 synchronized (commands) { 123 int nextCounter = actualCounter; 124 boolean empty = commands.isEmpty(); 125 if (!empty) { 126 Command nextAvailable = commands.first(); 127 nextCounter = nextAvailable.getCommandId(); 128 } 129 130 try { 131 boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter, nextCounter); 132 133 if (keep) { 134 // lets add it to the list for later on 135 if (LOG.isDebugEnabled()) { 136 LOG.debug("Received out of order command which is being buffered for later: " + command); 137 } 138 commands.add(command); 139 } 140 } catch (IOException e) { 141 onException(e); 142 } 143 144 if (!empty) { 145 // lets see if the first item in the set is the next 146 // expected 147 command = commands.first(); 148 valid = expectedCounter == command.getCommandId(); 149 if (valid) { 150 commands.remove(command); 151 } 152 } 153 } 154 } 155 156 while (valid) { 157 // we've got a valid header so increment counter 158 replayStrategy.onReceivedPacket(this, expectedCounter); 159 expectedCounter++; 160 super.onCommand(command); 161 162 synchronized (commands) { 163 // we could have more commands left 164 valid = !commands.isEmpty(); 165 if (valid) { 166 // lets see if the first item in the set is the next 167 // expected 168 command = commands.first(); 169 valid = expectedCounter == command.getCommandId(); 170 if (valid) { 171 commands.remove(command); 172 } 173 } 174 } 175 } 176 } 177 178 public int getBufferedCommandCount() { 179 synchronized (commands) { 180 return commands.size(); 181 } 182 } 183 184 public int getExpectedCounter() { 185 return expectedCounter; 186 } 187 188 /** 189 * This property should never really be set - but is mutable primarily for 190 * test cases 191 */ 192 public void setExpectedCounter(int expectedCounter) { 193 this.expectedCounter = expectedCounter; 194 } 195 196 public int getRequestTimeout() { 197 return requestTimeout; 198 } 199 200 /** 201 * Sets the default timeout of requests before starting to request commands 202 * are replayed 203 */ 204 public void setRequestTimeout(int requestTimeout) { 205 this.requestTimeout = requestTimeout; 206 } 207 208 public ReplayStrategy getReplayStrategy() { 209 return replayStrategy; 210 } 211 212 public ReplayBuffer getReplayBuffer() { 213 if (replayBuffer == null) { 214 replayBuffer = createReplayBuffer(); 215 } 216 return replayBuffer; 217 } 218 219 public void setReplayBuffer(ReplayBuffer replayBuffer) { 220 this.replayBuffer = replayBuffer; 221 } 222 223 public int getReplayBufferCommandCount() { 224 return replayBufferCommandCount; 225 } 226 227 /** 228 * Sets the default number of commands which are buffered 229 */ 230 public void setReplayBufferCommandCount(int replayBufferSize) { 231 this.replayBufferCommandCount = replayBufferSize; 232 } 233 234 public void setReplayStrategy(ReplayStrategy replayStrategy) { 235 this.replayStrategy = replayStrategy; 236 } 237 238 public Replayer getReplayer() { 239 return replayer; 240 } 241 242 public void setReplayer(Replayer replayer) { 243 this.replayer = replayer; 244 } 245 246 public String toString() { 247 return next.toString(); 248 } 249 250 public void start() throws Exception { 251 if (udpTransport != null) { 252 udpTransport.setReplayBuffer(getReplayBuffer()); 253 } 254 if (replayStrategy == null) { 255 throw new IllegalArgumentException("Property replayStrategy not specified"); 256 } 257 super.start(); 258 } 259 260 /** 261 * Lets attempt to replay the request as a command may have disappeared 262 */ 263 protected void onMissingResponse(Command command, FutureResponse response) { 264 LOG.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message"); 265 266 int commandId = command.getCommandId(); 267 requestReplay(commandId, commandId); 268 } 269 270 protected ReplayBuffer createReplayBuffer() { 271 return new DefaultReplayBuffer(getReplayBufferCommandCount()); 272 } 273 274 protected void replayCommands(ReplayCommand command) { 275 try { 276 if (replayer == null) { 277 onException(new IOException("Cannot replay commands. No replayer property configured")); 278 } 279 if (LOG.isDebugEnabled()) { 280 LOG.debug("Processing replay command: " + command); 281 } 282 getReplayBuffer().replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer); 283 284 // TODO we could proactively remove ack'd stuff from the replay 285 // buffer 286 // if we only have a single client talking to us 287 } catch (IOException e) { 288 onException(e); 289 } 290 } 291 292 }