001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import org.apache.activemq.broker.region.DurableTopicSubscription;
020import org.apache.activemq.broker.region.Subscription;
021import org.apache.activemq.command.ActiveMQDestination;
022import org.apache.activemq.command.ActiveMQQueue;
023import org.apache.activemq.command.ActiveMQTopic;
024import org.apache.activemq.command.Message;
025
026/**
027 * A {@link DeadLetterStrategy} where each destination has its own individual
028 * DLQ using the subject naming hierarchy.
029 *
030 * @org.apache.xbean.XBean
031 *
032 */
033public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
034
035    private String topicPrefix = "ActiveMQ.DLQ.Topic.";
036    private String queuePrefix = "ActiveMQ.DLQ.Queue.";
037    private boolean useQueueForQueueMessages = true;
038    private boolean useQueueForTopicMessages = true;
039    private boolean destinationPerDurableSubscriber;
040
041    public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
042        if (message.getDestination().isQueue()) {
043            return createDestination(message, queuePrefix, useQueueForQueueMessages, subscription);
044        } else {
045            return createDestination(message, topicPrefix, useQueueForTopicMessages, subscription);
046        }
047    }
048
049    // Properties
050    // -------------------------------------------------------------------------
051
052    public String getQueuePrefix() {
053        return queuePrefix;
054    }
055
056    /**
057     * Sets the prefix to use for all dead letter queues for queue messages
058     */
059    public void setQueuePrefix(String queuePrefix) {
060        this.queuePrefix = queuePrefix;
061    }
062
063    public String getTopicPrefix() {
064        return topicPrefix;
065    }
066
067    /**
068     * Sets the prefix to use for all dead letter queues for topic messages
069     */
070    public void setTopicPrefix(String topicPrefix) {
071        this.topicPrefix = topicPrefix;
072    }
073
074    public boolean isUseQueueForQueueMessages() {
075        return useQueueForQueueMessages;
076    }
077
078    /**
079     * Sets whether a queue or topic should be used for queue messages sent to a
080     * DLQ. The default is to use a Queue
081     */
082    public void setUseQueueForQueueMessages(boolean useQueueForQueueMessages) {
083        this.useQueueForQueueMessages = useQueueForQueueMessages;
084    }
085
086    public boolean isUseQueueForTopicMessages() {
087        return useQueueForTopicMessages;
088    }
089
090    /**
091     * Sets whether a queue or topic should be used for topic messages sent to a
092     * DLQ. The default is to use a Queue
093     */
094    public void setUseQueueForTopicMessages(boolean useQueueForTopicMessages) {
095        this.useQueueForTopicMessages = useQueueForTopicMessages;
096    }
097
098    public boolean isDestinationPerDurableSubscriber() {
099        return destinationPerDurableSubscriber;
100    }
101
102    /**
103     * sets whether durable topic subscriptions are to get individual dead letter destinations.
104     * When true, the DLQ is of the form 'topicPrefix.clientId:subscriptionName'
105     * The default is false.
106     * @param destinationPerDurableSubscriber
107     */
108    public void setDestinationPerDurableSubscriber(boolean destinationPerDurableSubscriber) {
109        this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
110    }
111
112    // Implementation methods
113    // -------------------------------------------------------------------------
114    protected ActiveMQDestination createDestination(Message message,
115                                                    String prefix,
116                                                    boolean useQueue,
117                                                    Subscription subscription ) {
118        String name = null;
119
120        if (message.getRegionDestination() != null
121                && message.getRegionDestination().getActiveMQDestination() != null
122                && message.getRegionDestination().getActiveMQDestination().getPhysicalName() != null
123                && !message.getRegionDestination().getActiveMQDestination().getPhysicalName().isEmpty()){
124            name = prefix + message.getRegionDestination().getActiveMQDestination().getPhysicalName();
125        } else {
126            name = prefix + message.getDestination().getPhysicalName();
127        }
128
129        if (destinationPerDurableSubscriber && subscription instanceof DurableTopicSubscription) {
130            name += "." + ((DurableTopicSubscription)subscription).getSubscriptionKey();
131        }
132        if (useQueue) {
133            return new ActiveMQQueue(name);
134        } else {
135            return new ActiveMQTopic(name);
136        }
137    }
138}