001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.util.ArrayList;
020import java.util.List;
021
022import org.apache.activemq.filter.BooleanExpression;
023import org.apache.activemq.state.CommandVisitor;
024
025/**
026 * @openwire:marshaller code="5"
027 * 
028 */
029public class ConsumerInfo extends BaseCommand {
030
031    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO;
032
033    public static final byte HIGH_PRIORITY = 10;
034    public static final byte NORMAL_PRIORITY = 0;
035    public static final byte NETWORK_CONSUMER_PRIORITY = -5;
036    public static final byte LOW_PRIORITY = -10;
037
038    protected ConsumerId consumerId;
039    protected ActiveMQDestination destination;
040    protected int prefetchSize;
041    protected int maximumPendingMessageLimit;
042    protected boolean browser;
043    protected boolean dispatchAsync;
044    protected String selector;
045    protected String subscriptionName;
046    protected boolean noLocal;
047    protected boolean exclusive;
048    protected boolean retroactive;
049    protected byte priority;
050    protected BrokerId[] brokerPath;
051    protected boolean optimizedAcknowledge;
052    // used by the broker
053    protected transient int currentPrefetchSize;
054    // if true, the consumer will not send range
055    protected boolean noRangeAcks;
056    // acks.
057
058    protected BooleanExpression additionalPredicate;
059    protected transient boolean networkSubscription; // this subscription
060    protected transient List<ConsumerId> networkConsumerIds; // the original consumerId
061
062    // not marshalled, populated from RemoveInfo, the last message delivered, used
063    // to suppress redelivery on prefetched messages after close
064    private transient long lastDeliveredSequenceId;
065
066    // originated from a
067    // network connection
068
069    public ConsumerInfo() {
070    }
071
072    public ConsumerInfo(ConsumerId consumerId) {
073        this.consumerId = consumerId;
074    }
075
076    public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
077        this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
078    }
079
080    public ConsumerInfo copy() {
081        ConsumerInfo info = new ConsumerInfo();
082        copy(info);
083        return info;
084    }
085
086    public void copy(ConsumerInfo info) {
087        super.copy(info);
088        info.consumerId = consumerId;
089        info.destination = destination;
090        info.prefetchSize = prefetchSize;
091        info.maximumPendingMessageLimit = maximumPendingMessageLimit;
092        info.browser = browser;
093        info.dispatchAsync = dispatchAsync;
094        info.selector = selector;
095        info.subscriptionName = subscriptionName;
096        info.noLocal = noLocal;
097        info.exclusive = exclusive;
098        info.retroactive = retroactive;
099        info.priority = priority;
100        info.brokerPath = brokerPath;
101        info.networkSubscription = networkSubscription;
102        if (networkConsumerIds != null) {
103            if (info.networkConsumerIds==null){
104                info.networkConsumerIds=new ArrayList<ConsumerId>();
105            }
106            info.networkConsumerIds.addAll(networkConsumerIds);
107        }
108    }
109
110    public boolean isDurable() {
111        return subscriptionName != null;
112    }
113
114    public byte getDataStructureType() {
115        return DATA_STRUCTURE_TYPE;
116    }
117
118    /**
119     * Is used to uniquely identify the consumer to the broker.
120     * 
121     * @openwire:property version=1 cache=true
122     */
123    public ConsumerId getConsumerId() {
124        return consumerId;
125    }
126
127    public void setConsumerId(ConsumerId consumerId) {
128        this.consumerId = consumerId;
129    }
130
131    /**
132     * Is this consumer a queue browser?
133     * 
134     * @openwire:property version=1
135     */
136    public boolean isBrowser() {
137        return browser;
138    }
139
140    public void setBrowser(boolean browser) {
141        this.browser = browser;
142    }
143
144    /**
145     * The destination that the consumer is interested in receiving messages
146     * from. This destination could be a composite destination.
147     * 
148     * @openwire:property version=1 cache=true
149     */
150    public ActiveMQDestination getDestination() {
151        return destination;
152    }
153
154    public void setDestination(ActiveMQDestination destination) {
155        this.destination = destination;
156    }
157
158    /**
159     * How many messages a broker will send to the client without receiving an
160     * ack before he stops dispatching messages to the client.
161     * 
162     * @openwire:property version=1
163     */
164    public int getPrefetchSize() {
165        return prefetchSize;
166    }
167
168    public void setPrefetchSize(int prefetchSize) {
169        this.prefetchSize = prefetchSize;
170        this.currentPrefetchSize = prefetchSize;
171    }
172
173    /**
174     * How many messages a broker will keep around, above the prefetch limit,
175     * for non-durable topics before starting to discard older messages.
176     * 
177     * @openwire:property version=1
178     */
179    public int getMaximumPendingMessageLimit() {
180        return maximumPendingMessageLimit;
181    }
182
183    public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
184        this.maximumPendingMessageLimit = maximumPendingMessageLimit;
185    }
186
187    /**
188     * Should the broker dispatch a message to the consumer async? If he does it
189     * async, then he uses a more SEDA style of processing while if it is not
190     * done async, then he broker use a STP style of processing. STP is more
191     * appropriate in high bandwidth situations or when being used by and in vm
192     * transport.
193     * 
194     * @openwire:property version=1
195     */
196    public boolean isDispatchAsync() {
197        return dispatchAsync;
198    }
199
200    public void setDispatchAsync(boolean dispatchAsync) {
201        this.dispatchAsync = dispatchAsync;
202    }
203
204    /**
205     * The JMS selector used to filter out messages that this consumer is
206     * interested in.
207     * 
208     * @openwire:property version=1
209     */
210    public String getSelector() {
211        return selector;
212    }
213
214    public void setSelector(String selector) {
215        this.selector = selector;
216    }
217
218    /**
219     * Used to identify the name of a durable subscription.
220     * 
221     * @openwire:property version=1
222     */
223    public String getSubscriptionName() {
224        return subscriptionName;
225    }
226
227    public void setSubscriptionName(String durableSubscriptionId) {
228        this.subscriptionName = durableSubscriptionId;
229    }
230
231    /**
232     * @deprecated
233     * @return
234     * @see getSubscriptionName
235     */
236    public String getSubcriptionName() {
237        return subscriptionName;
238    }
239
240    /**
241     * @deprecated
242     * @see setSubscriptionName
243     * @param durableSubscriptionId
244     */
245    public void setSubcriptionName(String durableSubscriptionId) {
246        this.subscriptionName = durableSubscriptionId;
247    }
248
249    /**
250     * Set noLocal to true to avoid receiving messages that were published
251     * locally on the same connection.
252     * 
253     * @openwire:property version=1
254     */
255    public boolean isNoLocal() {
256        return noLocal;
257    }
258
259    public void setNoLocal(boolean noLocal) {
260        this.noLocal = noLocal;
261    }
262
263    /**
264     * An exclusive consumer locks out other consumers from being able to
265     * receive messages from the destination. If there are multiple exclusive
266     * consumers for a destination, the first one created will be the exclusive
267     * consumer of the destination.
268     * 
269     * @openwire:property version=1
270     */
271    public boolean isExclusive() {
272        return exclusive;
273    }
274
275    public void setExclusive(boolean exclusive) {
276        this.exclusive = exclusive;
277    }
278
279    /**
280     * A retroactive consumer only has meaning for Topics. It allows a consumer
281     * to retroactively see messages sent prior to the consumer being created.
282     * If the consumer is not durable, it will be delivered the last message
283     * published to the topic. If the consumer is durable then it will receive
284     * all persistent messages that are still stored in persistent storage for
285     * that topic.
286     * 
287     * @openwire:property version=1
288     */
289    public boolean isRetroactive() {
290        return retroactive;
291    }
292
293    public void setRetroactive(boolean retroactive) {
294        this.retroactive = retroactive;
295    }
296
297    public RemoveInfo createRemoveCommand() {
298        RemoveInfo command = new RemoveInfo(getConsumerId());
299        command.setResponseRequired(isResponseRequired());
300        return command;
301    }
302
303    /**
304     * The broker will avoid dispatching to a lower priority consumer if there
305     * are other higher priority consumers available to dispatch to. This allows
306     * letting the broker to have an affinity to higher priority consumers.
307     * Default priority is 0.
308     * 
309     * @openwire:property version=1
310     */
311    public byte getPriority() {
312        return priority;
313    }
314
315    public void setPriority(byte priority) {
316        this.priority = priority;
317    }
318
319    /**
320     * The route of brokers the command has moved through.
321     * 
322     * @openwire:property version=1 cache=true
323     */
324    public BrokerId[] getBrokerPath() {
325        return brokerPath;
326    }
327
328    public void setBrokerPath(BrokerId[] brokerPath) {
329        this.brokerPath = brokerPath;
330    }
331
332    /**
333     * A transient additional predicate that can be used it inject additional
334     * predicates into the selector on the fly. Handy if if say a Security
335     * Broker interceptor wants to filter out messages based on security level
336     * of the consumer.
337     * 
338     * @openwire:property version=1
339     */
340    public BooleanExpression getAdditionalPredicate() {
341        return additionalPredicate;
342    }
343
344    public void setAdditionalPredicate(BooleanExpression additionalPredicate) {
345        this.additionalPredicate = additionalPredicate;
346    }
347
348    public Response visit(CommandVisitor visitor) throws Exception {
349        return visitor.processAddConsumer(this);
350    }
351
352    /**
353     * @openwire:property version=1
354     * @return Returns the networkSubscription.
355     */
356    public boolean isNetworkSubscription() {
357        return networkSubscription;
358    }
359
360    /**
361     * @param networkSubscription The networkSubscription to set.
362     */
363    public void setNetworkSubscription(boolean networkSubscription) {
364        this.networkSubscription = networkSubscription;
365    }
366
367    /**
368     * @openwire:property version=1
369     * @return Returns the optimizedAcknowledge.
370     */
371    public boolean isOptimizedAcknowledge() {
372        return optimizedAcknowledge;
373    }
374
375    /**
376     * @param optimizedAcknowledge The optimizedAcknowledge to set.
377     */
378    public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
379        this.optimizedAcknowledge = optimizedAcknowledge;
380    }
381
382    /**
383     * @return Returns the currentPrefetchSize.
384     */
385    public int getCurrentPrefetchSize() {
386        return currentPrefetchSize;
387    }
388
389    /**
390     * @param currentPrefetchSize The currentPrefetchSize to set.
391     */
392    public void setCurrentPrefetchSize(int currentPrefetchSize) {
393        this.currentPrefetchSize = currentPrefetchSize;
394    }
395
396    /**
397     * The broker may be able to optimize it's processing or provides better QOS
398     * if it knows the consumer will not be sending ranged acks.
399     * 
400     * @return true if the consumer will not send range acks.
401     * @openwire:property version=1
402     */
403    public boolean isNoRangeAcks() {
404        return noRangeAcks;
405    }
406
407    public void setNoRangeAcks(boolean noRangeAcks) {
408        this.noRangeAcks = noRangeAcks;
409    }
410
411    public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
412        if (networkConsumerIds == null) {
413            networkConsumerIds = new ArrayList<ConsumerId>();
414        }
415        networkConsumerIds.add(networkConsumerId);
416    }
417
418    public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) {
419        if (networkConsumerIds != null) {
420            networkConsumerIds.remove(networkConsumerId);
421            if (networkConsumerIds.isEmpty()) {
422                networkConsumerIds=null;
423            }
424        }
425    }
426    
427    public synchronized boolean isNetworkConsumersEmpty() {
428        return networkConsumerIds == null || networkConsumerIds.isEmpty();
429    }
430    
431    public synchronized List<ConsumerId> getNetworkConsumerIds(){
432        List<ConsumerId> result = new ArrayList<ConsumerId>();
433        if (networkConsumerIds != null) {
434            result.addAll(networkConsumerIds);
435        }
436        return result;
437    }
438
439    /**
440     * Tracks the original subscription id that causes a subscription to 
441     * percolate through a network when networkTTL > 1. Tracking the original
442     * subscription allows duplicate suppression.
443     * 
444     * @return array of the current subscription path
445     * @openwire:property version=4
446     */
447    public ConsumerId[] getNetworkConsumerPath() {
448        ConsumerId[] result = null;
449        if (networkConsumerIds != null) {
450            result = networkConsumerIds.toArray(new ConsumerId[0]);
451        }
452        return result;
453    }
454    
455    public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
456        if (consumerPath != null) {
457            for (int i=0; i<consumerPath.length; i++) {
458                addNetworkConsumerId(consumerPath[i]);
459            }
460        }
461    }
462
463    public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
464        this.lastDeliveredSequenceId  = lastDeliveredSequenceId;
465    }
466    
467    public long getLastDeliveredSequenceId() {
468        return lastDeliveredSequenceId;
469    }
470
471}