001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.state;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.LinkedHashMap;
022import java.util.Map;
023import java.util.Vector;
024import java.util.Map.Entry;
025import java.util.concurrent.ConcurrentHashMap;
026
027import javax.jms.TransactionRolledBackException;
028import javax.transaction.xa.XAResource;
029
030import org.apache.activemq.command.Command;
031import org.apache.activemq.command.ConnectionId;
032import org.apache.activemq.command.ConnectionInfo;
033import org.apache.activemq.command.ConsumerControl;
034import org.apache.activemq.command.ConsumerId;
035import org.apache.activemq.command.ConsumerInfo;
036import org.apache.activemq.command.DestinationInfo;
037import org.apache.activemq.command.ExceptionResponse;
038import org.apache.activemq.command.IntegerResponse;
039import org.apache.activemq.command.Message;
040import org.apache.activemq.command.MessagePull;
041import org.apache.activemq.command.ProducerId;
042import org.apache.activemq.command.ProducerInfo;
043import org.apache.activemq.command.Response;
044import org.apache.activemq.command.SessionId;
045import org.apache.activemq.command.SessionInfo;
046import org.apache.activemq.command.TransactionInfo;
047import org.apache.activemq.transport.Transport;
048import org.apache.activemq.util.IOExceptionSupport;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * Tracks the state of a connection so a newly established transport can be
054 * re-initialized to the state that was tracked.
055 * 
056 * 
057 */
058public class ConnectionStateTracker extends CommandVisitorAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);

    // Shared instance returned by the process* methods when a command was
    // tracked but no response handler is needed (built with a null argument).
    private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
    // Flat cache-size charge applied for each MessagePull (rough estimate).
    private static final int MESSAGE_PULL_SIZE = 400;
    // Tracked state per connection, keyed by connection id.
    protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); 

    // When true, transaction commands and transacted messages are recorded
    // against the matching TransactionState.
    private boolean trackTransactions;
    // The restore* flags select which parts of the tracked state restore() replays.
    private boolean restoreSessions = true;
    private boolean restoreConsumers = true;
    private boolean restoreProducers = true;
    private boolean restoreTransaction = true;
    // When true, non-transacted messages are kept in messageCache for replay.
    private boolean trackMessages = true;
    // When true, a transacted message links its producer state to the transaction.
    private boolean trackTransactionProducers = true;
    // Threshold that currentCacheSize must exceed before the cache evicts its eldest entry.
    private int maxCacheSize = 128 * 1024;
    // Running size estimate; increased in trackBack(), decreased on eviction.
    private int currentCacheSize;
