001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.Collections;
021import java.util.List;
022import java.util.concurrent.CopyOnWriteArrayList;
023import javax.jms.InvalidSelectorException;
024import javax.jms.JMSException;
025import javax.management.ObjectName;
026import org.apache.activemq.broker.Broker;
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.ConsumerId;
030import org.apache.activemq.command.ConsumerInfo;
031import org.apache.activemq.filter.BooleanExpression;
032import org.apache.activemq.filter.DestinationFilter;
033import org.apache.activemq.filter.LogicExpression;
034import org.apache.activemq.filter.MessageEvaluationContext;
035import org.apache.activemq.filter.NoLocalExpression;
036import org.apache.activemq.selector.SelectorParser;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040public abstract class AbstractSubscription implements Subscription {
041
042    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscription.class);
043    protected Broker broker;
044    protected ConnectionContext context;
045    protected ConsumerInfo info;
046    protected final DestinationFilter destinationFilter;
047    protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
048    private BooleanExpression selectorExpression;
049    private ObjectName objectName;
050    private int cursorMemoryHighWaterMark = 70;
051    private boolean slowConsumer;
052
053
054    public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
055        this.broker = broker;
056        this.context = context;
057        this.info = info;
058        this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
059        this.selectorExpression = parseSelector(info);
060    }
061
062    private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
063        BooleanExpression rc = null;
064        if (info.getSelector() != null) {
065            rc = SelectorParser.parse(info.getSelector());
066        }
067        if (info.isNoLocal()) {
068            if (rc == null) {
069                rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
070            } else {
071                rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
072            }
073        }
074        if (info.getAdditionalPredicate() != null) {
075            if (rc == null) {
076                rc = info.getAdditionalPredicate();
077            } else {
078                rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
079            }
080        }
081        return rc;
082    }
083
084    public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
085        ConsumerId targetConsumerId = node.getTargetConsumerId();
086        if (targetConsumerId != null) {
087            if (!targetConsumerId.equals(info.getConsumerId())) {
088                return false;
089            }
090        }
091        try {
092            return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);
093        } catch (JMSException e) {
094            LOG.info("Selector failed to evaluate: " + e.getMessage(), e);
095            return false;
096        }
097    }
098
099    public boolean matches(ActiveMQDestination destination) {
100        return destinationFilter.matches(destination);
101    }
102
103    public void add(ConnectionContext context, Destination destination) throws Exception {
104        destinations.add(destination);
105    }
106
107    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
108        destinations.remove(destination);
109        return Collections.EMPTY_LIST;
110    }
111
112    public ConsumerInfo getConsumerInfo() {
113        return info;
114    }
115
116    public void gc() {
117    }
118
119    public boolean isSlave() {
120        return broker.getBrokerService().isSlave();
121    }
122
123    public ConnectionContext getContext() {
124        return context;
125    }
126
127    public ConsumerInfo getInfo() {
128        return info;
129    }
130
131    public BooleanExpression getSelectorExpression() {
132        return selectorExpression;
133    }
134
135    public String getSelector() {
136        return info.getSelector();
137    }
138
139    public void setSelector(String selector) throws InvalidSelectorException {
140        ConsumerInfo copy = info.copy();
141        copy.setSelector(selector);
142        BooleanExpression newSelector = parseSelector(copy);
143        // its valid so lets actually update it now
144        info.setSelector(selector);
145        this.selectorExpression = newSelector;
146    }
147
148    public ObjectName getObjectName() {
149        return objectName;
150    }
151
152    public void setObjectName(ObjectName objectName) {
153        this.objectName = objectName;
154    }
155
156    public int getPrefetchSize() {
157        return info.getPrefetchSize();
158    }
159    public void setPrefetchSize(int newSize) {
160        info.setPrefetchSize(newSize);
161    }
162
163    public boolean isRecoveryRequired() {
164        return true;
165    }
166    
167    public boolean isSlowConsumer() {
168        return slowConsumer;
169    }
170    
171    public void setSlowConsumer(boolean val) {
172        slowConsumer = val;
173    }
174
175    public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
176        boolean result = false;
177        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
178        try {
179            msgContext.setDestination(message.getRegionDestination().getActiveMQDestination());
180            msgContext.setMessageReference(message);
181            result = matches(message, msgContext);
182            if (result) {
183                doAddRecoveredMessage(message);
184            }
185
186        } finally {
187            msgContext.clear();
188        }
189        return result;
190    }
191
192    public ActiveMQDestination getActiveMQDestination() {
193        return info != null ? info.getDestination() : null;
194    }
195    
196    public boolean isBrowser() {
197        return info != null && info.isBrowser();
198    }
199    
200    public int getInFlightUsage() {
201        if (info.getPrefetchSize() > 0) {
202        return (getInFlightSize() * 100)/info.getPrefetchSize();
203        }
204        return Integer.MAX_VALUE;
205    }
206    
207    /**
208     * Add a destination
209     * @param destination
210     */
211    public void addDestination(Destination destination) {
212        
213    }
214       
215    
216    /**
217     * Remove a destination
218     * @param destination
219     */
220    public void removeDestination(Destination destination) {
221        
222    }
223    
224    public int getCursorMemoryHighWaterMark(){
225        return this.cursorMemoryHighWaterMark;
226    }
227
228        public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
229                this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
230        }
231    
232    public int countBeforeFull() {
233        return getDispatchedQueueSize() - info.getPrefetchSize();
234    }
235
236    public void unmatched(MessageReference node) throws IOException {
237        // only durable topic subs have something to do here
238    }
239
240    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
241        add(message);
242    }
243}