001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.scheduler; 018 019 import java.io.File; 020 import java.util.concurrent.atomic.AtomicBoolean; 021 022 import org.apache.activemq.ScheduledMessage; 023 import org.apache.activemq.advisory.AdvisorySupport; 024 import org.apache.activemq.broker.Broker; 025 import org.apache.activemq.broker.BrokerFilter; 026 import org.apache.activemq.broker.ConnectionContext; 027 import org.apache.activemq.broker.ProducerBrokerExchange; 028 import org.apache.activemq.command.ActiveMQDestination; 029 import org.apache.activemq.command.Message; 030 import org.apache.activemq.command.MessageId; 031 import org.apache.activemq.command.ProducerId; 032 import org.apache.activemq.command.ProducerInfo; 033 import org.apache.activemq.openwire.OpenWireFormat; 034 import org.apache.activemq.security.SecurityContext; 035 import org.apache.activemq.state.ProducerState; 036 import org.apache.activemq.util.IdGenerator; 037 import org.apache.activemq.util.LongSequenceGenerator; 038 import org.apache.activemq.util.TypeConversionSupport; 039 import org.apache.activemq.wireformat.WireFormat; 040 import org.slf4j.Logger; 041 import org.slf4j.LoggerFactory; 042 import org.apache.kahadb.util.ByteSequence; 043 044 public class SchedulerBroker extends BrokerFilter implements JobListener { 045 private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class); 046 private static final IdGenerator ID_GENERATOR = new IdGenerator(); 047 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 048 private final AtomicBoolean started = new AtomicBoolean(); 049 private final WireFormat wireFormat = new OpenWireFormat(); 050 private final ConnectionContext context = new ConnectionContext(); 051 private final ProducerId producerId = new ProducerId(); 052 private File directory; 053 054 private JobSchedulerStore store; 055 private JobScheduler scheduler; 056 057 public SchedulerBroker(Broker next, File directory) throws Exception { 058 super(next); 059 this.directory = directory; 060 this.producerId.setConnectionId(ID_GENERATOR.generateId()); 061 this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 062 context.setBroker(next); 063 LOG.info("Scheduler using directory: " + directory); 064 065 } 066 067 public synchronized JobScheduler getJobScheduler() throws Exception { 068 return new JobSchedulerFacade(this); 069 } 070 071 /** 072 * @return the directory 073 */ 074 public File getDirectory() { 075 return this.directory; 076 } 077 /** 078 * @param directory 079 * the directory to set 080 */ 081 public void setDirectory(File directory) { 082 this.directory = directory; 083 } 084 085 @Override 086 public void start() throws Exception { 087 this.started.set(true); 088 getInternalScheduler(); 089 super.start(); 090 } 091 092 @Override 093 public void stop() throws Exception { 094 if (this.started.compareAndSet(true, false)) { 095 096 if (this.store != null) { 097 this.store.stop(); 098 } 099 if (this.scheduler != null) { 100 this.scheduler.removeListener(this); 101 this.scheduler = null; 102 } 103 } 104 super.stop(); 105 } 106 107 @Override 108 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 109 long delay = 0; 110 long period = 0; 111 int repeat = 0; 112 String cronEntry = ""; 113 String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID); 114 Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON); 115 Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD); 116 Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY); 117 118 String physicalName = messageSend.getDestination().getPhysicalName(); 119 boolean schedularManage = physicalName.regionMatches(true, 0, 120 ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0, 121 ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length()); 122 123 if (schedularManage == true) { 124 125 JobScheduler scheduler = getInternalScheduler(); 126 ActiveMQDestination replyTo = messageSend.getReplyTo(); 127 128 String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION); 129 130 if (action != null ) { 131 132 Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME); 133 Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME); 134 135 if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) { 136 137 if( startTime != null && endTime != null ) { 138 139 long start = (Long) TypeConversionSupport.convert(startTime, Long.class); 140 long finish = (Long) TypeConversionSupport.convert(endTime, Long.class); 141 142 for (Job job : scheduler.getAllJobs(start, finish)) { 143 sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo); 144 } 145 } else { 146 for (Job job : scheduler.getAllJobs()) { 147 sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo); 148 } 149 } 150 } 151 if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) { 152 scheduler.remove(jobId); 153 } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) { 154 155 if( startTime != null && endTime != null ) { 156 157 long start = (Long) TypeConversionSupport.convert(startTime, Long.class); 158 long finish = (Long) TypeConversionSupport.convert(endTime, Long.class); 159 160 scheduler.removeAllJobs(start, finish); 161 } else { 162 scheduler.removeAllJobs(); 163 } 164 } 165 } 166 167 } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) { 168 //clear transaction context 169 Message msg = messageSend.copy(); 170 msg.setTransactionId(null); 171 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg); 172 if (cronValue != null) { 173 cronEntry = cronValue.toString(); 174 } 175 if (periodValue != null) { 176 period = (Long) TypeConversionSupport.convert(periodValue, Long.class); 177 } 178 if (delayValue != null) { 179 delay = (Long) TypeConversionSupport.convert(delayValue, Long.class); 180 } 181 Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT); 182 if (repeatValue != null) { 183 repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class); 184 } 185 getInternalScheduler().schedule(msg.getMessageId().toString(), 186 new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat); 187 188 } else { 189 super.send(producerExchange, messageSend); 190 } 191 } 192 193 public void scheduledJob(String id, ByteSequence job) { 194 org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job 195 .getOffset(), job.getLength()); 196 try { 197 Message messageSend = (Message) this.wireFormat.unmarshal(packet); 198 messageSend.setOriginalTransactionId(null); 199 Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT); 200 Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON); 201 String cronStr = cronValue != null ? cronValue.toString() : null; 202 int repeat = 0; 203 if (repeatValue != null) { 204 repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class); 205 } 206 207 if (repeat != 0 || cronStr != null && cronStr.length() > 0) { 208 // create a unique id - the original message could be sent 209 // lots of times 210 messageSend.setMessageId( 211 new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId())); 212 } 213 214 // Add the jobId as a property 215 messageSend.setProperty("scheduledJobId", id); 216 217 // if this goes across a network - we don't want it rescheduled 218 messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD); 219 messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY); 220 messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT); 221 messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON); 222 223 if (messageSend.getTimestamp() > 0 && messageSend.getExpiration() > 0) { 224 225 long oldExpiration = messageSend.getExpiration(); 226 long newTimeStamp = System.currentTimeMillis(); 227 long timeToLive = 0; 228 long oldTimestamp = messageSend.getTimestamp(); 229 230 if (oldExpiration > 0) { 231 timeToLive = oldExpiration - oldTimestamp; 232 } 233 234 long expiration = timeToLive + newTimeStamp; 235 236 if(expiration > oldExpiration) { 237 if (timeToLive > 0 && expiration > 0) { 238 messageSend.setExpiration(expiration); 239 } 240 messageSend.setTimestamp(newTimeStamp); 241 if (LOG.isDebugEnabled()) { 242 LOG.debug("Set message " + messageSend.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp); 243 } 244 } 245 } 246 247 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 248 producerExchange.setConnectionContext(context); 249 producerExchange.setMutable(true); 250 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 251 super.send(producerExchange, messageSend); 252 } catch (Exception e) { 253 LOG.error("Failed to send scheduled message " + id, e); 254 } 255 } 256 257 protected synchronized JobScheduler getInternalScheduler() throws Exception { 258 if (this.started.get()) { 259 if (this.scheduler == null) { 260 this.scheduler = getStore().getJobScheduler("JMS"); 261 this.scheduler.addListener(this); 262 } 263 return this.scheduler; 264 } 265 return null; 266 } 267 268 private JobSchedulerStore getStore() throws Exception { 269 if (started.get()) { 270 if (this.store == null) { 271 this.store = new JobSchedulerStore(); 272 this.store.setDirectory(directory); 273 this.store.start(); 274 } 275 return this.store; 276 } 277 return null; 278 } 279 280 protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo) 281 throws Exception { 282 283 org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload()); 284 try { 285 Message msg = (Message) this.wireFormat.unmarshal(packet); 286 msg.setOriginalTransactionId(null); 287 msg.setPersistent(false); 288 msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 289 msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId())); 290 msg.setDestination(replyTo); 291 msg.setResponseRequired(false); 292 msg.setProducerId(this.producerId); 293 294 // Add the jobId as a property 295 msg.setProperty("scheduledJobId", job.getJobId()); 296 297 final boolean originalFlowControl = context.isProducerFlowControl(); 298 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 299 producerExchange.setConnectionContext(context); 300 producerExchange.setMutable(true); 301 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 302 try { 303 context.setProducerFlowControl(false); 304 this.next.send(producerExchange, msg); 305 } finally { 306 context.setProducerFlowControl(originalFlowControl); 307 } 308 } catch (Exception e) { 309 LOG.error("Failed to send scheduled message " + job.getJobId(), e); 310 } 311 312 } 313 }