001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.net.URI;
020import java.util.Map;
021import java.util.Set;
022import java.util.concurrent.ThreadPoolExecutor;
023import org.apache.activemq.broker.region.Destination;
024import org.apache.activemq.broker.region.MessageReference;
025import org.apache.activemq.broker.region.Subscription;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.BrokerId;
028import org.apache.activemq.command.BrokerInfo;
029import org.apache.activemq.command.ConnectionInfo;
030import org.apache.activemq.command.ConsumerControl;
031import org.apache.activemq.command.ConsumerInfo;
032import org.apache.activemq.command.DestinationInfo;
033import org.apache.activemq.command.Message;
034import org.apache.activemq.command.MessageAck;
035import org.apache.activemq.command.MessageDispatch;
036import org.apache.activemq.command.MessageDispatchNotification;
037import org.apache.activemq.command.MessagePull;
038import org.apache.activemq.command.ProducerInfo;
039import org.apache.activemq.command.RemoveSubscriptionInfo;
040import org.apache.activemq.command.Response;
041import org.apache.activemq.command.SessionInfo;
042import org.apache.activemq.command.TransactionId;
043import org.apache.activemq.store.kahadb.plist.PListStore;
044import org.apache.activemq.thread.Scheduler;
045import org.apache.activemq.usage.Usage;
046
047/**
048 * Allows you to intercept broker operation so that features such as security
049 * can be implemented as a pluggable filter.
050 * 
051 * 
052 */
053public class BrokerFilter implements Broker {
054
055    protected final Broker next;
056
057    public BrokerFilter(Broker next) {
058        this.next = next;
059    }
060
061    public Broker getAdaptor(Class type) {
062        if (type.isInstance(this)) {
063            return this;
064        }
065        return next.getAdaptor(type);
066    }
067
068    public Map<ActiveMQDestination, Destination> getDestinationMap() {
069        return next.getDestinationMap();
070    }
071
072    public Set <Destination>getDestinations(ActiveMQDestination destination) {
073        return next.getDestinations(destination);
074    }
075
076    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
077        next.acknowledge(consumerExchange, ack);
078    }
079
080    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
081        return next.messagePull(context, pull);
082    }
083
084    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
085        next.addConnection(context, info);
086    }
087
088    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
089        return next.addConsumer(context, info);
090    }
091
092    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
093        next.addProducer(context, info);
094    }
095
096    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
097        next.commitTransaction(context, xid, onePhase);
098    }
099
100    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
101        next.removeSubscription(context, info);
102    }
103
104    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
105        return next.getPreparedTransactions(context);
106    }
107
108    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
109        return next.prepareTransaction(context, xid);
110    }
111
112    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
113        next.removeConnection(context, info, error);
114    }
115
116    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
117        next.removeConsumer(context, info);
118    }
119
120    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
121        next.removeProducer(context, info);
122    }
123
124    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
125        next.rollbackTransaction(context, xid);
126    }
127
128    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
129        next.send(producerExchange, messageSend);
130    }
131
132    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
133        next.beginTransaction(context, xid);
134    }
135
136    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
137        next.forgetTransaction(context, transactionId);
138    }
139
140    public Connection[] getClients() throws Exception {
141        return next.getClients();
142    }
143
144    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
145        return next.addDestination(context, destination,createIfTemporary);
146    }
147
148    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
149        next.removeDestination(context, destination, timeout);
150    }
151
152    public ActiveMQDestination[] getDestinations() throws Exception {
153        return next.getDestinations();
154    }
155
156    public void start() throws Exception {
157        next.start();
158    }
159
160    public void stop() throws Exception {
161        next.stop();
162    }
163
164    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
165        next.addSession(context, info);
166    }
167
168    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
169        next.removeSession(context, info);
170    }
171
172    public BrokerId getBrokerId() {
173        return next.getBrokerId();
174    }
175
176    public String getBrokerName() {
177        return next.getBrokerName();
178    }
179
180    public void gc() {
181        next.gc();
182    }
183
184    public void addBroker(Connection connection, BrokerInfo info) {
185        next.addBroker(connection, info);
186    }
187
188    public void removeBroker(Connection connection, BrokerInfo info) {
189        next.removeBroker(connection, info);
190    }
191
192    public BrokerInfo[] getPeerBrokerInfos() {
193        return next.getPeerBrokerInfos();
194    }
195
196    public void preProcessDispatch(MessageDispatch messageDispatch) {
197        next.preProcessDispatch(messageDispatch);
198    }
199
200    public void postProcessDispatch(MessageDispatch messageDispatch) {
201        next.postProcessDispatch(messageDispatch);
202    }
203
204    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
205        next.processDispatchNotification(messageDispatchNotification);
206    }
207
208    public boolean isStopped() {
209        return next.isStopped();
210    }
211
212    public Set<ActiveMQDestination> getDurableDestinations() {
213        return next.getDurableDestinations();
214    }
215
216    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
217        next.addDestinationInfo(context, info);
218    }
219
220    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
221        next.removeDestinationInfo(context, info);
222    }
223
224    public boolean isFaultTolerantConfiguration() {
225        return next.isFaultTolerantConfiguration();
226    }
227
228    public ConnectionContext getAdminConnectionContext() {
229        return next.getAdminConnectionContext();
230    }
231
232    public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
233        next.setAdminConnectionContext(adminConnectionContext);
234    }
235
236    public PListStore getTempDataStore() {
237        return next.getTempDataStore();
238    }
239
240    public URI getVmConnectorURI() {
241        return next.getVmConnectorURI();
242    }
243
244    public void brokerServiceStarted() {
245        next.brokerServiceStarted();
246    }
247
248    public BrokerService getBrokerService() {
249        return next.getBrokerService();
250    }
251
252    public boolean isExpired(MessageReference messageReference) {
253        return next.isExpired(messageReference);
254    }
255
256    public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
257        next.messageExpired(context, message, subscription);
258    }
259
260    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
261                                      Subscription subscription) {
262        next.sendToDeadLetterQueue(context, messageReference, subscription);
263    }
264
265    public Broker getRoot() {
266        return next.getRoot();
267    }
268
269    public long getBrokerSequenceId() {
270        return next.getBrokerSequenceId();
271    }
272
273   
274    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
275        next.fastProducer(context, producerInfo);
276    }
277
278    public void isFull(ConnectionContext context,Destination destination, Usage usage) {
279        next.isFull(context,destination, usage);
280    }
281
282    public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
283        next.messageConsumed(context, messageReference);
284    }
285
286    public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
287        next.messageDelivered(context, messageReference);
288    }
289
290    public void messageDiscarded(ConnectionContext context,Subscription sub, MessageReference messageReference) {
291        next.messageDiscarded(context, sub, messageReference);
292    }
293
294    public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
295        next.slowConsumer(context, destination,subs);
296    }
297    
298    public void nowMasterBroker() {   
299        next.nowMasterBroker();
300    }
301
302    public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
303            ConsumerControl control) {
304        next.processConsumerControl(consumerExchange, control);
305    }
306
307    public Scheduler getScheduler() {
308       return next.getScheduler();
309    }
310
311    public ThreadPoolExecutor getExecutor() {
312       return next.getExecutor();
313    }
314
315    public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
316        next.networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp);
317    }
318
319    public void networkBridgeStopped(BrokerInfo brokerInfo) {
320        next.networkBridgeStopped(brokerInfo);
321    }
322}