/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker;

import java.net.URI;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;

/**
 * Allows you to intercept broker operations so that features such as security
 * can be implemented as a pluggable filter.
 * <p>
 * Every method in this class simply forwards its arguments to the wrapped
 * {@link #next} broker and hands back whatever that call returns. A subclass
 * overrides only the operations it wants to intercept and inherits plain
 * pass-through behaviour for the rest.
 */
public class BrokerFilter implements Broker {

    /** The broker that every call in this filter is forwarded to. */
    protected final Broker next;

    /**
     * Creates a filter wrapping the given broker.
     *
     * @param next the broker all operations are delegated to; stored as-is
     *             (no null check is performed here)
     */
    public BrokerFilter(Broker next) {
        this.next = next;
    }

    /**
     * Looks for a broker of the requested type, starting with this filter.
     *
     * @param type the class to match against
     * @return this filter when it is an instance of {@code type}; otherwise
     *         whatever {@code next.getAdaptor(type)} returns
     */
    // NOTE(review): the raw Class parameter presumably mirrors the signature
    // declared on Broker - confirm before generifying, since a mismatched
    // parameterisation would no longer override the interface method.
    public Broker getAdaptor(Class type) {
        return type.isInstance(this) ? this : next.getAdaptor(type);
    }

    /** Delegates to {@code next.getDestinationMap()}. */
    public Map<ActiveMQDestination, Destination> getDestinationMap() {
        return next.getDestinationMap();
    }

    /** Delegates to {@code next.getDestinations(destination)}. */
    public Set<Destination> getDestinations(ActiveMQDestination destination) {
        return next.getDestinations(destination);
    }

