/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.virtual;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.plugin.SubQueueSelectorCacheBroker;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link VirtualTopicInterceptor} that forwards a message to a destination only when the
 * message satisfies a selector associated with that destination: either the selector of a
 * currently attached subscription or, when no subscription is attached, a selector remembered
 * by the {@link SubQueueSelectorCacheBroker} plugin (if that plugin is available).
 */
public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
    private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class);

    /** Parsed selectors keyed by their source text; every access is guarded by the cache's own monitor. */
    LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();

    /** Lazily resolved selector-cache plugin; stays {@code null} until a lookup succeeds. */
    private SubQueueSelectorCacheBroker selectorCachePlugin;

    public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
        super(next, prefix, postfix, local);
    }

    /**
     * Sends a copy of the message to each destination the broker resolves for
     * {@code destination}, skipping every destination for which no selector matches.
     * Filtering here keeps unmatched messages from accumulating on the virtual queues.
     */
    @Override
    protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
        final Broker regionBroker = context.getConnectionContext().getBroker();
        final Set<Destination> targets = regionBroker.getDestinations(destination);

        for (Destination target : targets) {
            if (!matchesSomeConsumer(regionBroker, message, target)) {
                continue;
            }
            // Each accepted destination receives its own copy of the message.
            target.send(context, message.copy());
        }
    }

    /**
     * Decides whether the message should be delivered to {@code dest}.
     *
     * @return {@code true} if any attached subscription matches the message; when the
     *         destination has no subscriptions at all, the result of the cached-selector
     *         check; {@code false} otherwise.
     */
    private boolean matchesSomeConsumer(final Broker broker, Message message, Destination dest) throws IOException {
        final MessageEvaluationContext evaluation = new NonCachedMessageEvaluationContext();
        evaluation.setDestination(dest.getActiveMQDestination());
        evaluation.setMessageReference(message);

        final List<Subscription> consumers = dest.getConsumers();
        for (Subscription consumer : consumers) {
            if (consumer.matches(message, evaluation)) {
                return true;
            }
        }

        // The cached selector is consulted only when nobody is subscribed; subscribers that
        // are present but did not match are an authoritative "no".
        if (consumers.isEmpty()) {
            return tryMatchingCachedSubs(broker, dest, evaluation);
        }
        return false;
    }

    /**
     * Evaluates the selector cached for {@code dest} against the message held in
     * {@code msgContext}.
     *
     * @return {@code true} only when the plugin is available, it holds a selector for the
     *         destination's qualified name, and that selector matches. Any exception raised
     *         while parsing or evaluating the selector is logged and treated as no match.
     */
    private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) {
        LOG.debug("No active consumer match found. Will try cache if configured...");

        // Resolve the selector-cache plugin; without it there is nothing to consult.
        final SubQueueSelectorCacheBroker plugin = getSubQueueSelectorCacheBrokerPlugin(broker);
        if (plugin == null) {
            return false;
        }

        final String cachedSelector = plugin.getSelector(dest.getActiveMQDestination().getQualifiedName());
        if (cachedSelector == null) {
            return false;
        }

        try {
            return getExpression(cachedSelector).matches(msgContext);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            return false;
        }
    }

    /**
     * Returns the parsed form of {@code selector}, compiling and caching it on first use.
     * Lookup and insertion happen together under the cache's monitor.
     */
    private BooleanExpression getExpression(String selector) throws Exception {
        synchronized (expressionCache) {
            BooleanExpression parsed = expressionCache.get(selector);
            if (parsed == null) {
                parsed = compileSelector(selector);
                expressionCache.put(selector, parsed);
            }
            return parsed;
        }
    }

    /**
     * @return The SubQueueSelectorCacheBroker instance or null if no such broker is available.
     */
    private SubQueueSelectorCacheBroker getSubQueueSelectorCacheBrokerPlugin(final Broker broker) {
        if (selectorCachePlugin != null) {
            return selectorCachePlugin;
        }
        // Not resolved yet: ask the broker for the plugin and remember whatever comes back.
        selectorCachePlugin = (SubQueueSelectorCacheBroker) broker.getAdaptor(SubQueueSelectorCacheBroker.class);
        return selectorCachePlugin;
    }

    /**
     * Parses a JMS selector into an evaluable expression.
     *
     * @param selectorExpression The non-null JMS selector expression.
     */
    private BooleanExpression compileSelector(final String selectorExpression) throws Exception {
        final BooleanExpression compiled = SelectorParser.parse(selectorExpression);
        return compiled;
    }
}