001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.util.HashMap; 020 import java.util.Map; 021 import java.util.concurrent.atomic.AtomicLong; 022 import javax.jms.Destination; 023 import javax.jms.IllegalStateException; 024 import javax.jms.InvalidDestinationException; 025 import javax.jms.JMSException; 026 import javax.jms.Message; 027 import org.apache.activemq.command.ActiveMQDestination; 028 import org.apache.activemq.command.ProducerAck; 029 import org.apache.activemq.command.ProducerId; 030 import org.apache.activemq.command.ProducerInfo; 031 import org.apache.activemq.management.JMSProducerStatsImpl; 032 import org.apache.activemq.management.StatsCapable; 033 import org.apache.activemq.management.StatsImpl; 034 import org.apache.activemq.usage.MemoryUsage; 035 import org.apache.activemq.util.IntrospectionSupport; 036 import org.slf4j.Logger; 037 import org.slf4j.LoggerFactory; 038 039 /** 040 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a 041 * destination. A <CODE>MessageProducer</CODE> object is created by passing a 042 * <CODE>Destination</CODE> object to a message-producer creation method 043 * supplied by a session. 044 * <P> 045 * <CODE>MessageProducer</CODE> is the parent interface for all message 046 * producers. 047 * <P> 048 * A client also has the option of creating a message producer without supplying 049 * a destination. In this case, a destination must be provided with every send 050 * operation. A typical use for this kind of message producer is to send replies 051 * to requests using the request's <CODE>JMSReplyTo</CODE> destination. 052 * <P> 053 * A client can specify a default delivery mode, priority, and time to live for 054 * messages sent by a message producer. It can also specify the delivery mode, 055 * priority, and time to live for an individual message. 056 * <P> 057 * A client can specify a time-to-live value in milliseconds for each message it 058 * sends. This value defines a message expiration time that is the sum of the 059 * message's time-to-live and the GMT when it is sent (for transacted sends, 060 * this is the time the client sends the message, not the time the transaction 061 * is committed). 062 * <P> 063 * A JMS provider should do its best to expire messages accurately; however, the 064 * JMS API does not define the accuracy provided. 065 * 066 * 067 * @see javax.jms.TopicPublisher 068 * @see javax.jms.QueueSender 069 * @see javax.jms.Session#createProducer 070 */ 071 public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable { 072 073 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageProducer.class); 074 075 protected ProducerInfo info; 076 protected boolean closed; 077 078 private final JMSProducerStatsImpl stats; 079 private AtomicLong messageSequence; 080 private final long startTime; 081 private MessageTransformer transformer; 082 private MemoryUsage producerWindow; 083 084 protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException { 085 super(session); 086 this.info = new ProducerInfo(producerId); 087 this.info.setWindowSize(session.connection.getProducerWindowSize()); 088 // Allows the options on the destination to configure the producerInfo 089 if (destination != null && destination.getOptions() != null) { 090 Map<String, Object> options = IntrospectionSupport.extractProperties( 091 new HashMap<String, Object>(destination.getOptions()), "producer."); 092 IntrospectionSupport.setProperties(this.info, options); 093 if (options.size() > 0) { 094 String msg = "There are " + options.size() 095 + " producer options that couldn't be set on the producer." 096 + " Check the options are spelled correctly." 097 + " Unknown parameters=[" + options + "]." 098 + " This producer cannot be started."; 099 LOG.warn(msg); 100 throw new ConfigurationException(msg); 101 } 102 } 103 104 this.info.setDestination(destination); 105 106 // Enable producer window flow control if protocol > 3 and the window 107 // size > 0 108 if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) { 109 producerWindow = new MemoryUsage("Producer Window: " + producerId); 110 producerWindow.setExecutor(session.getConnectionExecutor()); 111 producerWindow.setLimit(this.info.getWindowSize()); 112 producerWindow.start(); 113 } 114 115 this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE; 116 this.defaultPriority = Message.DEFAULT_PRIORITY; 117 this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE; 118 this.startTime = System.currentTimeMillis(); 119 this.messageSequence = new AtomicLong(0); 120 this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination); 121 this.session.addProducer(this); 122 this.session.asyncSendPacket(info); 123 this.setSendTimeout(sendTimeout); 124 setTransformer(session.getTransformer()); 125 } 126 127 public StatsImpl getStats() { 128 return stats; 129 } 130 131 public JMSProducerStatsImpl getProducerStats() { 132 return stats; 133 } 134 135 /** 136 * Gets the destination associated with this <CODE>MessageProducer</CODE>. 137 * 138 * @return this producer's <CODE>Destination/ <CODE> 139 * @throws JMSException if the JMS provider fails to close the producer due to 140 * some internal error. 141 * @since 1.1 142 */ 143 public Destination getDestination() throws JMSException { 144 checkClosed(); 145 return this.info.getDestination(); 146 } 147 148 /** 149 * Closes the message producer. 150 * <P> 151 * Since a provider may allocate some resources on behalf of a <CODE> 152 * MessageProducer</CODE> 153 * outside the Java virtual machine, clients should close them when they are 154 * not needed. Relying on garbage collection to eventually reclaim these 155 * resources may not be timely enough. 156 * 157 * @throws JMSException if the JMS provider fails to close the producer due 158 * to some internal error. 159 */ 160 public void close() throws JMSException { 161 if (!closed) { 162 dispose(); 163 this.session.asyncSendPacket(info.createRemoveCommand()); 164 } 165 } 166 167 public void dispose() { 168 if (!closed) { 169 this.session.removeProducer(this); 170 if (producerWindow != null) { 171 producerWindow.stop(); 172 } 173 closed = true; 174 } 175 } 176 177 /** 178 * Check if the instance of this producer has been closed. 179 * 180 * @throws IllegalStateException 181 */ 182 @Override 183 protected void checkClosed() throws IllegalStateException { 184 if (closed) { 185 throw new IllegalStateException("The producer is closed"); 186 } 187 } 188 189 /** 190 * Sends a message to a destination for an unidentified message producer, 191 * specifying delivery mode, priority and time to live. 192 * <P> 193 * Typically, a message producer is assigned a destination at creation time; 194 * however, the JMS API also supports unidentified message producers, which 195 * require that the destination be supplied every time a message is sent. 196 * 197 * @param destination the destination to send this message to 198 * @param message the message to send 199 * @param deliveryMode the delivery mode to use 200 * @param priority the priority for this message 201 * @param timeToLive the message's lifetime (in milliseconds) 202 * @throws JMSException if the JMS provider fails to send the message due to 203 * some internal error. 204 * @throws UnsupportedOperationException if an invalid destination is 205 * specified. 206 * @throws InvalidDestinationException if a client uses this method with an 207 * invalid destination. 208 * @see javax.jms.Session#createProducer 209 * @since 1.1 210 */ 211 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { 212 this.send(destination, message, deliveryMode, priority, timeToLive, null); 213 } 214 215 public void send(Message message, AsyncCallback onComplete) throws JMSException { 216 this.send(this.getDestination(), 217 message, 218 this.defaultDeliveryMode, 219 this.defaultPriority, 220 this.defaultTimeToLive, onComplete); 221 } 222 223 public void send(Destination destination, Message message, AsyncCallback onComplete) throws JMSException { 224 this.send(destination, 225 message, 226 this.defaultDeliveryMode, 227 this.defaultPriority, 228 this.defaultTimeToLive, 229 onComplete); 230 } 231 232 public void send(Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException { 233 this.send(this.getDestination(), 234 message, 235 deliveryMode, 236 priority, 237 timeToLive, 238 onComplete); 239 } 240 241 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException { 242 checkClosed(); 243 if (destination == null) { 244 if (info.getDestination() == null) { 245 throw new UnsupportedOperationException("A destination must be specified."); 246 } 247 throw new InvalidDestinationException("Don't understand null destinations"); 248 } 249 250 ActiveMQDestination dest; 251 if (destination == info.getDestination()) { 252 dest = (ActiveMQDestination)destination; 253 } else if (info.getDestination() == null) { 254 dest = ActiveMQDestination.transform(destination); 255 } else { 256 throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); 257 } 258 if (dest == null) { 259 throw new JMSException("No destination specified"); 260 } 261 262 if (transformer != null) { 263 Message transformedMessage = transformer.producerTransform(session, this, message); 264 if (transformedMessage != null) { 265 message = transformedMessage; 266 } 267 } 268 269 if (producerWindow != null) { 270 try { 271 producerWindow.waitForSpace(); 272 } catch (InterruptedException e) { 273 throw new JMSException("Send aborted due to thread interrupt."); 274 } 275 } 276 277 this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete); 278 279 stats.onMessage(); 280 } 281 282 public MessageTransformer getTransformer() { 283 return transformer; 284 } 285 286 /** 287 * Sets the transformer used to transform messages before they are sent on 288 * to the JMS bus 289 */ 290 public void setTransformer(MessageTransformer transformer) { 291 this.transformer = transformer; 292 } 293 294 /** 295 * @return the time in milli second when this object was created. 296 */ 297 protected long getStartTime() { 298 return this.startTime; 299 } 300 301 /** 302 * @return Returns the messageSequence. 303 */ 304 protected long getMessageSequence() { 305 return messageSequence.incrementAndGet(); 306 } 307 308 /** 309 * @param messageSequence The messageSequence to set. 310 */ 311 protected void setMessageSequence(AtomicLong messageSequence) { 312 this.messageSequence = messageSequence; 313 } 314 315 /** 316 * @return Returns the info. 317 */ 318 protected ProducerInfo getProducerInfo() { 319 return this.info != null ? this.info : null; 320 } 321 322 /** 323 * @param info The info to set 324 */ 325 protected void setProducerInfo(ProducerInfo info) { 326 this.info = info; 327 } 328 329 @Override 330 public String toString() { 331 return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }"; 332 } 333 334 public void onProducerAck(ProducerAck pa) { 335 if (this.producerWindow != null) { 336 this.producerWindow.decreaseUsage(pa.getSize()); 337 } 338 } 339 340 }