001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq; 019 020import java.util.Collections; 021import java.util.LinkedList; 022import java.util.List; 023 024import javax.jms.ConnectionConsumer; 025import javax.jms.IllegalStateException; 026import javax.jms.JMSException; 027import javax.jms.ServerSession; 028import javax.jms.ServerSessionPool; 029import javax.jms.Session; 030 031import org.apache.activemq.command.ConsumerInfo; 032import org.apache.activemq.command.MessageDispatch; 033 034/** 035 * For application servers, <CODE>Connection</CODE> objects provide a special 036 * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The 037 * messages it is to consume are specified by a <CODE>Destination</CODE> and a 038 * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be 039 * given a <CODE>ServerSessionPool</CODE> to use for processing its messages. 040 * <p/> 041 * <P> 042 * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a 043 * <CODE>ServerSession</CODE> from its pool, loads it with a single message, 044 * and starts it. As traffic picks up, messages can back up. If this happens, a 045 * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE> 046 * with more than one message. This reduces the thread context switches and 047 * minimizes resource use at the expense of some serialization of message 048 * processing. 049 * 050 * @see javax.jms.Connection#createConnectionConsumer 051 * @see javax.jms.Connection#createDurableConnectionConsumer 052 * @see javax.jms.QueueConnection#createConnectionConsumer 053 * @see javax.jms.TopicConnection#createConnectionConsumer 054 * @see javax.jms.TopicConnection#createDurableConnectionConsumer 055 */ 056 057public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQDispatcher { 058 059 private ActiveMQConnection connection; 060 private ServerSessionPool sessionPool; 061 private ConsumerInfo consumerInfo; 062 private boolean closed; 063 064 /** 065 * Create a ConnectionConsumer 066 * 067 * @param theConnection 068 * @param theSessionPool 069 * @param theConsumerInfo 070 * @throws JMSException 071 */ 072 protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo) throws JMSException { 073 this.connection = theConnection; 074 this.sessionPool = theSessionPool; 075 this.consumerInfo = theConsumerInfo; 076 077 this.connection.addConnectionConsumer(this); 078 this.connection.addDispatcher(consumerInfo.getConsumerId(), this); 079 this.connection.asyncSendPacket(this.consumerInfo); 080 } 081 082 /** 083 * Gets the server session pool associated with this connection consumer. 084 * 085 * @return the server session pool used by this connection consumer 086 * @throws JMSException if the JMS provider fails to get the server session 087 * pool associated with this consumer due to some internal 088 * error. 089 */ 090 091 public ServerSessionPool getServerSessionPool() throws JMSException { 092 if (closed) { 093 throw new IllegalStateException("The Connection Consumer is closed"); 094 } 095 return this.sessionPool; 096 } 097 098 /** 099 * Closes the connection consumer. <p/> 100 * <P> 101 * Since a provider may allocate some resources on behalf of a connection 102 * consumer outside the Java virtual machine, clients should close these 103 * resources when they are not needed. Relying on garbage collection to 104 * eventually reclaim these resources may not be timely enough. 105 * 106 * @throws JMSException 107 */ 108 109 public void close() throws JMSException { 110 if (!closed) { 111 dispose(); 112 this.connection.asyncSendPacket(this.consumerInfo.createRemoveCommand()); 113 } 114 115 } 116 117 public void dispose() { 118 if (!closed) { 119 this.connection.removeDispatcher(consumerInfo.getConsumerId()); 120 this.connection.removeConnectionConsumer(this); 121 closed = true; 122 } 123 } 124 125 public void dispatch(MessageDispatch messageDispatch) { 126 try { 127 messageDispatch.setConsumer(this); 128 129 ServerSession serverSession = sessionPool.getServerSession(); 130 Session s = serverSession.getSession(); 131 ActiveMQSession session = null; 132 133 if (s instanceof ActiveMQSession) { 134 session = (ActiveMQSession)s; 135 } else if (s instanceof ActiveMQTopicSession) { 136 ActiveMQTopicSession topicSession = (ActiveMQTopicSession)s; 137 session = (ActiveMQSession)topicSession.getNext(); 138 } else if (s instanceof ActiveMQQueueSession) { 139 ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s; 140 session = (ActiveMQSession)queueSession.getNext(); 141 } else { 142 connection.onClientInternalException(new JMSException("Session pool provided an invalid session type: " + s.getClass())); 143 return; 144 } 145 146 session.dispatch(messageDispatch); 147 serverSession.start(); 148 } catch (JMSException e) { 149 connection.onAsyncException(e); 150 } 151 } 152 153 public String toString() { 154 return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }"; 155 } 156 157 public void clearMessagesInProgress() { 158 // future: may want to deal with rollback of in progress messages to track re deliveries 159 // before indicating that all is complete. 160 // Till there is a need, lets immediately allow dispatch 161 this.connection.transportInterruptionProcessingComplete(); 162 } 163}