001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.util.ArrayList; 020import java.util.List; 021import org.apache.activemq.broker.region.Destination; 022import org.apache.activemq.broker.region.Subscription; 023import org.apache.activemq.command.ActiveMQDestination; 024import org.apache.activemq.command.BrokerInfo; 025import org.apache.activemq.command.ConnectionInfo; 026import org.apache.activemq.command.ConsumerInfo; 027import org.apache.activemq.command.Message; 028import org.apache.activemq.command.MessageAck; 029import org.apache.activemq.command.ProducerInfo; 030import org.apache.activemq.command.RemoveSubscriptionInfo; 031import org.apache.activemq.command.SessionInfo; 032import org.apache.activemq.command.TransactionId; 033 034/** 035 * Used to add listeners for Broker actions 036 * 037 * 038 */ 039public class BrokerBroadcaster extends BrokerFilter { 040 protected volatile Broker[] listeners = new Broker[0]; 041 042 public BrokerBroadcaster(Broker next) { 043 super(next); 044 } 045 046 @Override 047 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 048 next.acknowledge(consumerExchange, ack); 049 Broker brokers[] = getListeners(); 050 for (int i = 0; i < brokers.length; i++) { 051 brokers[i].acknowledge(consumerExchange, ack); 052 } 053 } 054 055 @Override 056 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 057 next.addConnection(context, info); 058 Broker brokers[] = getListeners(); 059 for (int i = 0; i < brokers.length; i++) { 060 brokers[i].addConnection(context, info); 061 } 062 } 063 064 @Override 065 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 066 Subscription answer = next.addConsumer(context, info); 067 Broker brokers[] = getListeners(); 068 for (int i = 0; i < brokers.length; i++) { 069 brokers[i].addConsumer(context, info); 070 } 071 return answer; 072 } 073 074 @Override 075 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 076 next.addProducer(context, info); 077 Broker brokers[] = getListeners(); 078 for (int i = 0; i < brokers.length; i++) { 079 brokers[i].addProducer(context, info); 080 } 081 } 082 083 @Override 084 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 085 next.commitTransaction(context, xid, onePhase); 086 Broker brokers[] = getListeners(); 087 for (int i = 0; i < brokers.length; i++) { 088 brokers[i].commitTransaction(context, xid, onePhase); 089 } 090 } 091 092 @Override 093 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 094 next.removeSubscription(context, info); 095 Broker brokers[] = getListeners(); 096 for (int i = 0; i < brokers.length; i++) { 097 brokers[i].removeSubscription(context, info); 098 } 099 } 100 101 @Override 102 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 103 int result = next.prepareTransaction(context, xid); 104 Broker brokers[] = getListeners(); 105 for (int i = 0; i < brokers.length; i++) { 106 // TODO decide what to do with return values 107 brokers[i].prepareTransaction(context, xid); 108 } 109 return result; 110 } 111 112 @Override 113 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 114 next.removeConnection(context, info, error); 115 Broker brokers[] = getListeners(); 116 for (int i = 0; i < brokers.length; i++) { 117 brokers[i].removeConnection(context, info, error); 118 } 119 } 120 121 @Override 122 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 123 next.removeConsumer(context, info); 124 Broker brokers[] = getListeners(); 125 for (int i = 0; i < brokers.length; i++) { 126 brokers[i].removeConsumer(context, info); 127 } 128 } 129 130 @Override 131 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 132 next.removeProducer(context, info); 133 Broker brokers[] = getListeners(); 134 for (int i = 0; i < brokers.length; i++) { 135 brokers[i].removeProducer(context, info); 136 } 137 } 138 139 @Override 140 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 141 next.rollbackTransaction(context, xid); 142 Broker brokers[] = getListeners(); 143 for (int i = 0; i < brokers.length; i++) { 144 brokers[i].rollbackTransaction(context, xid); 145 } 146 } 147 148 @Override 149 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 150 next.send(producerExchange, messageSend); 151 Broker brokers[] = getListeners(); 152 for (int i = 0; i < brokers.length; i++) { 153 brokers[i].send(producerExchange, messageSend); 154 } 155 } 156 157 @Override 158 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 159 next.beginTransaction(context, xid); 160 Broker brokers[] = getListeners(); 161 for (int i = 0; i < brokers.length; i++) { 162 brokers[i].beginTransaction(context, xid); 163 } 164 } 165 166 @Override 167 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 168 next.forgetTransaction(context, transactionId); 169 Broker brokers[] = getListeners(); 170 for (int i = 0; i < brokers.length; i++) { 171 brokers[i].forgetTransaction(context, transactionId); 172 } 173 } 174 175 @Override 176 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception { 177 Destination result = next.addDestination(context, destination,createIfTemporary); 178 Broker brokers[] = getListeners(); 179 for (int i = 0; i < brokers.length; i++) { 180 brokers[i].addDestination(context, destination,createIfTemporary); 181 } 182 return result; 183 } 184 185 @Override 186 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 187 next.removeDestination(context, destination, timeout); 188 Broker brokers[] = getListeners(); 189 for (int i = 0; i < brokers.length; i++) { 190 brokers[i].removeDestination(context, destination, timeout); 191 } 192 } 193 194 @Override 195 public void start() throws Exception { 196 next.start(); 197 Broker brokers[] = getListeners(); 198 for (int i = 0; i < brokers.length; i++) { 199 brokers[i].start(); 200 } 201 } 202 203 @Override 204 public void stop() throws Exception { 205 next.stop(); 206 Broker brokers[] = getListeners(); 207 for (int i = 0; i < brokers.length; i++) { 208 brokers[i].stop(); 209 } 210 } 211 212 @Override 213 public void addSession(ConnectionContext context, SessionInfo info) throws Exception { 214 next.addSession(context, info); 215 Broker brokers[] = getListeners(); 216 for (int i = 0; i < brokers.length; i++) { 217 brokers[i].addSession(context, info); 218 } 219 } 220 221 @Override 222 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { 223 next.removeSession(context, info); 224 Broker brokers[] = getListeners(); 225 for (int i = 0; i < brokers.length; i++) { 226 brokers[i].removeSession(context, info); 227 } 228 } 229 230 @Override 231 public void gc() { 232 next.gc(); 233 Broker brokers[] = getListeners(); 234 for (int i = 0; i < brokers.length; i++) { 235 brokers[i].gc(); 236 } 237 } 238 239 @Override 240 public void addBroker(Connection connection, BrokerInfo info) { 241 next.addBroker(connection, info); 242 Broker brokers[] = getListeners(); 243 for (int i = 0; i < brokers.length; i++) { 244 brokers[i].addBroker(connection, info); 245 } 246 } 247 248 protected Broker[] getListeners() { 249 return listeners; 250 } 251 252 public synchronized void addListener(Broker broker) { 253 List<Broker> tmp = getListenersAsList(); 254 tmp.add(broker); 255 listeners = tmp.toArray(new Broker[tmp.size()]); 256 } 257 258 public synchronized void removeListener(Broker broker) { 259 List<Broker> tmp = getListenersAsList(); 260 tmp.remove(broker); 261 listeners = tmp.toArray(new Broker[tmp.size()]); 262 } 263 264 protected List<Broker> getListenersAsList() { 265 List<Broker> tmp = new ArrayList<Broker>(); 266 Broker brokers[] = getListeners(); 267 for (int i = 0; i < brokers.length; i++) { 268 tmp.add(brokers[i]); 269 } 270 return tmp; 271 } 272}