001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network; 018 019 import java.io.IOException; 020 import java.util.Iterator; 021 022 import org.apache.activemq.command.ActiveMQDestination; 023 import org.apache.activemq.command.ConsumerId; 024 import org.apache.activemq.command.ConsumerInfo; 025 import org.apache.activemq.filter.DestinationFilter; 026 import org.apache.activemq.transport.Transport; 027 import org.slf4j.Logger; 028 import org.slf4j.LoggerFactory; 029 030 /** 031 * Consolidates subscriptions 032 * 033 * 034 */ 035 public class DurableConduitBridge extends ConduitBridge { 036 private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class); 037 038 /** 039 * Constructor 040 * 041 * @param configuration 042 * 043 * @param localBroker 044 * @param remoteBroker 045 */ 046 public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker, 047 Transport remoteBroker) { 048 super(configuration, localBroker, remoteBroker); 049 } 050 051 /** 052 * Subscriptions for these destinations are always created 053 * 054 */ 055 protected void setupStaticDestinations() { 056 super.setupStaticDestinations(); 057 ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations; 058 if (dests != null) { 059 for (int i = 0; i < dests.length; i++) { 060 ActiveMQDestination dest = dests[i]; 061 if (isPermissableDestination(dest) && !doesConsumerExist(dest)) { 062 DemandSubscription sub = createDemandSubscription(dest); 063 if (dest.isTopic()) { 064 sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest)); 065 } 066 try { 067 addSubscription(sub); 068 } catch (IOException e) { 069 LOG.error("Failed to add static destination " + dest, e); 070 } 071 if (LOG.isTraceEnabled()) { 072 LOG.trace("Forwarding messages for durable destination: " + dest); 073 } 074 } 075 } 076 } 077 } 078 079 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 080 if (addToAlreadyInterestedConsumers(info)) { 081 return null; // don't want this subscription added 082 } 083 //add our original id to ourselves 084 info.addNetworkConsumerId(info.getConsumerId()); 085 086 if (info.isDurable()) { 087 // set the subscriber name to something reproducible 088 info.setSubscriptionName(getSubscriberName(info.getDestination())); 089 // and override the consumerId with something unique so that it won't 090 // be removed if the durable subscriber (at the other end) goes away 091 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator 092 .getNextSequenceId())); 093 } 094 info.setSelector(null); 095 return doCreateDemandSubscription(info); 096 } 097 098 protected String getSubscriberName(ActiveMQDestination dest) { 099 String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName(); 100 return subscriberName; 101 } 102 103 protected boolean doesConsumerExist(ActiveMQDestination dest) { 104 DestinationFilter filter = DestinationFilter.parseFilter(dest); 105 for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) { 106 DemandSubscription ds = (DemandSubscription)i.next(); 107 if (filter.matches(ds.getLocalInfo().getDestination())) { 108 return true; 109 } 110 } 111 return false; 112 } 113 }