001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.policy;
018    
019    import org.apache.activemq.broker.region.DurableTopicSubscription;
020    import org.apache.activemq.broker.region.Subscription;
021    import org.apache.activemq.command.ActiveMQDestination;
022    import org.apache.activemq.command.ActiveMQQueue;
023    import org.apache.activemq.command.ActiveMQTopic;
024    import org.apache.activemq.command.Message;
025    
026    /**
027     * A {@link DeadLetterStrategy} where each destination has its own individual
028     * DLQ using the subject naming hierarchy.
029     *
030     * @org.apache.xbean.XBean
031     *
032     */
033    public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
034    
035        private String topicPrefix = "ActiveMQ.DLQ.Topic.";
036        private String queuePrefix = "ActiveMQ.DLQ.Queue.";
037        private boolean useQueueForQueueMessages = true;
038        private boolean useQueueForTopicMessages = true;
039        private boolean destinationPerDurableSubscriber;
040    
041        public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
042            if (message.getDestination().isQueue()) {
043                return createDestination(message, queuePrefix, useQueueForQueueMessages, subscription);
044            } else {
045                return createDestination(message, topicPrefix, useQueueForTopicMessages, subscription);
046            }
047        }
048    
049        // Properties
050        // -------------------------------------------------------------------------
051    
052        public String getQueuePrefix() {
053            return queuePrefix;
054        }
055    
056        /**
057         * Sets the prefix to use for all dead letter queues for queue messages
058         */
059        public void setQueuePrefix(String queuePrefix) {
060            this.queuePrefix = queuePrefix;
061        }
062    
063        public String getTopicPrefix() {
064            return topicPrefix;
065        }
066    
067        /**
068         * Sets the prefix to use for all dead letter queues for topic messages
069         */
070        public void setTopicPrefix(String topicPrefix) {
071            this.topicPrefix = topicPrefix;
072        }
073    
074        public boolean isUseQueueForQueueMessages() {
075            return useQueueForQueueMessages;
076        }
077    
078        /**
079         * Sets whether a queue or topic should be used for queue messages sent to a
080         * DLQ. The default is to use a Queue
081         */
082        public void setUseQueueForQueueMessages(boolean useQueueForQueueMessages) {
083            this.useQueueForQueueMessages = useQueueForQueueMessages;
084        }
085    
086        public boolean isUseQueueForTopicMessages() {
087            return useQueueForTopicMessages;
088        }
089    
090        /**
091         * Sets whether a queue or topic should be used for topic messages sent to a
092         * DLQ. The default is to use a Queue
093         */
094        public void setUseQueueForTopicMessages(boolean useQueueForTopicMessages) {
095            this.useQueueForTopicMessages = useQueueForTopicMessages;
096        }
097    
098        public boolean isDestinationPerDurableSubscriber() {
099            return destinationPerDurableSubscriber;
100        }
101    
102        /**
103         * sets whether durable topic subscriptions are to get individual dead letter destinations.
104         * When true, the DLQ is of the form 'topicPrefix.clientId:subscriptionName'
105         * The default is false.
106         * @param destinationPerDurableSubscriber
107         */
108        public void setDestinationPerDurableSubscriber(boolean destinationPerDurableSubscriber) {
109            this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
110        }
111    
112        // Implementation methods
113        // -------------------------------------------------------------------------
114        protected ActiveMQDestination createDestination(Message message,
115                                                        String prefix,
116                                                        boolean useQueue,
117                                                        Subscription subscription ) {
118            String name = null;
119    
120            if (message.getRegionDestination() != null
121                    && message.getRegionDestination().getActiveMQDestination() != null
122                    && message.getRegionDestination().getActiveMQDestination().getPhysicalName() != null
123                    && !message.getRegionDestination().getActiveMQDestination().getPhysicalName().isEmpty()){
124                name = prefix + message.getRegionDestination().getActiveMQDestination().getPhysicalName();
125            } else {
126                name = prefix + message.getDestination().getPhysicalName();
127            }
128    
129            if (destinationPerDurableSubscriber && subscription instanceof DurableTopicSubscription) {
130                name += "." + ((DurableTopicSubscription)subscription).getSubscriptionKey();
131            }
132            if (useQueue) {
133                return new ActiveMQQueue(name);
134            } else {
135                return new ActiveMQTopic(name);
136            }
137        }
138    }