074    private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
075        protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
076            boolean result = currentCacheSize > maxCacheSize;
077            if (result) {
078                if (eldest.getValue() instanceof Message) {
079                    currentCacheSize -= ((Message)eldest.getValue()).getSize();
080                } else if (eldest.getValue() instanceof MessagePull) {
081                    currentCacheSize -= MESSAGE_PULL_SIZE;
082                }
083                if (LOG.isTraceEnabled()) {
084                    LOG.trace("removing tracked message: " + eldest.getKey());
085                }
086            }
087            return result;
088        }
089    };
090    
091    private class RemoveTransactionAction implements ResponseHandler {
092        private final TransactionInfo info;
093
094        public RemoveTransactionAction(TransactionInfo info) {
095            this.info = info;
096        }
097
098        public void onResponse(Command response) {
099            ConnectionId connectionId = info.getConnectionId();
100            ConnectionState cs = connectionStates.get(connectionId);
101            cs.removeTransactionState(info.getTransactionId());
102        }
103    }
104    
105    private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
106
107        public PrepareReadonlyTransactionAction(TransactionInfo info) {
108            super(info);
109        }
110
111        public void onResponse(Command command) {
112            IntegerResponse response = (IntegerResponse) command;
113            if (XAResource.XA_RDONLY == response.getResult()) {
114                // all done, no commit or rollback from TM
115                super.onResponse(command);
116            }
117        }
118    }
119
120    /**
121     * 
122     * 
123     * @param command
124     * @return null if the command is not state tracked.
125     * @throws IOException
126     */
127    public Tracked track(Command command) throws IOException {
128        try {
129            return (Tracked)command.visit(this);
130        } catch (IOException e) {
131            throw e;
132        } catch (Throwable e) {
133            throw IOExceptionSupport.create(e);
134        }
135    }
136    
137    public void trackBack(Command command) {
138        if (command != null) {
139            if (trackMessages && command.isMessage()) {
140                Message message = (Message) command;
141                if (message.getTransactionId()==null) {
142                    currentCacheSize = currentCacheSize +  message.getSize();
143                }
144            } else if (command instanceof MessagePull) {
145                // just needs to be a rough estimate of size, ~4 identifiers
146                currentCacheSize += MESSAGE_PULL_SIZE;
147            }
148        }
149    }
150
151    public void restore(Transport transport) throws IOException {
152        // Restore the connections.
153        for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
154            ConnectionState connectionState = iter.next();
155            connectionState.getInfo().setFailoverReconnect(true);
156            if (LOG.isDebugEnabled()) {
157                LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
158            }
159            transport.oneway(connectionState.getInfo());
160            restoreTempDestinations(transport, connectionState);
161
162            if (restoreSessions) {
163                restoreSessions(transport, connectionState);
164            }
165
166            if (restoreTransaction) {
167                restoreTransactions(transport, connectionState);
168            }
169        }
170        //now flush messages
171        for (Command msg:messageCache.values()) {
172            if (LOG.isDebugEnabled()) {
173                LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
174            }
175            transport.oneway(msg);
176        }
177    }
178
179    private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
180        Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
181        for (TransactionState transactionState : connectionState.getTransactionStates()) {
182            if (LOG.isDebugEnabled()) {
183                LOG.debug("tx: " + transactionState.getId());
184            }
185            
186            // rollback any completed transactions - no way to know if commit got there
187            // or if reply went missing
188            //
189            if (!transactionState.getCommands().isEmpty()) {
190                Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
191                if (lastCommand instanceof TransactionInfo) {
192                    TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
193                    if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
194                        if (LOG.isDebugEnabled()) {
195                            LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
196                        }
197                        toRollback.add(transactionInfo);
198                        continue;
199                    }
200                }
201            }
202            
203            // replay short lived producers that may have been involved in the transaction
204            for (ProducerState producerState : transactionState.getProducerStates().values()) {
205                if (LOG.isDebugEnabled()) {
206                    LOG.debug("tx replay producer :" + producerState.getInfo());
207                }
208                transport.oneway(producerState.getInfo());
209            }
210            
211            for (Command command : transactionState.getCommands()) {
212                if (LOG.isDebugEnabled()) {
213                    LOG.debug("tx replay: " + command);
214                }
215                transport.oneway(command);
216            }
217            
218            for (ProducerState producerState : transactionState.getProducerStates().values()) {
219                if (LOG.isDebugEnabled()) {
220                    LOG.debug("tx remove replayed producer :" + producerState.getInfo());
221                }
222                transport.oneway(producerState.getInfo().createRemoveCommand());
223            }
224        }
225        
226        for (TransactionInfo command: toRollback) {
227            // respond to the outstanding commit
228            ExceptionResponse response = new ExceptionResponse();
229            response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
230            response.setCorrelationId(command.getCommandId());
231            transport.getTransportListener().onCommand(response);
232        }
233    }
234
235    /**
236     * @param transport
237     * @param connectionState
238     * @throws IOException
239     */
240    protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
241        // Restore the connection's sessions
242        for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
243            SessionState sessionState = (SessionState)iter2.next();
244            if (LOG.isDebugEnabled()) {
245                LOG.debug("session: " + sessionState.getInfo().getSessionId());
246            }
247            transport.oneway(sessionState.getInfo());
248
249            if (restoreProducers) {
250                restoreProducers(transport, sessionState);
251            }
252
253            if (restoreConsumers) {
254                restoreConsumers(transport, sessionState);
255            }
256        }
257    }
258
259    /**
260     * @param transport
261     * @param sessionState
262     * @throws IOException
263     */
264    protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
265        // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
266        final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
267        final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
268        for (ConsumerState consumerState : sessionState.getConsumerStates()) {   
269            ConsumerInfo infoToSend = consumerState.getInfo();
270            if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
271                infoToSend = consumerState.getInfo().copy();
272                connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
273                infoToSend.setPrefetchSize(0);
274                if (LOG.isDebugEnabled()) {
275                    LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
276                }
277            }
278            if (LOG.isDebugEnabled()) {
279                LOG.debug("restore consumer: " + infoToSend.getConsumerId());
280            }
281            transport.oneway(infoToSend);
282        }
283    }
284
285    /**
286     * @param transport
287     * @param sessionState
288     * @throws IOException
289     */
290    protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
291        // Restore the session's producers
292        for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
293            ProducerState producerState = (ProducerState)iter3.next();
294            if (LOG.isDebugEnabled()) {
295                LOG.debug("producer: " + producerState.getInfo().getProducerId());
296            }
297            transport.oneway(producerState.getInfo());
298        }
299    }
300
301    /**
302     * @param transport
303     * @param connectionState
304     * @throws IOException
305     */
306    protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
307        throws IOException {
308        // Restore the connection's temp destinations.
309        for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
310            transport.oneway((DestinationInfo)iter2.next());
311        }
312    }
313
314    public Response processAddDestination(DestinationInfo info) {
315        if (info != null) {
316            ConnectionState cs = connectionStates.get(info.getConnectionId());
317            if (cs != null && info.getDestination().isTemporary()) {
318                cs.addTempDestination(info);
319            }
320        }
321        return TRACKED_RESPONSE_MARKER;
322    }
323
324    public Response processRemoveDestination(DestinationInfo info) {
325        if (info != null) {
326            ConnectionState cs = connectionStates.get(info.getConnectionId());
327            if (cs != null && info.getDestination().isTemporary()) {
328                cs.removeTempDestination(info.getDestination());
329            }
330        }
331        return TRACKED_RESPONSE_MARKER;
332    }
333
334    public Response processAddProducer(ProducerInfo info) {
335        if (info != null && info.getProducerId() != null) {
336            SessionId sessionId = info.getProducerId().getParentId();
337            if (sessionId != null) {
338                ConnectionId connectionId = sessionId.getParentId();
339                if (connectionId != null) {
340                    ConnectionState cs = connectionStates.get(connectionId);
341                    if (cs != null) {
342                        SessionState ss = cs.getSessionState(sessionId);
343                        if (ss != null) {
344                            ss.addProducer(info);
345                        }
346                    }
347                }
348            }
349        }
350        return TRACKED_RESPONSE_MARKER;
351    }
352
353    public Response processRemoveProducer(ProducerId id) {
354        if (id != null) {
355            SessionId sessionId = id.getParentId();
356            if (sessionId != null) {
357                ConnectionId connectionId = sessionId.getParentId();
358                if (connectionId != null) {
359                    ConnectionState cs = connectionStates.get(connectionId);
360                    if (cs != null) {
361                        SessionState ss = cs.getSessionState(sessionId);
362                        if (ss != null) {
363                            ss.removeProducer(id);
364                        }
365                    }
366                }
367            }
368        }
369        return TRACKED_RESPONSE_MARKER;
370    }
371
372    public Response processAddConsumer(ConsumerInfo info) {
373        if (info != null) {
374            SessionId sessionId = info.getConsumerId().getParentId();
375            if (sessionId != null) {
376                ConnectionId connectionId = sessionId.getParentId();
377                if (connectionId != null) {
378                    ConnectionState cs = connectionStates.get(connectionId);
379                    if (cs != null) {
380                        SessionState ss = cs.getSessionState(sessionId);
381                        if (ss != null) {
382                            ss.addConsumer(info);
383                        }
384                    }
385                }
386            }
387        }
388        return TRACKED_RESPONSE_MARKER;
389    }
390
391    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
392        if (id != null) {
393            SessionId sessionId = id.getParentId();
394            if (sessionId != null) {
395                ConnectionId connectionId = sessionId.getParentId();
396                if (connectionId != null) {
397                    ConnectionState cs = connectionStates.get(connectionId);
398                    if (cs != null) {
399                        SessionState ss = cs.getSessionState(sessionId);
400                        if (ss != null) {
401                            ss.removeConsumer(id);
402                        }
403                    }
404                }
405            }
406        }
407        return TRACKED_RESPONSE_MARKER;
408    }
409
410    public Response processAddSession(SessionInfo info) {
411        if (info != null) {
412            ConnectionId connectionId = info.getSessionId().getParentId();
413            if (connectionId != null) {
414                ConnectionState cs = connectionStates.get(connectionId);
415                if (cs != null) {
416                    cs.addSession(info);
417                }
418            }
419        }
420        return TRACKED_RESPONSE_MARKER;
421    }
422
423    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
424        if (id != null) {
425            ConnectionId connectionId = id.getParentId();
426            if (connectionId != null) {
427                ConnectionState cs = connectionStates.get(connectionId);
428                if (cs != null) {
429                    cs.removeSession(id);
430                }
431            }
432        }
433        return TRACKED_RESPONSE_MARKER;
434    }
435
436    public Response processAddConnection(ConnectionInfo info) {
437        if (info != null) {
438            connectionStates.put(info.getConnectionId(), new ConnectionState(info));
439        }
440        return TRACKED_RESPONSE_MARKER;
441    }
442
443    public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
444        if (id != null) {
445            connectionStates.remove(id);
446        }
447        return TRACKED_RESPONSE_MARKER;
448    }
449
450    public Response processMessage(Message send) throws Exception {
451        if (send != null) {
452            if (trackTransactions && send.getTransactionId() != null) {
453                ProducerId producerId = send.getProducerId();
454                ConnectionId connectionId = producerId.getParentId().getParentId();
455                if (connectionId != null) {
456                    ConnectionState cs = connectionStates.get(connectionId);
457                    if (cs != null) {
458                        TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
459                        if (transactionState != null) {
460                            transactionState.addCommand(send);
461                            
462                            if (trackTransactionProducers) {
463                                // for jmstemplate, track the producer in case it is closed before commit
464                                // and needs to be replayed
465                                SessionState ss = cs.getSessionState(producerId.getParentId());
466                                ProducerState producerState = ss.getProducerState(producerId);
467                                producerState.setTransactionState(transactionState);            
468                            }
469                        }
470                    }
471                }
472                return TRACKED_RESPONSE_MARKER;
473            }else if (trackMessages) {
474                messageCache.put(send.getMessageId(), send);
475            }
476        }
477        return null;
478    }
479
480    public Response processBeginTransaction(TransactionInfo info) {
481        if (trackTransactions && info != null && info.getTransactionId() != null) {
482            ConnectionId connectionId = info.getConnectionId();
483            if (connectionId != null) {
484                ConnectionState cs = connectionStates.get(connectionId);
485                if (cs != null) {
486                    cs.addTransactionState(info.getTransactionId());
487                    TransactionState state = cs.getTransactionState(info.getTransactionId());
488                    state.addCommand(info);
489                }
490            }
491            return TRACKED_RESPONSE_MARKER;
492        }
493        return null;
494    }
495
496    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
497        if (trackTransactions && info != null) {
498            ConnectionId connectionId = info.getConnectionId();
499            if (connectionId != null) {
500                ConnectionState cs = connectionStates.get(connectionId);
501                if (cs != null) {
502                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
503                    if (transactionState != null) {
504                        transactionState.addCommand(info);
505                        return new Tracked(new PrepareReadonlyTransactionAction(info));
506                    }
507                }
508            }
509        }
510        return null;
511    }
512
513    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
514        if (trackTransactions && info != null) {
515            ConnectionId connectionId = info.getConnectionId();
516            if (connectionId != null) {
517                ConnectionState cs = connectionStates.get(connectionId);
518                if (cs != null) {
519                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
520                    if (transactionState != null) {
521                        transactionState.addCommand(info);
522                        return new Tracked(new RemoveTransactionAction(info));
523                    }
524                }
525            }
526        }
527        return null;
528    }
529
530    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
531        if (trackTransactions && info != null) {
532            ConnectionId connectionId = info.getConnectionId();
533            if (connectionId != null) {
534                ConnectionState cs = connectionStates.get(connectionId);
535                if (cs != null) {
536                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
537                    if (transactionState != null) {
538                        transactionState.addCommand(info);
539                        return new Tracked(new RemoveTransactionAction(info));
540                    }
541                }
542            }
543        }
544        return null;
545    }
546
547    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
548        if (trackTransactions && info != null) {
549            ConnectionId connectionId = info.getConnectionId();
550            if (connectionId != null) {
551                ConnectionState cs = connectionStates.get(connectionId);
552                if (cs != null) {
553                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
554                    if (transactionState != null) {
555                        transactionState.addCommand(info);
556                        return new Tracked(new RemoveTransactionAction(info));
557                    }
558                }
559            }
560        }
561        return null;
562    }
563
564    public Response processEndTransaction(TransactionInfo info) throws Exception {
565        if (trackTransactions && info != null) {
566            ConnectionId connectionId = info.getConnectionId();
567            if (connectionId != null) {
568                ConnectionState cs = connectionStates.get(connectionId);
569                if (cs != null) {
570                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
571                    if (transactionState != null) {
572                        transactionState.addCommand(info);
573                    }
574                }
575            }
576            return TRACKED_RESPONSE_MARKER;
577        }
578        return null;
579    }
580
581    @Override
582    public Response processMessagePull(MessagePull pull) throws Exception {
583        if (pull != null) {
584            // leave a single instance in the cache
585            final String id = pull.getDestination() + "::" + pull.getConsumerId();
586            messageCache.put(id.intern(), pull);
587        }
588        return null;
589    }
590
591    public boolean isRestoreConsumers() {
592        return restoreConsumers;
593    }
594
595    public void setRestoreConsumers(boolean restoreConsumers) {
596        this.restoreConsumers = restoreConsumers;
597    }
598
599    public boolean isRestoreProducers() {
600        return restoreProducers;
601    }
602
603    public void setRestoreProducers(boolean restoreProducers) {
604        this.restoreProducers = restoreProducers;
605    }
606
607    public boolean isRestoreSessions() {
608        return restoreSessions;
609    }
610
611    public void setRestoreSessions(boolean restoreSessions) {
612        this.restoreSessions = restoreSessions;
613    }
614
615    public boolean isTrackTransactions() {
616        return trackTransactions;
617    }
618
619    public void setTrackTransactions(boolean trackTransactions) {
620        this.trackTransactions = trackTransactions;
621    }
622    
623    public boolean isTrackTransactionProducers() {
624        return this.trackTransactionProducers;
625    }
626
627    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
628        this.trackTransactionProducers = trackTransactionProducers;
629    }
630    
631    public boolean isRestoreTransaction() {
632        return restoreTransaction;
633    }
634
635    public void setRestoreTransaction(boolean restoreTransaction) {
636        this.restoreTransaction = restoreTransaction;
637    }
638
639    public boolean isTrackMessages() {
640        return trackMessages;
641    }
642
643    public void setTrackMessages(boolean trackMessages) {
644        this.trackMessages = trackMessages;
645    }
646
647    public int getMaxCacheSize() {
648        return maxCacheSize;
649    }
650
651    public void setMaxCacheSize(int maxCacheSize) {
652        this.maxCacheSize = maxCacheSize;
653    }
654
655    public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
656        ConnectionState connectionState = connectionStates.get(connectionId);
657        if (connectionState != null) {
658            connectionState.setConnectionInterruptProcessingComplete(true);
659            Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
660            for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
661                ConsumerControl control = new ConsumerControl();
662                control.setConsumerId(entry.getKey());
663                control.setPrefetch(entry.getValue().getPrefetchSize());
664                control.setDestination(entry.getValue().getDestination());
665                try {
666                    if (LOG.isDebugEnabled()) {
667                        LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
668                    }
669                    transport.oneway(control);  
670                } catch (Exception ex) {
671                    if (LOG.isDebugEnabled()) {
672                        LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
673                                + " with: " + control.getPrefetch(), ex);
674                    }
675                }
676            }
677            stalledConsumers.clear();
678        }
679    }
680
681    public void transportInterrupted(ConnectionId connectionId) {
682        ConnectionState connectionState = connectionStates.get(connectionId);
683        if (connectionState != null) {
684            connectionState.setConnectionInterruptProcessingComplete(false);
685        }
686    }
687}