001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.Collections;
021import java.util.List;
022import java.util.concurrent.CopyOnWriteArrayList;
023import java.util.concurrent.atomic.AtomicInteger;
024
025import javax.jms.InvalidSelectorException;
026import javax.jms.JMSException;
027import javax.management.ObjectName;
028
029import org.apache.activemq.broker.Broker;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.ConsumerId;
033import org.apache.activemq.command.ConsumerInfo;
034import org.apache.activemq.command.MessageAck;
035import org.apache.activemq.filter.BooleanExpression;
036import org.apache.activemq.filter.DestinationFilter;
037import org.apache.activemq.filter.LogicExpression;
038import org.apache.activemq.filter.MessageEvaluationContext;
039import org.apache.activemq.filter.NoLocalExpression;
040import org.apache.activemq.selector.SelectorParser;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044public abstract class AbstractSubscription implements Subscription {
045
046    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscription.class);
047
048    protected Broker broker;
049    protected ConnectionContext context;
050    protected ConsumerInfo info;
051    protected final DestinationFilter destinationFilter;
052    protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
053    protected final AtomicInteger prefetchExtension = new AtomicInteger(0);
054
055    private BooleanExpression selectorExpression;
056    private ObjectName objectName;
057    private int cursorMemoryHighWaterMark = 70;
058    private boolean slowConsumer;
059    private long lastAckTime;
060    private final SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
061
062    public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
063        this.broker = broker;
064        this.context = context;
065        this.info = info;
066        this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
067        this.selectorExpression = parseSelector(info);
068        this.lastAckTime = System.currentTimeMillis();
069    }
070
071    private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
072        BooleanExpression rc = null;
073        if (info.getSelector() != null) {
074            rc = SelectorParser.parse(info.getSelector());
075        }
076        if (info.isNoLocal()) {
077            if (rc == null) {
078                rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
079            } else {
080                rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
081            }
082        }
083        if (info.getAdditionalPredicate() != null) {
084            if (rc == null) {
085                rc = info.getAdditionalPredicate();
086            } else {
087                rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
088            }
089        }
090        return rc;
091    }
092
093    @Override
094    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
095        this.lastAckTime = System.currentTimeMillis();
096        subscriptionStatistics.getConsumedCount().increment();
097    }
098
099    @Override
100    public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
101        ConsumerId targetConsumerId = node.getTargetConsumerId();
102        if (targetConsumerId != null) {
103            if (!targetConsumerId.equals(info.getConsumerId())) {
104                return false;
105            }
106        }
107        try {
108            return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);
109        } catch (JMSException e) {
110            LOG.info("Selector failed to evaluate: {}", e.getMessage(), e);
111            return false;
112        }
113    }
114
115    @Override
116    public boolean isWildcard() {
117        return destinationFilter.isWildcard();
118    }
119
120    @Override
121    public boolean matches(ActiveMQDestination destination) {
122        return destinationFilter.matches(destination);
123    }
124
125    @Override
126    public void add(ConnectionContext context, Destination destination) throws Exception {
127        destinations.add(destination);
128    }
129
130    @Override
131    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
132        destinations.remove(destination);
133        return Collections.EMPTY_LIST;
134    }
135
136    @Override
137    public ConsumerInfo getConsumerInfo() {
138        return info;
139    }
140
141    @Override
142    public void gc() {
143    }
144
145    @Override
146    public ConnectionContext getContext() {
147        return context;
148    }
149
150    public ConsumerInfo getInfo() {
151        return info;
152    }
153
154    public BooleanExpression getSelectorExpression() {
155        return selectorExpression;
156    }
157
158    @Override
159    public String getSelector() {
160        return info.getSelector();
161    }
162
163    @Override
164    public void setSelector(String selector) throws InvalidSelectorException {
165        ConsumerInfo copy = info.copy();
166        copy.setSelector(selector);
167        BooleanExpression newSelector = parseSelector(copy);
168        // its valid so lets actually update it now
169        info.setSelector(selector);
170        this.selectorExpression = newSelector;
171    }
172
173    @Override
174    public ObjectName getObjectName() {
175        return objectName;
176    }
177
178    @Override
179    public void setObjectName(ObjectName objectName) {
180        this.objectName = objectName;
181    }
182
183    @Override
184    public int getPrefetchSize() {
185        return info.getPrefetchSize();
186    }
187
188    public void setPrefetchSize(int newSize) {
189        info.setPrefetchSize(newSize);
190    }
191
192    @Override
193    public boolean isRecoveryRequired() {
194        return true;
195    }
196
197    @Override
198    public boolean isSlowConsumer() {
199        return slowConsumer;
200    }
201
202    public void setSlowConsumer(boolean val) {
203        slowConsumer = val;
204    }
205
206    @Override
207    public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
208        boolean result = false;
209        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
210        try {
211            Destination regionDestination = (Destination) message.getRegionDestination();
212            msgContext.setDestination(regionDestination.getActiveMQDestination());
213            msgContext.setMessageReference(message);
214            result = matches(message, msgContext);
215            if (result) {
216                doAddRecoveredMessage(message);
217            }
218        } finally {
219            msgContext.clear();
220        }
221        return result;
222    }
223
224    @Override
225    public ActiveMQDestination getActiveMQDestination() {
226        return info != null ? info.getDestination() : null;
227    }
228
229    @Override
230    public boolean isBrowser() {
231        return info != null && info.isBrowser();
232    }
233
234    @Override
235    public long getInFlightMessageSize() {
236        return subscriptionStatistics.getInflightMessageSize().getTotalSize();
237    }
238
239    @Override
240    public int getInFlightUsage() {
241        int prefetchSize = info.getPrefetchSize();
242        if (prefetchSize > 0) {
243            return (getInFlightSize() * 100) / prefetchSize;
244        }
245        return Integer.MAX_VALUE;
246    }
247
248    /**
249     * Add a destination
250     * @param destination
251     */
252    public void addDestination(Destination destination) {
253    }
254
255    /**
256     * Remove a destination
257     * @param destination
258     */
259    public void removeDestination(Destination destination) {
260    }
261
262    @Override
263    public int getCursorMemoryHighWaterMark(){
264        return this.cursorMemoryHighWaterMark;
265    }
266
267    @Override
268    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
269        this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
270    }
271
272    @Override
273    public int countBeforeFull() {
274        return info.getPrefetchSize() - getDispatchedQueueSize();
275    }
276
277    @Override
278    public void unmatched(MessageReference node) throws IOException {
279        // only durable topic subs have something to do here
280    }
281
282    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
283        add(message);
284    }
285
286    @Override
287    public long getTimeOfLastMessageAck() {
288        return lastAckTime;
289    }
290
291    public void setTimeOfLastMessageAck(long value) {
292        this.lastAckTime = value;
293    }
294
295    @Override
296    public long getConsumedCount(){
297        return subscriptionStatistics.getConsumedCount().getCount();
298    }
299
300    @Override
301    public void incrementConsumedCount(){
302        subscriptionStatistics.getConsumedCount().increment();
303    }
304
305    @Override
306    public void resetConsumedCount(){
307        subscriptionStatistics.getConsumedCount().reset();
308    }
309
310    @Override
311    public SubscriptionStatistics getSubscriptionStatistics() {
312        return subscriptionStatistics;
313    }
314
315    public void wakeupDestinationsForDispatch() {
316        for (Destination dest : destinations) {
317            dest.wakeup();
318        }
319    }
320
321    public AtomicInteger getPrefetchExtension() {
322        return this.prefetchExtension;
323    }
324}