001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.state; 019 020import java.util.ArrayList; 021import java.util.Collection; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.atomic.AtomicBoolean; 030 031import org.apache.activemq.command.ActiveMQDestination; 032import org.apache.activemq.command.ConnectionInfo; 033import org.apache.activemq.command.ConsumerId; 034import org.apache.activemq.command.ConsumerInfo; 035import org.apache.activemq.command.DestinationInfo; 036import org.apache.activemq.command.SessionId; 037import org.apache.activemq.command.SessionInfo; 038import org.apache.activemq.command.TransactionId; 039 040public class ConnectionState { 041 042 ConnectionInfo info; 043 private final ConcurrentHashMap<TransactionId, TransactionState> transactions = new ConcurrentHashMap<TransactionId, TransactionState>(); 044 private final ConcurrentHashMap<SessionId, SessionState> sessions = new ConcurrentHashMap<SessionId, SessionState>(); 045 private final List<DestinationInfo> tempDestinations = Collections.synchronizedList(new ArrayList<DestinationInfo>()); 046 private final AtomicBoolean shutdown = new AtomicBoolean(false); 047 private boolean connectionInterruptProcessingComplete = true; 048 private HashMap<ConsumerId, ConsumerInfo> recoveringPullConsumers; 049 050 public ConnectionState(ConnectionInfo info) { 051 this.info = info; 052 // Add the default session id. 053 addSession(new SessionInfo(info, -1)); 054 } 055 056 public String toString() { 057 return info.toString(); 058 } 059 060 public void reset(ConnectionInfo info) { 061 this.info = info; 062 transactions.clear(); 063 sessions.clear(); 064 tempDestinations.clear(); 065 shutdown.set(false); 066 // Add the default session id. 067 addSession(new SessionInfo(info, -1)); 068 } 069 070 public void addTempDestination(DestinationInfo info) { 071 checkShutdown(); 072 tempDestinations.add(info); 073 } 074 075 public void removeTempDestination(ActiveMQDestination destination) { 076 for (Iterator<DestinationInfo> iter = tempDestinations.iterator(); iter.hasNext();) { 077 DestinationInfo di = iter.next(); 078 if (di.getDestination().equals(destination)) { 079 iter.remove(); 080 } 081 } 082 } 083 084 public void addTransactionState(TransactionId id) { 085 checkShutdown(); 086 transactions.put(id, new TransactionState(id)); 087 } 088 089 public TransactionState getTransactionState(TransactionId id) { 090 return transactions.get(id); 091 } 092 093 public Collection<TransactionState> getTransactionStates() { 094 return transactions.values(); 095 } 096 097 public TransactionState removeTransactionState(TransactionId id) { 098 return transactions.remove(id); 099 } 100 101 public void addSession(SessionInfo info) { 102 checkShutdown(); 103 sessions.put(info.getSessionId(), new SessionState(info)); 104 } 105 106 public SessionState removeSession(SessionId id) { 107 return sessions.remove(id); 108 } 109 110 public SessionState getSessionState(SessionId id) { 111 return sessions.get(id); 112 } 113 114 public ConnectionInfo getInfo() { 115 return info; 116 } 117 118 public Set<SessionId> getSessionIds() { 119 return sessions.keySet(); 120 } 121 122 public List<DestinationInfo> getTempDestinations() { 123 return tempDestinations; 124 } 125 126 public Collection<SessionState> getSessionStates() { 127 return sessions.values(); 128 } 129 130 private void checkShutdown() { 131 if (shutdown.get()) { 132 throw new IllegalStateException("Disposed"); 133 } 134 } 135 136 public void shutdown() { 137 if (shutdown.compareAndSet(false, true)) { 138 for (Iterator<SessionState> iter = sessions.values().iterator(); iter.hasNext();) { 139 SessionState ss = iter.next(); 140 ss.shutdown(); 141 } 142 } 143 } 144 145 public Map<ConsumerId, ConsumerInfo> getRecoveringPullConsumers() { 146 if (recoveringPullConsumers == null) { 147 recoveringPullConsumers = new HashMap<ConsumerId, ConsumerInfo>(); 148 } 149 return recoveringPullConsumers; 150 } 151 152 public void setConnectionInterruptProcessingComplete(boolean connectionInterruptProcessingComplete) { 153 this.connectionInterruptProcessingComplete = connectionInterruptProcessingComplete; 154 } 155 156 public boolean isConnectionInterruptProcessingComplete() { 157 return connectionInterruptProcessingComplete; 158 } 159}