001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.plugin; 018 019import java.util.regex.Pattern; 020import org.apache.activemq.broker.Broker; 021import org.apache.activemq.broker.BrokerFilter; 022import org.apache.activemq.broker.ConnectionContext; 023import org.apache.activemq.broker.region.MessageReference; 024import org.apache.activemq.broker.region.Subscription; 025import org.apache.activemq.command.ActiveMQDestination; 026import org.apache.activemq.command.Message; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030/** 031 * @author Filip Hanik 032 * @version 1.0 033 */ 034public class DiscardingDLQBroker extends BrokerFilter { 035 public static Logger log = LoggerFactory.getLogger(DiscardingDLQBroker.class); 036 private boolean dropTemporaryTopics = true; 037 private boolean dropTemporaryQueues = true; 038 private boolean dropAll = true; 039 private Pattern[] destFilter; 040 private int reportInterval = 1000; 041 private long dropCount = 0; 042 043 public DiscardingDLQBroker(Broker next) { 044 super(next); 045 } 046 047 @Override 048 public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef, 049 Subscription subscription) { 050 if (log.isTraceEnabled()) { 051 log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null)); 052 } 053 boolean dropped = true; 054 Message msg = null; 055 ActiveMQDestination dest = null; 056 String destName = null; 057 msg = msgRef.getMessage(); 058 dest = msg.getDestination(); 059 destName = dest.getPhysicalName(); 060 061 if (dest == null || destName == null ) { 062 //do nothing, no need to forward it 063 skipMessage("NULL DESTINATION",msgRef); 064 } else if (dropAll) { 065 //do nothing 066 skipMessage("dropAll",msgRef); 067 } else if (dropTemporaryTopics && dest.isTemporary() && dest.isTopic()) { 068 //do nothing 069 skipMessage("dropTemporaryTopics",msgRef); 070 } else if (dropTemporaryQueues && dest.isTemporary() && dest.isQueue()) { 071 //do nothing 072 skipMessage("dropTemporaryQueues",msgRef); 073 } else if (destFilter!=null && matches(destName)) { 074 //do nothing 075 skipMessage("dropOnly",msgRef); 076 } else { 077 dropped = false; 078 next.sendToDeadLetterQueue(ctx, msgRef, subscription); 079 } 080 if (dropped && getReportInterval()>0) { 081 if ((++dropCount)%getReportInterval() == 0 ) { 082 log.info("Total of "+dropCount+" messages were discarded, since their destination was the dead letter queue"); 083 } 084 } 085 } 086 087 public boolean matches(String destName) { 088 for (int i=0; destFilter!=null && i<destFilter.length; i++) { 089 if (destFilter[i]!=null && destFilter[i].matcher(destName).matches()) { 090 return true; 091 } 092 } 093 return false; 094 } 095 096 private void skipMessage(String prefix, MessageReference msgRef) { 097 if (log.isDebugEnabled()) { 098 String lmsg = "Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef.getMessage():null); 099 log.debug(lmsg); 100 } 101 } 102 103 public void setDropTemporaryTopics(boolean dropTemporaryTopics) { 104 this.dropTemporaryTopics = dropTemporaryTopics; 105 } 106 107 public void setDropTemporaryQueues(boolean dropTemporaryQueues) { 108 this.dropTemporaryQueues = dropTemporaryQueues; 109 } 110 111 public void setDropAll(boolean dropAll) { 112 this.dropAll = dropAll; 113 } 114 115 public void setDestFilter(Pattern[] destFilter) { 116 this.destFilter = destFilter; 117 } 118 119 public void setReportInterval(int reportInterval) { 120 this.reportInterval = reportInterval; 121 } 122 123 public boolean isDropTemporaryTopics() { 124 return dropTemporaryTopics; 125 } 126 127 public boolean isDropTemporaryQueues() { 128 return dropTemporaryQueues; 129 } 130 131 public boolean isDropAll() { 132 return dropAll; 133 } 134 135 public Pattern[] getDestFilter() { 136 return destFilter; 137 } 138 139 public int getReportInterval() { 140 return reportInterval; 141 } 142 143}