001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region.virtual; 018 019 import org.apache.activemq.broker.*; 020 import org.apache.activemq.broker.region.Destination; 021 import org.apache.activemq.broker.region.DestinationFilter; 022 import org.apache.activemq.broker.region.DestinationInterceptor; 023 import org.apache.activemq.command.ActiveMQDestination; 024 import org.apache.activemq.command.ActiveMQTopic; 025 import org.apache.activemq.command.Message; 026 import org.slf4j.Logger; 027 import org.slf4j.LoggerFactory; 028 029 /** 030 * Creates <a href="http://activemq.org/site/mirrored-queues.html">Mirrored 031 * Queue</a> using a prefix and postfix to define the topic name on which to mirror the queue to. 032 * 033 * 034 * @org.apache.xbean.XBean 035 */ 036 public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware { 037 private static final transient Logger LOG = LoggerFactory.getLogger(MirroredQueue.class); 038 private String prefix = "VirtualTopic.Mirror."; 039 private String postfix = ""; 040 private boolean copyMessage = true; 041 private BrokerService brokerService; 042 043 public Destination intercept(final Destination destination) { 044 if (destination.getActiveMQDestination().isQueue()) { 045 if (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues()) { 046 try { 047 final Destination mirrorDestination = getMirrorDestination(destination); 048 if (mirrorDestination != null) { 049 return new DestinationFilter(destination) { 050 public void send(ProducerBrokerExchange context, Message message) throws Exception { 051 message.setDestination(mirrorDestination.getActiveMQDestination()); 052 mirrorDestination.send(context, message); 053 054 if (isCopyMessage()) { 055 message = message.copy(); 056 } 057 message.setDestination(destination.getActiveMQDestination()); 058 super.send(context, message); 059 } 060 }; 061 } 062 } 063 catch (Exception e) { 064 LOG.error("Failed to lookup the mirror destination for: " + destination + ". Reason: " + e, e); 065 } 066 } 067 } 068 return destination; 069 } 070 071 072 public void remove(Destination destination) { 073 if (brokerService == null) { 074 throw new IllegalArgumentException("No brokerService injected!"); 075 } 076 ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination()); 077 if (topic != null) { 078 try { 079 brokerService.removeDestination(topic); 080 } catch (Exception e) { 081 LOG.error("Failed to remove mirror destination for " + destination + ". Reason: " + e,e); 082 } 083 } 084 085 } 086 087 public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) {} 088 089 // Properties 090 // ------------------------------------------------------------------------- 091 092 public String getPostfix() { 093 return postfix; 094 } 095 096 /** 097 * Sets any postix used to identify the queue consumers 098 */ 099 public void setPostfix(String postfix) { 100 this.postfix = postfix; 101 } 102 103 public String getPrefix() { 104 return prefix; 105 } 106 107 /** 108 * Sets the prefix wildcard used to identify the queue consumers for a given 109 * topic 110 */ 111 public void setPrefix(String prefix) { 112 this.prefix = prefix; 113 } 114 115 public boolean isCopyMessage() { 116 return copyMessage; 117 } 118 119 /** 120 * Sets whether a copy of the message will be sent to each destination. 121 * Defaults to true so that the forward destination is set as the 122 * destination of the message 123 */ 124 public void setCopyMessage(boolean copyMessage) { 125 this.copyMessage = copyMessage; 126 } 127 128 public void setBrokerService(BrokerService brokerService) { 129 this.brokerService = brokerService; 130 } 131 132 // Implementation methods 133 //------------------------------------------------------------------------- 134 protected Destination getMirrorDestination(Destination destination) throws Exception { 135 if (brokerService == null) { 136 throw new IllegalArgumentException("No brokerService injected!"); 137 } 138 ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination()); 139 return brokerService.getDestination(topic); 140 } 141 142 protected ActiveMQDestination getMirrorTopic(ActiveMQDestination original) { 143 return new ActiveMQTopic(prefix + original.getPhysicalName() + postfix); 144 } 145 146 }