001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network; 018 019 import java.util.List; 020 import org.apache.activemq.broker.region.Subscription; 021 import org.apache.activemq.command.BrokerId; 022 import org.apache.activemq.command.ConsumerInfo; 023 import org.apache.activemq.command.Message; 024 import org.apache.activemq.command.NetworkBridgeFilter; 025 import org.apache.activemq.filter.MessageEvaluationContext; 026 import org.slf4j.Logger; 027 import org.slf4j.LoggerFactory; 028 029 /** 030 * implement conditional behaviour for queue consumers, 031 * allows replaying back to origin if no consumers are present on the local broker 032 * after a configurable delay, irrespective of the networkTTL 033 * Also allows rate limiting of messages through the network, useful for static includes 034 * 035 * @org.apache.xbean.XBean 036 */ 037 038 public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory { 039 boolean replayWhenNoConsumers = false; 040 int replayDelay = 0; 041 int rateLimit = 0; 042 int rateDuration = 1000; 043 044 @Override 045 public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) { 046 ConditionalNetworkBridgeFilter filter = new ConditionalNetworkBridgeFilter(); 047 filter.setNetworkBrokerId(remoteBrokerPath[0]); 048 filter.setNetworkTTL(networkTimeToLive); 049 filter.setAllowReplayWhenNoConsumers(isReplayWhenNoConsumers()); 050 filter.setRateLimit(getRateLimit()); 051 filter.setRateDuration(getRateDuration()); 052 filter.setReplayDelay(getReplayDelay()); 053 return filter; 054 } 055 056 public void setReplayWhenNoConsumers(boolean replayWhenNoConsumers) { 057 this.replayWhenNoConsumers = replayWhenNoConsumers; 058 } 059 060 public boolean isReplayWhenNoConsumers() { 061 return replayWhenNoConsumers; 062 } 063 064 public void setRateLimit(int rateLimit) { 065 this.rateLimit = rateLimit; 066 } 067 068 public int getRateLimit() { 069 return rateLimit; 070 } 071 072 public int getRateDuration() { 073 return rateDuration; 074 } 075 076 public void setRateDuration(int rateDuration) { 077 this.rateDuration = rateDuration; 078 } 079 080 public int getReplayDelay() { 081 return replayDelay; 082 } 083 084 public void setReplayDelay(int replayDelay) { 085 this.replayDelay = replayDelay; 086 } 087 088 private static class ConditionalNetworkBridgeFilter extends NetworkBridgeFilter { 089 final static Logger LOG = LoggerFactory.getLogger(ConditionalNetworkBridgeFilter.class); 090 private int rateLimit; 091 private int rateDuration = 1000; 092 private boolean allowReplayWhenNoConsumers = true; 093 private int replayDelay = 1000; 094 095 private int matchCount; 096 private long rateDurationEnd; 097 098 @Override 099 protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) { 100 boolean match = true; 101 if (mec.getDestination().isQueue()) { 102 if (contains(message.getBrokerPath(), networkBrokerId)) { 103 // potential replay back to origin 104 match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message); 105 106 if (match && LOG.isTraceEnabled()) { 107 LOG.trace("Replaying [" + message.getMessageId() +"] for [" + message.getDestination() +"] back to origin in the absence of a local consumer"); 108 } 109 } 110 111 if (match && rateLimitExceeded()) { 112 if (LOG.isTraceEnabled()) { 113 LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount + ">" + rateLimit + "/" + rateDuration); 114 } 115 match = false; 116 } 117 118 } else { 119 // use existing logic for topics 120 match = super.matchesForwardingFilter(message, mec); 121 } 122 123 return match; 124 } 125 126 private boolean hasNotJustArrived(Message message) { 127 return replayDelay ==0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis()); 128 } 129 130 private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) { 131 List<Subscription> consumers = mec.getMessageReference().getRegionDestination().getConsumers(); 132 for (Subscription sub : consumers) { 133 if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) { 134 if (LOG.isTraceEnabled()) { 135 LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination() +"] to origin due to existing local consumer: " + sub.getConsumerInfo()); 136 } 137 return false; 138 } 139 } 140 return true; 141 } 142 143 private boolean rateLimitExceeded() { 144 if (rateLimit == 0) { 145 return false; 146 } 147 148 if (rateDurationEnd < System.currentTimeMillis()) { 149 rateDurationEnd = System.currentTimeMillis() + rateDuration; 150 matchCount = 0; 151 } 152 return ++matchCount > rateLimit; 153 } 154 155 public void setReplayDelay(int replayDelay) { 156 this.replayDelay = replayDelay; 157 } 158 159 public void setRateLimit(int rateLimit) { 160 this.rateLimit = rateLimit; 161 } 162 163 public void setRateDuration(int rateDuration) { 164 this.rateDuration = rateDuration; 165 } 166 167 public void setAllowReplayWhenNoConsumers(boolean allowReplayWhenNoConsumers) { 168 this.allowReplayWhenNoConsumers = allowReplayWhenNoConsumers; 169 } 170 } 171 }