    /** Passes the acknowledgement on to the next broker unchanged. */
    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
        next.acknowledge(consumerExchange, ack);
    }

    /** Passes the pull request on to the next broker and returns its response. */
    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
        return next.messagePull(context, pull);
    }

    /** Delegates to {@code next.addConnection(context, info)}. */
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        next.addConnection(context, info);
    }

    /** Delegates to {@code next.addConsumer(context, info)} and returns its subscription. */
    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        return next.addConsumer(context, info);
    }

    /** Delegates to {@code next.addProducer(context, info)}. */
    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        next.addProducer(context, info);
    }

    /** Delegates to {@code next.commitTransaction(context, xid, onePhase)}. */
    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
        next.commitTransaction(context, xid, onePhase);
    }

    /** Delegates to {@code next.removeSubscription(context, info)}. */
    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
        next.removeSubscription(context, info);
    }

    /** Delegates to {@code next.getPreparedTransactions(context)}. */
    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
        return next.getPreparedTransactions(context);
    }

    /** Delegates to {@code next.prepareTransaction(context, xid)} and returns its result. */
    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        return next.prepareTransaction(context, xid);
    }

    /** Delegates to {@code next.removeConnection(context, info, error)}. */
    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
        next.removeConnection(context, info, error);
    }

    /** Delegates to {@code next.removeConsumer(context, info)}. */
    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        next.removeConsumer(context, info);
    }

    /** Delegates to {@code next.removeProducer(context, info)}. */
    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        next.removeProducer(context, info);
    }

    /** Delegates to {@code next.rollbackTransaction(context, xid)}. */
    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        next.rollbackTransaction(context, xid);
    }

    /** Passes the message on to the next broker unchanged. */
    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
        next.send(producerExchange, messageSend);
    }

    /** Delegates to {@code next.beginTransaction(context, xid)}. */
    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        next.beginTransaction(context, xid);
    }

    /** Delegates to {@code next.forgetTransaction(context, transactionId)}. */
    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
        next.forgetTransaction(context, transactionId);
    }

    /** Delegates to {@code next.getClients()}. */
    public Connection[] getClients() throws Exception {
        return next.getClients();
    }

    /** Delegates to {@code next.addDestination(context, destination, createIfTemporary)}. */
    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
                                      boolean createIfTemporary) throws Exception {
        return next.addDestination(context, destination, createIfTemporary);
    }

    /** Delegates to {@code next.removeDestination(context, destination, timeout)}. */
    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
            throws Exception {
        next.removeDestination(context, destination, timeout);
    }

    /** Delegates to {@code next.getDestinations()}. */
    public ActiveMQDestination[] getDestinations() throws Exception {
        return next.getDestinations();
    }

    /** Starts the next broker; this filter holds no start-up logic of its own. */
    public void start() throws Exception {
        next.start();
    }

    /** Stops the next broker; this filter holds no shutdown logic of its own. */
    public void stop() throws Exception {
        next.stop();
    }

    /** Delegates to {@code next.addSession(context, info)}. */
    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
        next.addSession(context, info);
    }

    /** Delegates to {@code next.removeSession(context, info)}. */
    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
        next.removeSession(context, info);
    }

    /** Delegates to {@code next.getBrokerId()}. */
    public BrokerId getBrokerId() {
        return next.getBrokerId();
    }

    /** Delegates to {@code next.getBrokerName()}. */
    public String getBrokerName() {
        return next.getBrokerName();
    }

    /** Delegates to {@code next.gc()}. */
    public void gc() {
        next.gc();
    }

    /** Delegates to {@code next.addBroker(connection, info)}. */
    public void addBroker(Connection connection, BrokerInfo info) {
        next.addBroker(connection, info);
    }

    /** Delegates to {@code next.removeBroker(connection, info)}. */
    public void removeBroker(Connection connection, BrokerInfo info) {
        next.removeBroker(connection, info);
    }

    /** Delegates to {@code next.getPeerBrokerInfos()}. */
    public BrokerInfo[] getPeerBrokerInfos() {
        return next.getPeerBrokerInfos();
    }

    /** Delegates to {@code next.preProcessDispatch(messageDispatch)}. */
    public void preProcessDispatch(MessageDispatch messageDispatch) {
        next.preProcessDispatch(messageDispatch);
    }

    /** Delegates to {@code next.postProcessDispatch(messageDispatch)}. */
    public void postProcessDispatch(MessageDispatch messageDispatch) {
        next.postProcessDispatch(messageDispatch);
    }

    /** Delegates to {@code next.processDispatchNotification(messageDispatchNotification)}. */
    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification)
            throws Exception {
        next.processDispatchNotification(messageDispatchNotification);
    }

    /** Reports the stopped state of the next broker. */
    public boolean isStopped() {
        return next.isStopped();
    }

    /** Delegates to {@code next.getDurableDestinations()}. */
    public Set<ActiveMQDestination> getDurableDestinations() {
        return next.getDurableDestinations();
    }

    /** Delegates to {@code next.addDestinationInfo(context, info)}. */
    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
        next.addDestinationInfo(context, info);
    }

    /** Delegates to {@code next.removeDestinationInfo(context, info)}. */
    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
        next.removeDestinationInfo(context, info);
    }

    /** Delegates to {@code next.isFaultTolerantConfiguration()}. */
    public boolean isFaultTolerantConfiguration() {
        return next.isFaultTolerantConfiguration();
    }

    /** Delegates to {@code next.getAdminConnectionContext()}. */
    public ConnectionContext getAdminConnectionContext() {
        return next.getAdminConnectionContext();
    }

    /** Delegates to {@code next.setAdminConnectionContext(adminConnectionContext)}. */
    public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
        next.setAdminConnectionContext(adminConnectionContext);
    }

    /** Delegates to {@code next.getTempDataStore()}. */
    public PListStore getTempDataStore() {
        return next.getTempDataStore();
    }

    /** Delegates to {@code next.getVmConnectorURI()}. */
    public URI getVmConnectorURI() {
        return next.getVmConnectorURI();
    }

    /** Delegates to {@code next.brokerServiceStarted()}. */
    public void brokerServiceStarted() {
        next.brokerServiceStarted();
    }

    /** Delegates to {@code next.getBrokerService()}. */
    public BrokerService getBrokerService() {
        return next.getBrokerService();
    }

    /** Asks the next broker whether the referenced message has expired. */
    public boolean isExpired(MessageReference messageReference) {
        return next.isExpired(messageReference);
    }

    /** Delegates to {@code next.messageExpired(context, message, subscription)}. */
    public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
        next.messageExpired(context, message, subscription);
    }

    /** Delegates to {@code next.sendToDeadLetterQueue(context, messageReference, subscription)}. */
    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
                                      Subscription subscription) {
        next.sendToDeadLetterQueue(context, messageReference, subscription);
    }

    /** Delegates to {@code next.getRoot()}. */
    public Broker getRoot() {
        return next.getRoot();
    }

    /** Delegates to {@code next.getBrokerSequenceId()}. */
    public long getBrokerSequenceId() {
        return next.getBrokerSequenceId();
    }

    /** Delegates to {@code next.fastProducer(context, producerInfo)}. */
    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
        next.fastProducer(context, producerInfo);
    }

    /** Delegates to {@code next.isFull(context, destination, usage)}. */
    public void isFull(ConnectionContext context, Destination destination, Usage usage) {
        next.isFull(context, destination, usage);
    }

    /** Delegates to {@code next.messageConsumed(context, messageReference)}. */
    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
        next.messageConsumed(context, messageReference);
    }

    /** Delegates to {@code next.messageDelivered(context, messageReference)}. */
    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
        next.messageDelivered(context, messageReference);
    }

    /** Delegates to {@code next.messageDiscarded(context, sub, messageReference)}. */
    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
        next.messageDiscarded(context, sub, messageReference);
    }

    /** Delegates to {@code next.slowConsumer(context, destination, subs)}. */
    public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
        next.slowConsumer(context, destination, subs);
    }

    /** Delegates to {@code next.nowMasterBroker()}. */
    public void nowMasterBroker() {
        next.nowMasterBroker();
    }

    /** Delegates to {@code next.processConsumerControl(consumerExchange, control)}. */
    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
        next.processConsumerControl(consumerExchange, control);
    }

    /** Delegates to {@code next.getScheduler()}. */
    public Scheduler getScheduler() {
        return next.getScheduler();
    }

    /** Delegates to {@code next.getExecutor()}. */
    public ThreadPoolExecutor getExecutor() {
        return next.getExecutor();
    }

    /** Delegates to {@code next.networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp)}. */
    public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
        next.networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp);
    }

    /** Delegates to {@code next.networkBridgeStopped(brokerInfo)}. */
    public void networkBridgeStopped(BrokerInfo brokerInfo) {
        next.networkBridgeStopped(brokerInfo);
    }
}