001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.util.ArrayList; 020 import java.util.HashMap; 021 import java.util.Iterator; 022 import java.util.List; 023 import java.util.Map; 024 import java.util.Set; 025 import java.util.concurrent.ConcurrentHashMap; 026 import java.util.concurrent.locks.ReentrantReadWriteLock; 027 028 import javax.jms.JMSException; 029 import org.apache.activemq.broker.ConnectionContext; 030 import org.apache.activemq.broker.ConsumerBrokerExchange; 031 import org.apache.activemq.broker.DestinationAlreadyExistsException; 032 import org.apache.activemq.broker.ProducerBrokerExchange; 033 import org.apache.activemq.broker.TransportConnection; 034 import org.apache.activemq.command.ActiveMQDestination; 035 import org.apache.activemq.command.ConsumerControl; 036 import org.apache.activemq.command.ConsumerId; 037 import org.apache.activemq.command.ConsumerInfo; 038 import org.apache.activemq.command.DestinationInfo; 039 import org.apache.activemq.command.Message; 040 import org.apache.activemq.command.MessageAck; 041 import org.apache.activemq.command.MessageDispatchNotification; 042 import org.apache.activemq.command.MessagePull; 043 import org.apache.activemq.command.ProducerInfo; 044 import org.apache.activemq.command.RemoveSubscriptionInfo; 045 import org.apache.activemq.command.Response; 046 import org.apache.activemq.filter.DestinationFilter; 047 import org.apache.activemq.filter.DestinationMap; 048 import org.apache.activemq.security.SecurityContext; 049 import org.apache.activemq.thread.TaskRunnerFactory; 050 import org.apache.activemq.usage.SystemUsage; 051 import org.slf4j.Logger; 052 import org.slf4j.LoggerFactory; 053 054 /** 055 * 056 */ 057 public abstract class AbstractRegion implements Region { 058 059 private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class); 060 061 protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 062 protected final DestinationMap destinationMap = new DestinationMap(); 063 protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>(); 064 protected final SystemUsage usageManager; 065 protected final DestinationFactory destinationFactory; 066 protected final DestinationStatistics destinationStatistics; 067 protected final RegionBroker broker; 068 protected boolean autoCreateDestinations = true; 069 protected final TaskRunnerFactory taskRunnerFactory; 070 protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock(); 071 protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>(); 072 protected boolean started; 073 074 public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, 075 TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 076 if (broker == null) { 077 throw new IllegalArgumentException("null broker"); 078 } 079 this.broker = broker; 080 this.destinationStatistics = destinationStatistics; 081 this.usageManager = memoryManager; 082 this.taskRunnerFactory = taskRunnerFactory; 083 if (destinationFactory == null) { 084 throw new IllegalArgumentException("null destinationFactory"); 085 } 086 this.destinationFactory = destinationFactory; 087 } 088 089 public final void start() throws Exception { 090 started = true; 091 092 Set<ActiveMQDestination> inactiveDests = getInactiveDestinations(); 093 for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) { 094 ActiveMQDestination dest = iter.next(); 095 096 ConnectionContext context = new ConnectionContext(); 097 context.setBroker(broker.getBrokerService().getBroker()); 098 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 099 context.getBroker().addDestination(context, dest, false); 100 } 101 destinationsLock.readLock().lock(); 102 try{ 103 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) { 104 Destination dest = i.next(); 105 dest.start(); 106 } 107 } finally { 108 destinationsLock.readLock().unlock(); 109 } 110 } 111 112 public void stop() throws Exception { 113 started = false; 114 destinationsLock.readLock().lock(); 115 try{ 116 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) { 117 Destination dest = i.next(); 118 dest.stop(); 119 } 120 } finally { 121 destinationsLock.readLock().unlock(); 122 } 123 destinations.clear(); 124 } 125 126 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, 127 boolean createIfTemporary) throws Exception { 128 129 destinationsLock.writeLock().lock(); 130 try { 131 Destination dest = destinations.get(destination); 132 if (dest == null) { 133 if (destination.isTemporary() == false || createIfTemporary) { 134 if (LOG.isDebugEnabled()) { 135 LOG.debug(broker.getBrokerName() + " adding destination: " + destination); 136 } 137 dest = createDestination(context, destination); 138 // intercept if there is a valid interceptor defined 139 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); 140 if (destinationInterceptor != null) { 141 dest = destinationInterceptor.intercept(dest); 142 } 143 dest.start(); 144 destinations.put(destination, dest); 145 destinationMap.put(destination, dest); 146 addSubscriptionsForDestination(context, dest); 147 } 148 if (dest == null) { 149 throw new JMSException("The destination " + destination + " does not exist."); 150 } 151 } 152 return dest; 153 } finally { 154 destinationsLock.writeLock().unlock(); 155 } 156 } 157 158 public Map<ConsumerId, Subscription> getSubscriptions() { 159 return subscriptions; 160 } 161 162 protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) 163 throws Exception { 164 165 List<Subscription> rc = new ArrayList<Subscription>(); 166 // Add all consumers that are interested in the destination. 167 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 168 Subscription sub = iter.next(); 169 if (sub.matches(dest.getActiveMQDestination())) { 170 dest.addSubscription(context, sub); 171 rc.add(sub); 172 } 173 } 174 return rc; 175 176 } 177 178 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) 179 throws Exception { 180 181 // No timeout.. then try to shut down right way, fails if there are 182 // current subscribers. 183 if (timeout == 0) { 184 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 185 Subscription sub = iter.next(); 186 if (sub.matches(destination)) { 187 throw new JMSException("Destination still has an active subscription: " + destination); 188 } 189 } 190 } 191 192 if (timeout > 0) { 193 // TODO: implement a way to notify the subscribers that we want to 194 // take the down 195 // the destination and that they should un-subscribe.. Then wait up 196 // to timeout time before 197 // dropping the subscription. 198 } 199 200 if (LOG.isDebugEnabled()) { 201 LOG.debug(broker.getBrokerName() + " removing destination: " + destination); 202 } 203 204 destinationsLock.writeLock().lock(); 205 try { 206 Destination dest = destinations.remove(destination); 207 if (dest != null) { 208 // timeout<0 or we timed out, we now force any remaining 209 // subscriptions to un-subscribe. 210 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 211 Subscription sub = iter.next(); 212 if (sub.matches(destination)) { 213 dest.removeSubscription(context, sub, 0l); 214 } 215 } 216 destinationMap.removeAll(destination); 217 dispose(context, dest); 218 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); 219 if (destinationInterceptor != null) { 220 destinationInterceptor.remove(dest); 221 } 222 223 } else { 224 if (LOG.isDebugEnabled()) { 225 LOG.debug("Cannot remove a destination that doesn't exist: " + destination); 226 } 227 } 228 } finally { 229 destinationsLock.writeLock().unlock(); 230 } 231 } 232 233 /** 234 * Provide an exact or wildcard lookup of destinations in the region 235 * 236 * @return a set of matching destination objects. 237 */ 238 @SuppressWarnings("unchecked") 239 public Set<Destination> getDestinations(ActiveMQDestination destination) { 240 destinationsLock.readLock().lock(); 241 try{ 242 return destinationMap.get(destination); 243 } finally { 244 destinationsLock.readLock().unlock(); 245 } 246 } 247 248 public Map<ActiveMQDestination, Destination> getDestinationMap() { 249 destinationsLock.readLock().lock(); 250 try{ 251 return destinations; 252 } finally { 253 destinationsLock.readLock().unlock(); 254 } 255 } 256 257 @SuppressWarnings("unchecked") 258 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 259 if (LOG.isDebugEnabled()) { 260 LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: " 261 + info.getDestination()); 262 } 263 ActiveMQDestination destination = info.getDestination(); 264 if (destination != null && !destination.isPattern() && !destination.isComposite()) { 265 // lets auto-create the destination 266 lookup(context, destination,true); 267 } 268 269 Object addGuard; 270 synchronized (consumerChangeMutexMap) { 271 addGuard = consumerChangeMutexMap.get(info.getConsumerId()); 272 if (addGuard == null) { 273 addGuard = new Object(); 274 consumerChangeMutexMap.put(info.getConsumerId(), addGuard); 275 } 276 } 277 synchronized (addGuard) { 278 Subscription o = subscriptions.get(info.getConsumerId()); 279 if (o != null) { 280 LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this."); 281 return o; 282 } 283 284 // We may need to add some destinations that are in persistent store 285 // but not active 286 // in the broker. 287 // 288 // TODO: think about this a little more. This is good cause 289 // destinations are not loaded into 290 // memory until a client needs to use the queue, but a management 291 // agent viewing the 292 // broker will not see a destination that exists in persistent 293 // store. We may want to 294 // eagerly load all destinations into the broker but have an 295 // inactive state for the 296 // destination which has reduced memory usage. 297 // 298 DestinationFilter.parseFilter(info.getDestination()); 299 300 Subscription sub = createSubscription(context, info); 301 302 subscriptions.put(info.getConsumerId(), sub); 303 304 // At this point we're done directly manipulating subscriptions, 305 // but we need to retain the synchronized block here. Consider 306 // otherwise what would happen if at this point a second 307 // thread added, then removed, as would be allowed with 308 // no mutex held. Remove is only essentially run once 309 // so everything after this point would be leaked. 310 311 // Add the subscription to all the matching queues. 312 // But copy the matches first - to prevent deadlocks 313 List<Destination> addList = new ArrayList<Destination>(); 314 destinationsLock.readLock().lock(); 315 try { 316 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) { 317 addList.add(dest); 318 } 319 } finally { 320 destinationsLock.readLock().unlock(); 321 } 322 323 for (Destination dest : addList) { 324 dest.addSubscription(context, sub); 325 } 326 327 if (info.isBrowser()) { 328 ((QueueBrowserSubscription) sub).destinationsAdded(); 329 } 330 331 return sub; 332 } 333 } 334 335 /** 336 * Get all the Destinations that are in storage 337 * 338 * @return Set of all stored destinations 339 */ 340 @SuppressWarnings("rawtypes") 341 public Set getDurableDestinations() { 342 return destinationFactory.getDestinations(); 343 } 344 345 /** 346 * @return all Destinations that don't have active consumers 347 */ 348 protected Set<ActiveMQDestination> getInactiveDestinations() { 349 Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations(); 350 destinationsLock.readLock().lock(); 351 try { 352 inactiveDests.removeAll(destinations.keySet()); 353 } finally { 354 destinationsLock.readLock().unlock(); 355 } 356 return inactiveDests; 357 } 358 359 @SuppressWarnings("unchecked") 360 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 361 if (LOG.isDebugEnabled()) { 362 LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: " 363 + info.getDestination()); 364 } 365 366 Subscription sub = subscriptions.remove(info.getConsumerId()); 367 // The sub could be removed elsewhere - see ConnectionSplitBroker 368 if (sub != null) { 369 370 // remove the subscription from all the matching queues. 371 List<Destination> removeList = new ArrayList<Destination>(); 372 destinationsLock.readLock().lock(); 373 try { 374 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) { 375 removeList.add(dest); 376 } 377 } finally { 378 destinationsLock.readLock().unlock(); 379 } 380 for (Destination dest : removeList) { 381 dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId()); 382 } 383 384 destroySubscription(sub); 385 } 386 synchronized (consumerChangeMutexMap) { 387 consumerChangeMutexMap.remove(info.getConsumerId()); 388 } 389 } 390 391 protected void destroySubscription(Subscription sub) { 392 sub.destroy(); 393 } 394 395 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 396 throw new JMSException("Invalid operation."); 397 } 398 399 public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 400 final ConnectionContext context = producerExchange.getConnectionContext(); 401 402 if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) { 403 final Destination regionDestination = lookup(context, messageSend.getDestination(),false); 404 producerExchange.setRegionDestination(regionDestination); 405 } 406 407 producerExchange.getRegionDestination().send(producerExchange, messageSend); 408 } 409 410 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 411 Subscription sub = consumerExchange.getSubscription(); 412 if (sub == null) { 413 sub = subscriptions.get(ack.getConsumerId()); 414 if (sub == null) { 415 if (!consumerExchange.getConnectionContext().isInRecoveryMode()) { 416 LOG.warn("Ack for non existent subscription, ack:" + ack); 417 throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId()); 418 } else { 419 if (LOG.isDebugEnabled()) { 420 LOG.debug("Ack for non existent subscription in recovery, ack:" + ack); 421 } 422 return; 423 } 424 } 425 consumerExchange.setSubscription(sub); 426 } 427 sub.acknowledge(consumerExchange.getConnectionContext(), ack); 428 } 429 430 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 431 Subscription sub = subscriptions.get(pull.getConsumerId()); 432 if (sub == null) { 433 throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId()); 434 } 435 return sub.pullMessage(context, pull); 436 } 437 438 protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception { 439 Destination dest = null; 440 441 destinationsLock.readLock().lock(); 442 try { 443 dest = destinations.get(destination); 444 } finally { 445 destinationsLock.readLock().unlock(); 446 } 447 448 if (dest == null) { 449 if (isAutoCreateDestinations()) { 450 // Try to auto create the destination... re-invoke broker 451 // from the 452 // top so that the proper security checks are performed. 453 try { 454 context.getBroker().addDestination(context, destination, createTemporary); 455 dest = addDestination(context, destination, false); 456 } catch (DestinationAlreadyExistsException e) { 457 // if the destination already exists then lets ignore 458 // this error 459 } 460 // We should now have the dest created. 461 destinationsLock.readLock().lock(); 462 try { 463 dest = destinations.get(destination); 464 } finally { 465 destinationsLock.readLock().unlock(); 466 } 467 } 468 469 if (dest == null) { 470 throw new JMSException("The destination " + destination + " does not exist."); 471 } 472 } 473 return dest; 474 } 475 476 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 477 Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId()); 478 if (sub != null) { 479 sub.processMessageDispatchNotification(messageDispatchNotification); 480 } else { 481 throw new JMSException("Slave broker out of sync with master - Subscription: " 482 + messageDispatchNotification.getConsumerId() + " on " 483 + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: " 484 + messageDispatchNotification.getMessageId()); 485 } 486 } 487 488 /* 489 * For a Queue/TempQueue, dispatch order is imperative to match acks, so the 490 * dispatch is deferred till the notification to ensure that the 491 * subscription chosen by the master is used. AMQ-2102 492 */ 493 protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification) 494 throws Exception { 495 Destination dest = null; 496 destinationsLock.readLock().lock(); 497 try { 498 dest = destinations.get(messageDispatchNotification.getDestination()); 499 } finally { 500 destinationsLock.readLock().unlock(); 501 } 502 503 if (dest != null) { 504 dest.processDispatchNotification(messageDispatchNotification); 505 } else { 506 throw new JMSException("Slave broker out of sync with master - Destination: " 507 + messageDispatchNotification.getDestination() + " does not exist for consumer " 508 + messageDispatchNotification.getConsumerId() + " with message: " 509 + messageDispatchNotification.getMessageId()); 510 } 511 } 512 513 public void gc() { 514 for (Subscription sub : subscriptions.values()) { 515 sub.gc(); 516 } 517 518 destinationsLock.readLock().lock(); 519 try { 520 for (Destination dest : destinations.values()) { 521 dest.gc(); 522 } 523 } finally { 524 destinationsLock.readLock().unlock(); 525 } 526 } 527 528 protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception; 529 530 protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) 531 throws Exception { 532 return destinationFactory.createDestination(context, destination, destinationStatistics); 533 } 534 535 public boolean isAutoCreateDestinations() { 536 return autoCreateDestinations; 537 } 538 539 public void setAutoCreateDestinations(boolean autoCreateDestinations) { 540 this.autoCreateDestinations = autoCreateDestinations; 541 } 542 543 @SuppressWarnings("unchecked") 544 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 545 destinationsLock.readLock().lock(); 546 try { 547 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) { 548 dest.addProducer(context, info); 549 } 550 } finally { 551 destinationsLock.readLock().unlock(); 552 } 553 } 554 555 /** 556 * Removes a Producer. 557 * 558 * @param context 559 * the environment the operation is being executed under. 560 * @throws Exception 561 * TODO 562 */ 563 @SuppressWarnings("unchecked") 564 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 565 destinationsLock.readLock().lock(); 566 try { 567 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) { 568 dest.removeProducer(context, info); 569 } 570 } finally { 571 destinationsLock.readLock().unlock(); 572 } 573 } 574 575 protected void dispose(ConnectionContext context, Destination dest) throws Exception { 576 dest.dispose(context); 577 dest.stop(); 578 destinationFactory.removeDestination(dest); 579 } 580 581 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { 582 Subscription sub = subscriptions.get(control.getConsumerId()); 583 if (sub != null && sub instanceof AbstractSubscription) { 584 ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch()); 585 if (LOG.isDebugEnabled()) { 586 LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: " 587 + control.getConsumerId()); 588 } 589 try { 590 lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup(); 591 } catch (Exception e) { 592 LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e); 593 } 594 } 595 } 596 }