001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.util; 018 019import org.apache.activemq.broker.BrokerPluginSupport; 020import org.apache.activemq.broker.ProducerBrokerExchange; 021import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 022import org.apache.activemq.command.ActiveMQDestination; 023import org.apache.activemq.command.ActiveMQMessage; 024import org.apache.activemq.command.Message; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028/** 029 * A Broker interceptor which updates a JMS Client's timestamp on the message 030 * with a broker timestamp. Useful when the clocks on client machines are known 031 * to not be correct and you can only trust the time set on the broker machines. 032 * 033 * Enabling this plugin will break JMS compliance since the timestamp that the 034 * producer sees on the messages after as send() will be different from the 035 * timestamp the consumer will observe when he receives the message. This plugin 036 * is not enabled in the default ActiveMQ configuration. 037 * 038 * 2 new attributes have been added which will allow the administrator some override control 039 * over the expiration time for incoming messages: 040 * 041 * Attribute 'zeroExpirationOverride' can be used to apply an expiration 042 * time to incoming messages with no expiration defined (messages that would never expire) 043 * 044 * Attribute 'ttlCeiling' can be used to apply a limit to the expiration time 045 * 046 * @org.apache.xbean.XBean element="timeStampingBrokerPlugin" 047 * 048 * 049 */ 050public class TimeStampingBrokerPlugin extends BrokerPluginSupport { 051 private static final Logger LOG = LoggerFactory.getLogger(TimeStampingBrokerPlugin.class); 052 /** 053 * variable which (when non-zero) is used to override 054 * the expiration date for messages that arrive with 055 * no expiration date set (in Milliseconds). 056 */ 057 long zeroExpirationOverride = 0; 058 059 /** 060 * variable which (when non-zero) is used to limit 061 * the expiration date (in Milliseconds). 062 */ 063 long ttlCeiling = 0; 064 065 /** 066 * If true, the plugin will not update timestamp to past values 067 * False by default 068 */ 069 boolean futureOnly = false; 070 071 072 /** 073 * if true, update timestamp even if message has passed through a network 074 * default false 075 */ 076 boolean processNetworkMessages = false; 077 078 /** 079 * setter method for zeroExpirationOverride 080 */ 081 public void setZeroExpirationOverride(long ttl) 082 { 083 this.zeroExpirationOverride = ttl; 084 } 085 086 /** 087 * setter method for ttlCeiling 088 */ 089 public void setTtlCeiling(long ttlCeiling) 090 { 091 this.ttlCeiling = ttlCeiling; 092 } 093 094 public void setFutureOnly(boolean futureOnly) { 095 this.futureOnly = futureOnly; 096 } 097 098 public void setProcessNetworkMessages(Boolean processNetworkMessages) { 099 this.processNetworkMessages = processNetworkMessages; 100 } 101 102 @Override 103 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 104 105 if (message.getTimestamp() > 0 && !isDestinationDLQ(message) && 106 (processNetworkMessages || (message.getBrokerPath() == null || message.getBrokerPath().length == 0))) { 107 // timestamp not been disabled and has not passed through a network or processNetworkMessages=true 108 109 long oldExpiration = message.getExpiration(); 110 long newTimeStamp = System.currentTimeMillis(); 111 long timeToLive = zeroExpirationOverride; 112 long oldTimestamp = message.getTimestamp(); 113 if (oldExpiration > 0) { 114 timeToLive = oldExpiration - oldTimestamp; 115 } 116 if (timeToLive > 0 && ttlCeiling > 0 && timeToLive > ttlCeiling) { 117 timeToLive = ttlCeiling; 118 } 119 long expiration = timeToLive + newTimeStamp; 120 // In the scenario that the Broker is behind the clients we never want to set the 121 // Timestamp and Expiration in the past 122 if(!futureOnly || (expiration > oldExpiration)) { 123 if (timeToLive > 0 && expiration > 0) { 124 message.setExpiration(expiration); 125 } 126 message.setTimestamp(newTimeStamp); 127 if (LOG.isDebugEnabled()) { 128 LOG.debug("Set message " + message.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp); 129 } 130 } 131 } 132 super.send(producerExchange, message); 133 } 134 135 private boolean isDestinationDLQ(Message message) { 136 DeadLetterStrategy deadLetterStrategy; 137 Message tmp; 138 139 if (message != null && message.getRegionDestination() != null) { 140 deadLetterStrategy = message.getRegionDestination().getDeadLetterStrategy(); 141 if (deadLetterStrategy != null) { 142 // Cheap copy, since we only need two fields 143 tmp = new ActiveMQMessage(); 144 tmp.setDestination(message.getOriginalDestination()); 145 tmp.setRegionDestination(message.getRegionDestination()); 146 147 // Determine if we are headed for a DLQ 148 ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(tmp, null); 149 if (deadLetterDestination.equals(message.getDestination())) { 150 return true; 151 } 152 } 153 } 154 return false; 155 } 156}