/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.command;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.util.JMSExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.JMSException;
import java.io.IOException;
import java.util.Arrays;

/**
 * Boolean filter that decides whether a message may be forwarded towards a
 * given network broker.
 * <p>
 * A message is rejected when its broker path already contains the target
 * broker, when its hop count has reached {@code messageTTL}, or when it is an
 * advisory that either is interpreted by the network bridge itself or carries
 * a {@link ConsumerInfo} whose own broker path violates {@code consumerTTL} or
 * already contains the target broker.
 *
 * @openwire:marshaller code="91"
 *
 */
public class NetworkBridgeFilter implements DataStructure, BooleanExpression {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
    static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);

    /** Id of the broker messages would be forwarded to; messages whose path contains it are dropped. */
    protected BrokerId networkBrokerId;
    /** Hop limit for messages; only enforced when greater than -1. */
    protected int messageTTL;
    /** Hop limit for ConsumerInfo advisories; only enforced when greater than -1. */
    protected int consumerTTL;
    // NOTE(review): declared transient, so presumably not part of the marshalled
    // state and may be null on a filter that was unmarshalled - confirm.
    transient ConsumerInfo consumerInfo;

    /**
     * Creates an empty filter; all fields keep their Java default values until
     * populated through the setters.
     */
    public NetworkBridgeFilter() {
    }

    /**
     * Creates a fully initialised filter.
     *
     * @param consumerInfo    the subscription this filter is evaluated for (may be null)
     * @param networkBrokerId id of the broker on the other side of the bridge
     * @param messageTTL      hop limit for messages, values below 0 disable the check
     * @param consumerTTL     hop limit for ConsumerInfo advisories, values below 0 disable the check
     */
    public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int messageTTL, int consumerTTL) {
        this.networkBrokerId = networkBrokerId;
        this.messageTTL = messageTTL;
        this.consumerTTL = consumerTTL;
        this.consumerInfo = consumerInfo;
    }

    /**
     * @return {@link #DATA_STRUCTURE_TYPE}
     */
    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean isMarshallAware() {
        return false;
    }

    /**
     * Evaluates the filter against the message held by the given context.
     *
     * @param mec the evaluation context supplying the message
     * @return {@code true} when the context yields a non-null message that
     *         passes {@link #matchesForwardingFilter(Message, MessageEvaluationContext)}
     * @throws JMSException wrapping any {@link IOException} raised while
     *                      obtaining the message from the context
     */
    @Override
    public boolean matches(MessageEvaluationContext mec) throws JMSException {
        try {
            // For queues the message can be acknowledged and dropped whilst
            // still in the dispatch loop, so take a reference to it first and
            // treat a missing message as "no match".
            Message message = mec.getMessage();
            return message != null && matchesForwardingFilter(message, mec);
        } catch (IOException e) {
            throw JMSExceptionSupport.create(e);
        }
    }

    /**
     * Object-valued form of {@link #matches(MessageEvaluationContext)}.
     *
     * @param message the evaluation context supplying the message
     * @return {@link Boolean#TRUE} or {@link Boolean#FALSE}
     * @throws JMSException if {@link #matches(MessageEvaluationContext)} fails
     */
    @Override
    public Object evaluate(MessageEvaluationContext message) throws JMSException {
        return matches(message) ? Boolean.TRUE : Boolean.FALSE;
    }

    /**
     * Applies the forwarding rules to a single message.
     *
     * @param message the message to test, must not be null
     * @param mec     the evaluation context (not consulted by this implementation)
     * @return {@code false} when the message must not be forwarded, {@code true} otherwise
     */
    protected boolean matchesForwardingFilter(Message message, MessageEvaluationContext mec) {

        // Read the path once so the null check and the length read see the same array.
        final BrokerId[] messagePath = message.getBrokerPath();

        if (contains(messagePath, networkBrokerId)) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Message all ready routed once through target broker ("
                        + networkBrokerId + "), path: "
                        + Arrays.toString(messagePath) + " - ignoring: " + message);
            }
            return false;
        }

        final int messageHops = messagePath == null ? 0 : messagePath.length;

        if (messageTTL > -1 && messageHops >= messageTTL) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Message restricted to " + messageTTL + " network hops ignoring: " + message);
            }
            return false;
        }

        if (message.isAdvisory()) {
            if (consumerInfo != null && consumerInfo.isNetworkSubscription() && isAdvisoryInterpretedByNetworkBridge(message)) {
                // they will be interpreted by the bridge leading to dup commands
                if (LOG.isTraceEnabled()) {
                    LOG.trace("not propagating advisory to network sub: " + consumerInfo.getConsumerId() + ", message: "+ message);
                }
                return false;
            }

            final DataStructure payload = message.getDataStructure();
            if (payload != null && payload.getDataStructureType() == CommandTypes.CONSUMER_INFO) {
                final ConsumerInfo info = (ConsumerInfo) payload;
                final BrokerId[] consumerPath = info.getBrokerPath();
                final int consumerHops = consumerPath == null ? 0 : consumerPath.length;

                if (consumerTTL > -1 && consumerHops >= consumerTTL) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("ConsumerInfo advisory restricted to " + consumerTTL + " network hops ignoring: " + message);
                    }
                    return false;
                }

                if (contains(consumerPath, networkBrokerId)) {
                    // Guarded like every other trace statement here so the message
                    // string is not built on the dispatch path when trace is off.
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
                                + networkBrokerId + "), path: "
                                + Arrays.toString(consumerPath) + " - ignoring: " + message);
                    }
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Tells whether the message's destination is one of the advisory topics
     * checked here: consumer, virtual-destination consumer or temp-destination
     * advisory topics, as classified by {@link AdvisorySupport}.
     *
     * @param message the advisory message to classify
     * @return {@code true} when the destination matches any of those three kinds
     */
    public static boolean isAdvisoryInterpretedByNetworkBridge(Message message) {
        return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) ||
                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(message.getDestination()) ||
                AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination());
    }

    /**
     * Checks whether a broker path contains a given broker id.
     *
     * @param brokerPath the path to search, may be null
     * @param brokerId   the id to look for, may be null
     * @return {@code true} when both arguments are non-null and some path
     *         element equals {@code brokerId}; {@code false} otherwise
     */
    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
        if (brokerPath != null && brokerId != null) {
            for (BrokerId hop : brokerPath) {
                if (brokerId.equals(hop)) {
                    return true;
                }
            }
        }
        return false;
    }

    // keep for backward compat with older
    // wire formats
    /**
     * @return the message TTL
     */
    public int getNetworkTTL() {
        return messageTTL;
    }

    /**
     * Sets both the message TTL and the consumer TTL to the same value.
     *
     * @param networkTTL the hop limit to apply to messages and ConsumerInfo advisories
     */
    public void setNetworkTTL(int networkTTL) {
        messageTTL = networkTTL;
        consumerTTL = networkTTL;
    }

    /**
     * @openwire:property version=1 cache=true
     */
    public BrokerId getNetworkBrokerId() {
        return networkBrokerId;
    }

    /**
     * @param remoteBrokerPath id of the broker messages would be forwarded to
     */
    public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
        this.networkBrokerId = remoteBrokerPath;
    }

    /**
     * @param messageTTL hop limit for messages, values below 0 disable the check
     */
    public void setMessageTTL(int messageTTL) {
        this.messageTTL = messageTTL;
    }

    /**
     * @openwire:property version=10
     */
    public int getMessageTTL() {
        return this.messageTTL;
    }

    /**
     * @param consumerTTL hop limit for ConsumerInfo advisories, values below 0 disable the check
     */
    public void setConsumerTTL(int consumerTTL) {
        this.consumerTTL = consumerTTL;
    }

    /**
     * @openwire:property version=10
     */
    public int getConsumerTTL() {
        return this.consumerTTL;
    }
}