001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.util;
018
019import org.apache.activemq.broker.BrokerPluginSupport;
020import org.apache.activemq.broker.ProducerBrokerExchange;
021import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
022import org.apache.activemq.command.ActiveMQDestination;
023import org.apache.activemq.command.ActiveMQMessage;
024import org.apache.activemq.command.Message;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028/**
029 * A Broker interceptor which updates a JMS Client's timestamp on the message
030 * with a broker timestamp. Useful when the clocks on client machines are known
031 * to not be correct and you can only trust the time set on the broker machines.
032 *
033 * Enabling this plugin will break JMS compliance since the timestamp that the
034 * producer sees on the messages after as send() will be different from the
035 * timestamp the consumer will observe when he receives the message. This plugin
036 * is not enabled in the default ActiveMQ configuration.
037 *
038 * 2 new attributes have been added which will allow the administrator some override control
039 * over the expiration time for incoming messages:
040 *
041 * Attribute 'zeroExpirationOverride' can be used to apply an expiration
042 * time to incoming messages with no expiration defined (messages that would never expire)
043 *
044 * Attribute 'ttlCeiling' can be used to apply a limit to the expiration time
045 *
046 * @org.apache.xbean.XBean element="timeStampingBrokerPlugin"
047 *
048 *
049 */
050public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
051    private static final Logger LOG = LoggerFactory.getLogger(TimeStampingBrokerPlugin.class);
052    /**
053    * variable which (when non-zero) is used to override
054    * the expiration date for messages that arrive with
055    * no expiration date set (in Milliseconds).
056    */
057    long zeroExpirationOverride = 0;
058
059    /**
060    * variable which (when non-zero) is used to limit
061    * the expiration date (in Milliseconds).
062    */
063    long ttlCeiling = 0;
064
065    /**
066     * If true, the plugin will not update timestamp to past values
067     * False by default
068     */
069    boolean futureOnly = false;
070
071
072    /**
073     * if true, update timestamp even if message has passed through a network
074     * default false
075     */
076    boolean processNetworkMessages = false;
077
078    /**
079    * setter method for zeroExpirationOverride
080    */
081    public void setZeroExpirationOverride(long ttl)
082    {
083        this.zeroExpirationOverride = ttl;
084    }
085
086    /**
087    * setter method for ttlCeiling
088    */
089    public void setTtlCeiling(long ttlCeiling)
090    {
091        this.ttlCeiling = ttlCeiling;
092    }
093
094    public void setFutureOnly(boolean futureOnly) {
095        this.futureOnly = futureOnly;
096    }
097
098    public void setProcessNetworkMessages(Boolean processNetworkMessages) {
099        this.processNetworkMessages = processNetworkMessages;
100    }
101
102    @Override
103    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
104
105        if (message.getTimestamp() > 0 && !isDestinationDLQ(message) &&
106           (processNetworkMessages || (message.getBrokerPath() == null || message.getBrokerPath().length == 0))) {
107            // timestamp not been disabled and has not passed through a network or processNetworkMessages=true
108
109            long oldExpiration = message.getExpiration();
110            long newTimeStamp = System.currentTimeMillis();
111            long timeToLive = zeroExpirationOverride;
112            long oldTimestamp = message.getTimestamp();
113            if (oldExpiration > 0) {
114                timeToLive = oldExpiration - oldTimestamp;
115            }
116            if (timeToLive > 0 && ttlCeiling > 0 && timeToLive > ttlCeiling) {
117                timeToLive = ttlCeiling;
118            }
119            long expiration = timeToLive + newTimeStamp;
120            // In the scenario that the Broker is behind the clients we never want to set the
121            // Timestamp and Expiration in the past
122            if(!futureOnly || (expiration > oldExpiration)) {
123                if (timeToLive > 0 && expiration > 0) {
124                    message.setExpiration(expiration);
125                }
126                message.setTimestamp(newTimeStamp);
127                if (LOG.isDebugEnabled()) {
128                    LOG.debug("Set message " + message.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp);
129                }
130            }
131        }
132        super.send(producerExchange, message);
133    }
134
135    private boolean isDestinationDLQ(Message message) {
136        DeadLetterStrategy deadLetterStrategy;
137        Message tmp;
138
139        if (message != null && message.getRegionDestination() != null) {
140            deadLetterStrategy = message.getRegionDestination().getDeadLetterStrategy();
141            if (deadLetterStrategy != null) {
142                // Cheap copy, since we only need two fields
143                tmp = new ActiveMQMessage();
144                tmp.setDestination(message.getOriginalDestination());
145                tmp.setRegionDestination(message.getRegionDestination());
146
147                // Determine if we are headed for a DLQ
148                ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(tmp, null);
149                if (deadLetterDestination.equals(message.getDestination())) {
150                    return true;
151                }
152            }
153        }
154        return false;
155    }
156}