001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.virtual; 018 019import java.util.Collection; 020import java.util.Iterator; 021 022import org.apache.activemq.broker.ProducerBrokerExchange; 023import org.apache.activemq.broker.region.Destination; 024import org.apache.activemq.broker.region.DestinationFilter; 025import org.apache.activemq.command.ActiveMQDestination; 026import org.apache.activemq.command.Message; 027import org.apache.activemq.filter.MessageEvaluationContext; 028import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 029 030/** 031 * Represents a composite {@link Destination} where send()s are replicated to 032 * each Destination instance. 033 * 034 * 035 */ 036public class CompositeDestinationFilter extends DestinationFilter { 037 038 private Collection forwardDestinations; 039 private boolean forwardOnly; 040 private boolean copyMessage; 041 042 public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean copyMessage) { 043 super(next); 044 this.forwardDestinations = forwardDestinations; 045 this.forwardOnly = forwardOnly; 046 this.copyMessage = copyMessage; 047 } 048 049 public void send(ProducerBrokerExchange context, Message message) throws Exception { 050 MessageEvaluationContext messageContext = null; 051 052 for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) { 053 ActiveMQDestination destination = null; 054 Object value = iter.next(); 055 056 if (value instanceof FilteredDestination) { 057 FilteredDestination filteredDestination = (FilteredDestination)value; 058 if (messageContext == null) { 059 messageContext = new NonCachedMessageEvaluationContext(); 060 messageContext.setMessageReference(message); 061 } 062 messageContext.setDestination(filteredDestination.getDestination()); 063 if (filteredDestination.matches(messageContext)) { 064 destination = filteredDestination.getDestination(); 065 } 066 } else if (value instanceof ActiveMQDestination) { 067 destination = (ActiveMQDestination)value; 068 } 069 if (destination == null) { 070 continue; 071 } 072 073 Message forwarded_message; 074 if (copyMessage) { 075 forwarded_message = message.copy(); 076 forwarded_message.setDestination(destination); 077 } 078 else { 079 forwarded_message = message; 080 } 081 082 send(context, forwarded_message, destination); 083 } 084 if (!forwardOnly) { 085 super.send(context, message); 086 } 087 } 088}