001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.policy; 018 019import org.apache.activemq.broker.region.DurableTopicSubscription; 020import org.apache.activemq.broker.region.Subscription; 021import org.apache.activemq.command.ActiveMQDestination; 022import org.apache.activemq.command.ActiveMQQueue; 023import org.apache.activemq.command.ActiveMQTopic; 024import org.apache.activemq.command.Message; 025 026/** 027 * A {@link DeadLetterStrategy} where each destination has its own individual 028 * DLQ using the subject naming hierarchy. 029 * 030 * @org.apache.xbean.XBean 031 * 032 */ 033public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy { 034 035 private String topicPrefix = "ActiveMQ.DLQ.Topic."; 036 private String queuePrefix = "ActiveMQ.DLQ.Queue."; 037 private boolean useQueueForQueueMessages = true; 038 private boolean useQueueForTopicMessages = true; 039 private boolean destinationPerDurableSubscriber; 040 041 public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) { 042 if (message.getDestination().isQueue()) { 043 return createDestination(message, queuePrefix, useQueueForQueueMessages, subscription); 044 } else { 045 return createDestination(message, topicPrefix, useQueueForTopicMessages, subscription); 046 } 047 } 048 049 // Properties 050 // ------------------------------------------------------------------------- 051 052 public String getQueuePrefix() { 053 return queuePrefix; 054 } 055 056 /** 057 * Sets the prefix to use for all dead letter queues for queue messages 058 */ 059 public void setQueuePrefix(String queuePrefix) { 060 this.queuePrefix = queuePrefix; 061 } 062 063 public String getTopicPrefix() { 064 return topicPrefix; 065 } 066 067 /** 068 * Sets the prefix to use for all dead letter queues for topic messages 069 */ 070 public void setTopicPrefix(String topicPrefix) { 071 this.topicPrefix = topicPrefix; 072 } 073 074 public boolean isUseQueueForQueueMessages() { 075 return useQueueForQueueMessages; 076 } 077 078 /** 079 * Sets whether a queue or topic should be used for queue messages sent to a 080 * DLQ. The default is to use a Queue 081 */ 082 public void setUseQueueForQueueMessages(boolean useQueueForQueueMessages) { 083 this.useQueueForQueueMessages = useQueueForQueueMessages; 084 } 085 086 public boolean isUseQueueForTopicMessages() { 087 return useQueueForTopicMessages; 088 } 089 090 /** 091 * Sets whether a queue or topic should be used for topic messages sent to a 092 * DLQ. The default is to use a Queue 093 */ 094 public void setUseQueueForTopicMessages(boolean useQueueForTopicMessages) { 095 this.useQueueForTopicMessages = useQueueForTopicMessages; 096 } 097 098 public boolean isDestinationPerDurableSubscriber() { 099 return destinationPerDurableSubscriber; 100 } 101 102 /** 103 * sets whether durable topic subscriptions are to get individual dead letter destinations. 104 * When true, the DLQ is of the form 'topicPrefix.clientId:subscriptionName' 105 * The default is false. 106 * @param destinationPerDurableSubscriber 107 */ 108 public void setDestinationPerDurableSubscriber(boolean destinationPerDurableSubscriber) { 109 this.destinationPerDurableSubscriber = destinationPerDurableSubscriber; 110 } 111 112 // Implementation methods 113 // ------------------------------------------------------------------------- 114 protected ActiveMQDestination createDestination(Message message, 115 String prefix, 116 boolean useQueue, 117 Subscription subscription ) { 118 String name = null; 119 120 if (message.getRegionDestination() != null 121 && message.getRegionDestination().getActiveMQDestination() != null 122 && message.getRegionDestination().getActiveMQDestination().getPhysicalName() != null 123 && !message.getRegionDestination().getActiveMQDestination().getPhysicalName().isEmpty()){ 124 name = prefix + message.getRegionDestination().getActiveMQDestination().getPhysicalName(); 125 } else { 126 name = prefix + message.getDestination().getPhysicalName(); 127 } 128 129 if (destinationPerDurableSubscriber && subscription instanceof DurableTopicSubscription) { 130 name += "." + ((DurableTopicSubscription)subscription).getSubscriptionKey(); 131 } 132 if (useQueue) { 133 return new ActiveMQQueue(name); 134 } else { 135 return new ActiveMQTopic(name); 136 } 137 } 138}