001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.IOException;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.Map;
025import javax.jms.JMSException;
026import org.apache.activemq.ActiveMQConnection;
027import org.apache.activemq.advisory.AdvisorySupport;
028import org.apache.activemq.broker.region.Destination;
029import org.apache.activemq.broker.region.MessageReference;
030import org.apache.activemq.broker.region.RegionBroker;
031import org.apache.activemq.usage.MemoryUsage;
032import org.apache.activemq.util.ByteArrayInputStream;
033import org.apache.activemq.util.ByteArrayOutputStream;
034import org.apache.activemq.util.ByteSequence;
035import org.apache.activemq.util.MarshallingSupport;
036import org.apache.activemq.wireformat.WireFormat;
037
038/**
039 * Represents an ActiveMQ message
040 *
041 * @openwire:marshaller
042 *
043 */
044public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
045
    /**
     * The default minimum amount of memory a message is assumed to use.
     * getMinimumMessageSize() falls back to this value when no region
     * destination has been set on the message.
     */
    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;

    // Message identity plus the "original" destination/transaction values
    // exposed through getOriginalDestination()/getOriginalTransactionId().
    protected MessageId messageId;
    protected ActiveMQDestination originalDestination;
    protected TransactionId originalTransactionId;

    // Producer, destination and transaction currently associated with the message.
    protected ProducerId producerId;
    protected ActiveMQDestination destination;
    protected TransactionId transactionId;

    // Header-style values; each one is exposed through a getter/setter pair below.
    protected long expiration;
    protected long timestamp;
    protected long arrival;
    protected long brokerInTime;
    protected long brokerOutTime;
    protected String correlationId;
    protected ActiveMQDestination replyTo;
    protected boolean persistent;
    protected String type;
    // setPriority() clamps assigned values to the range 0..9.
    protected byte priority;
    protected String groupID;
    protected int groupSequence;
    protected ConsumerId targetConsumerId;
    protected boolean compressed;
    protected String userID;

    // Body bytes and the encoded form of the property map.
    protected ByteSequence content;
    protected ByteSequence marshalledProperties;
    protected DataStructure dataStructure;
    // isRedelivered() reports true while this is greater than zero.
    protected int redeliveryCounter;

    // Cached size; (re)computed by getSize() when it is 0 or below the minimum.
    protected int size;
    // Decoded property map; built lazily from marshalledProperties.
    protected Map<String, Object> properties;
    protected boolean readOnlyProperties;
    protected boolean readOnlyBody;
    protected transient boolean recievedByDFBridge;
    protected boolean droppable;

    // Maintained by incrementReferenceCount()/decrementReferenceCount().
    private transient short referenceCount;
    private transient ActiveMQConnection connection;
    private transient org.apache.activemq.broker.region.Destination regionDestination;
    // Usage tracker adjusted when referenceCount reaches 1 or drops back to 0.
    private transient MemoryUsage memoryUsage;

    private BrokerId[] brokerPath;
    private BrokerId[] cluster;
094
    /**
     * Creates a copy of this message. The concrete behaviour is supplied by
     * subclasses; presumably they populate the new instance through
     * {@link #copy(Message)} (confirm against the subclasses).
     *
     * @return the copied message
     */
    public abstract Message copy();

    /**
     * Clears the body of this message; the implementation is subclass specific.
     *
     * @throws JMSException if the subclass cannot clear the body
     */
    public abstract void clearBody() throws JMSException;
097
    /**
     * Drops the decoded property map; useful to reduce the memory footprint
     * of a persisted message.
     * <p>
     * Only {@code properties} is released; {@code marshalledProperties} is left
     * untouched, so the map can be rebuilt from it on the next property
     * access. If {@code marshalledProperties} is {@code null} at the time of
     * the call, the property values are no longer reachable afterwards.
     * <p>
     * NOTE(review): despite the method name, it is the <em>unmarshalled</em>
     * state that is cleared here - confirm callers expect this.
     *
     * @throws JMSException not thrown by this implementation; declared for
     *         overriding subclasses
     */
    public void clearMarshalledState() throws JMSException {
        properties = null;
    }
102
103    protected void copy(Message copy) {
104        super.copy(copy);
105        copy.producerId = producerId;
106        copy.transactionId = transactionId;
107        copy.destination = destination;
108        copy.messageId = messageId != null ? messageId.copy() : null;
109        copy.originalDestination = originalDestination;
110        copy.originalTransactionId = originalTransactionId;
111        copy.expiration = expiration;
112        copy.timestamp = timestamp;
113        copy.correlationId = correlationId;
114        copy.replyTo = replyTo;
115        copy.persistent = persistent;
116        copy.redeliveryCounter = redeliveryCounter;
117        copy.type = type;
118        copy.priority = priority;
119        copy.size = size;
120        copy.groupID = groupID;
121        copy.userID = userID;
122        copy.groupSequence = groupSequence;
123
124        if (properties != null) {
125            copy.properties = new HashMap<String, Object>(properties);
126
127            // The new message hasn't expired, so remove this feild.
128            copy.properties.remove(RegionBroker.ORIGINAL_EXPIRATION);
129        } else {
130            copy.properties = properties;
131        }
132
133        copy.content = content;
134        copy.marshalledProperties = marshalledProperties;
135        copy.dataStructure = dataStructure;
136        copy.readOnlyProperties = readOnlyProperties;
137        copy.readOnlyBody = readOnlyBody;
138        copy.compressed = compressed;
139        copy.recievedByDFBridge = recievedByDFBridge;
140
141        copy.arrival = arrival;
142        copy.connection = connection;
143        copy.regionDestination = regionDestination;
144        copy.brokerInTime = brokerInTime;
145        copy.brokerOutTime = brokerOutTime;
146        copy.memoryUsage=this.memoryUsage;
147        copy.brokerPath = brokerPath;
148
149        // lets not copy the following fields
150        // copy.targetConsumerId = targetConsumerId;
151        // copy.referenceCount = referenceCount;
152    }
153
154    public Object getProperty(String name) throws IOException {
155        if (properties == null) {
156            if (marshalledProperties == null) {
157                return null;
158            }
159            properties = unmarsallProperties(marshalledProperties);
160        }
161        return properties.get(name);
162    }
163
164    @SuppressWarnings("unchecked")
165    public Map<String, Object> getProperties() throws IOException {
166        if (properties == null) {
167            if (marshalledProperties == null) {
168                return Collections.EMPTY_MAP;
169            }
170            properties = unmarsallProperties(marshalledProperties);
171        }
172        return Collections.unmodifiableMap(properties);
173    }
174
175    public void clearProperties() {
176        marshalledProperties = null;
177        properties = null;
178    }
179
    /**
     * Stores a property value on this message.
     * <p>
     * The {@code readOnlyProperties} flag is not consulted here.
     *
     * @param name the property name
     * @param value the value to associate with {@code name}
     * @throws IOException if existing marshalled properties cannot be decoded
     */
    public void setProperty(String name, Object value) throws IOException {
        // Make sure a mutable map exists (decoding the marshalled form if needed).
        lazyCreateProperties();
        properties.put(name, value);
    }
184
    /**
     * Removes a property from this message; does nothing if it is not set.
     * <p>
     * The {@code readOnlyProperties} flag is not consulted here.
     *
     * @param name the property name
     * @throws IOException if existing marshalled properties cannot be decoded
     */
    public void removeProperty(String name) throws IOException {
        // Make sure a mutable map exists (decoding the marshalled form if needed).
        lazyCreateProperties();
        properties.remove(name);
    }
189
190    protected void lazyCreateProperties() throws IOException {
191        if (properties == null) {
192            if (marshalledProperties == null) {
193                properties = new HashMap<String, Object>();
194            } else {
195                properties = unmarsallProperties(marshalledProperties);
196                marshalledProperties = null;
197            }
198        }
199    }
200
201    private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
202        return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
203    }
204
205    public void beforeMarshall(WireFormat wireFormat) throws IOException {
206        // Need to marshal the properties.
207        if (marshalledProperties == null && properties != null) {
208            ByteArrayOutputStream baos = new ByteArrayOutputStream();
209            DataOutputStream os = new DataOutputStream(baos);
210            MarshallingSupport.marshalPrimitiveMap(properties, os);
211            os.close();
212            marshalledProperties = baos.toByteSequence();
213        }
214    }
215
    /**
     * Marshalling hook; intentionally a no-op in this class.
     *
     * @param wireFormat the wire format in use (ignored)
     * @throws IOException never thrown by this implementation
     */
    public void afterMarshall(WireFormat wireFormat) throws IOException {
    }
218
    /**
     * Unmarshalling hook; intentionally a no-op in this class.
     *
     * @param wireFormat the wire format in use (ignored)
     * @throws IOException never thrown by this implementation
     */
    public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
    }
221
    /**
     * Unmarshalling hook; intentionally a no-op in this class.
     *
     * @param wireFormat the wire format in use (ignored)
     * @throws IOException never thrown by this implementation
     */
    public void afterUnmarshall(WireFormat wireFormat) throws IOException {
    }
224
225    // /////////////////////////////////////////////////////////////////
226    //
227    // Simple Field accessors
228    //
229    // /////////////////////////////////////////////////////////////////
230
    /**
     * Returns the id of the producer associated with this message.
     *
     * @openwire:property version=1 cache=true
     */
    public ProducerId getProducerId() {
        return producerId;
    }

    /**
     * Sets the id of the producer associated with this message.
     *
     * @param producerId the producer id, stored as given
     */
    public void setProducerId(ProducerId producerId) {
        this.producerId = producerId;
    }
241
    /**
     * Returns the destination of this message.
     *
     * @openwire:property version=1 cache=true
     */
    public ActiveMQDestination getDestination() {
        return destination;
    }

    /**
     * Sets the destination of this message.
     *
     * @param destination the destination, stored as given
     */
    public void setDestination(ActiveMQDestination destination) {
        this.destination = destination;
    }
252
    /**
     * Returns the id of the transaction this message belongs to, or
     * {@code null} when it is not part of one (see {@link #isInTransaction()}).
     *
     * @openwire:property version=1 cache=true
     */
    public TransactionId getTransactionId() {
        return transactionId;
    }

    /**
     * Sets the id of the transaction this message belongs to.
     *
     * @param transactionId the transaction id, or {@code null} for none
     */
    public void setTransactionId(TransactionId transactionId) {
        this.transactionId = transactionId;
    }
263
    /**
     * @return {@code true} when a transaction id has been set on this message
     */
    public boolean isInTransaction() {
        return transactionId != null;
    }
267
    /**
     * Returns the value recorded as the original destination of this message.
     *
     * @openwire:property version=1 cache=true
     */
    public ActiveMQDestination getOriginalDestination() {
        return originalDestination;
    }

    /**
     * Records the original destination of this message.
     *
     * @param destination the original destination, stored as given
     */
    public void setOriginalDestination(ActiveMQDestination destination) {
        this.originalDestination = destination;
    }
278
    /**
     * Returns the id of this message.
     *
     * @openwire:property version=1
     */
    public MessageId getMessageId() {
        return messageId;
    }

    /**
     * Sets the id of this message.
     *
     * @param messageId the message id, stored as given
     */
    public void setMessageId(MessageId messageId) {
        this.messageId = messageId;
    }
289
    /**
     * Returns the value recorded as the original transaction id of this message.
     *
     * @openwire:property version=1 cache=true
     */
    public TransactionId getOriginalTransactionId() {
        return originalTransactionId;
    }

    /**
     * Records the original transaction id of this message.
     *
     * @param transactionId the original transaction id, stored as given
     */
    public void setOriginalTransactionId(TransactionId transactionId) {
        this.originalTransactionId = transactionId;
    }
300
    /**
     * Returns the group id of this message.
     *
     * @openwire:property version=1
     */
    public String getGroupID() {
        return groupID;
    }

    /**
     * Sets the group id of this message.
     *
     * @param groupID the group id, stored as given
     */
    public void setGroupID(String groupID) {
        this.groupID = groupID;
    }
311
    /**
     * Returns the group sequence number of this message.
     *
     * @openwire:property version=1
     */
    public int getGroupSequence() {
        return groupSequence;
    }

    /**
     * Sets the group sequence number of this message.
     *
     * @param groupSequence the sequence number, stored as given
     */
    public void setGroupSequence(int groupSequence) {
        this.groupSequence = groupSequence;
    }
322
    /**
     * Returns the correlation id of this message.
     *
     * @openwire:property version=1
     */
    public String getCorrelationId() {
        return correlationId;
    }

    /**
     * Sets the correlation id of this message.
     *
     * @param correlationId the correlation id, stored as given
     */
    public void setCorrelationId(String correlationId) {
        this.correlationId = correlationId;
    }
333
    /**
     * Returns the persistent flag of this message.
     *
     * @openwire:property version=1
     */
    public boolean isPersistent() {
        return persistent;
    }

    /**
     * Sets the persistent flag of this message.
     *
     * @param deliveryMode the new value of the persistent flag
     */
    public void setPersistent(boolean deliveryMode) {
        this.persistent = deliveryMode;
    }
344
    /**
     * Returns the expiration value of this message. {@link #isExpired()}
     * compares it against {@code System.currentTimeMillis()} and treats
     * values that are not positive as "never expires".
     *
     * @openwire:property version=1
     */
    public long getExpiration() {
        return expiration;
    }

    /**
     * Sets the expiration value of this message.
     *
     * @param expiration the expiration value, stored as given
     */
    public void setExpiration(long expiration) {
        this.expiration = expiration;
    }
355
    /**
     * Returns the priority of this message.
     *
     * @openwire:property version=1
     */
    public byte getPriority() {
        return priority;
    }
362
363    public void setPriority(byte priority) {
364        if (priority < 0) {
365            this.priority = 0;
366        } else if (priority > 9) {
367            this.priority = 9;
368        } else {
369            this.priority = priority;
370        }
371    }
372
    /**
     * Returns the reply-to destination of this message.
     *
     * @openwire:property version=1
     */
    public ActiveMQDestination getReplyTo() {
        return replyTo;
    }

    /**
     * Sets the reply-to destination of this message.
     *
     * @param replyTo the reply-to destination, stored as given
     */
    public void setReplyTo(ActiveMQDestination replyTo) {
        this.replyTo = replyTo;
    }
383
    /**
     * Returns the timestamp of this message.
     *
     * @openwire:property version=1
     */
    public long getTimestamp() {
        return timestamp;
    }

    /**
     * Sets the timestamp of this message.
     *
     * @param timestamp the timestamp, stored as given
     */
    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
394
    /**
     * Returns the type string of this message. {@link #isAdvisory()} compares
     * it against {@code AdvisorySupport.ADIVSORY_MESSAGE_TYPE}.
     *
     * @openwire:property version=1
     */
    public String getType() {
        return type;
    }

    /**
     * Sets the type string of this message.
     *
     * @param type the type, stored as given
     */
    public void setType(String type) {
        this.type = type;
    }
405
    /**
     * Returns the content bytes of this message; the same instance that was
     * set, not a copy.
     *
     * @openwire:property version=1
     */
    public ByteSequence getContent() {
        return content;
    }

    /**
     * Sets the content bytes of this message.
     *
     * @param content the content, stored by reference
     */
    public void setContent(ByteSequence content) {
        this.content = content;
    }
416
    /**
     * Returns the encoded form of the message properties, or {@code null}
     * when none is currently cached.
     *
     * @openwire:property version=1
     */
    public ByteSequence getMarshalledProperties() {
        return marshalledProperties;
    }

    /**
     * Sets the encoded form of the message properties. The decoded
     * {@code properties} map is not touched by this call.
     *
     * @param marshalledProperties the encoded properties, stored by reference
     */
    public void setMarshalledProperties(ByteSequence marshalledProperties) {
        this.marshalledProperties = marshalledProperties;
    }
427
    /**
     * Returns the data structure attached to this message.
     *
     * @openwire:property version=1
     */
    public DataStructure getDataStructure() {
        return dataStructure;
    }

    /**
     * Attaches a data structure to this message.
     *
     * @param data the data structure, stored as given
     */
    public void setDataStructure(DataStructure data) {
        this.dataStructure = data;
    }
438
    /**
     * Can be used to route the message to a specific consumer. Should be null
     * to allow the broker to use normal JMS routing semantics. If the target
     * consumer id is an active consumer on the broker, the message is dropped.
     * Used by the AdvisoryBroker to replay advisory messages to a specific
     * consumer.
     * <p>
     * NOTE(review): the "is an active consumer ... is dropped" sentence may
     * have been meant as "is not an active consumer"; the routing code is not
     * part of this class - confirm against the broker implementation.
     *
     * @openwire:property version=1 cache=true
     */
    public ConsumerId getTargetConsumerId() {
        return targetConsumerId;
    }

    /**
     * Sets the id of the consumer this message is targeted at.
     *
     * @param targetConsumerId the consumer id, or {@code null} for none
     */
    public void setTargetConsumerId(ConsumerId targetConsumerId) {
        this.targetConsumerId = targetConsumerId;
    }
455
456    public boolean isExpired() {
457        long expireTime = getExpiration();
458        return expireTime > 0 && System.currentTimeMillis() > expireTime;
459    }
460
461    public boolean isAdvisory() {
462        return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
463    }
464
    /**
     * Returns the compressed flag of this message.
     *
     * @openwire:property version=1
     */
    public boolean isCompressed() {
        return compressed;
    }

    /**
     * Sets the compressed flag of this message. Only the flag is changed;
     * the content is not transformed by this call.
     *
     * @param compressed the new value of the flag
     */
    public void setCompressed(boolean compressed) {
        this.compressed = compressed;
    }
475
    /**
     * @return {@code true} when the redelivery counter is greater than zero
     */
    public boolean isRedelivered() {
        return redeliveryCounter > 0;
    }
479
480    public void setRedelivered(boolean redelivered) {
481        if (redelivered) {
482            if (!isRedelivered()) {
483                setRedeliveryCounter(1);
484            }
485        } else {
486            if (isRedelivered()) {
487                setRedeliveryCounter(0);
488            }
489        }
490    }
491
    /**
     * Increases the redelivery counter by one.
     */
    public void incrementRedeliveryCounter() {
        redeliveryCounter++;
    }
495
    /**
     * Returns the redelivery counter of this message.
     *
     * @openwire:property version=1
     */
    public int getRedeliveryCounter() {
        return redeliveryCounter;
    }

    /**
     * Sets the redelivery counter of this message.
     *
     * @param deliveryCounter the new counter value, stored as given
     */
    public void setRedeliveryCounter(int deliveryCounter) {
        this.redeliveryCounter = deliveryCounter;
    }
506
    /**
     * The route of brokers the command has moved through. The internal array
     * is returned directly, not a copy.
     *
     * @openwire:property version=1 cache=true
     */
    public BrokerId[] getBrokerPath() {
        return brokerPath;
    }

    /**
     * Sets the route of brokers the command has moved through.
     *
     * @param brokerPath the broker path; the array is stored by reference
     */
    public void setBrokerPath(BrokerId[] brokerPath) {
        this.brokerPath = brokerPath;
    }
519
    /**
     * @return the read-only-properties flag of this message
     */
    public boolean isReadOnlyProperties() {
        return readOnlyProperties;
    }

    /**
     * Sets the read-only-properties flag. This class only stores the flag;
     * {@link #setProperty(String, Object)} does not check it.
     *
     * @param readOnlyProperties the new value of the flag
     */
    public void setReadOnlyProperties(boolean readOnlyProperties) {
        this.readOnlyProperties = readOnlyProperties;
    }
527
    /**
     * @return the read-only-body flag of this message
     */
    public boolean isReadOnlyBody() {
        return readOnlyBody;
    }

    /**
     * Sets the read-only-body flag. This class only stores the flag.
     *
     * @param readOnlyBody the new value of the flag
     */
    public void setReadOnlyBody(boolean readOnlyBody) {
        this.readOnlyBody = readOnlyBody;
    }
535
    /**
     * @return the connection associated with this message (held in a
     *         transient field), or {@code null} when none has been set
     */
    public ActiveMQConnection getConnection() {
        return this.connection;
    }

    /**
     * Associates a connection with this message.
     *
     * @param connection the connection, stored as given
     */
    public void setConnection(ActiveMQConnection connection) {
        this.connection = connection;
    }
543
    /**
     * Used to schedule the arrival time of a message to a broker. The broker
     * will not dispatch a message to a consumer until its arrival time has
     * elapsed.
     *
     * @openwire:property version=1
     */
    public long getArrival() {
        return arrival;
    }

    /**
     * Sets the arrival time of this message.
     *
     * @param arrival the arrival time, stored as given
     */
    public void setArrival(long arrival) {
        this.arrival = arrival;
    }
558
    /**
     * Only set by the broker and defines the userID of the producer connection
     * that sent this message. This is an optional field; it needs to be enabled
     * on the broker to have this field populated.
     *
     * @openwire:property version=1
     */
    public String getUserID() {
        return userID;
    }

    /**
     * Sets the user id recorded on this message.
     *
     * @param jmsxUserID the user id, stored as given
     */
    public void setUserID(String jmsxUserID) {
        this.userID = jmsxUserID;
    }
573
    /**
     * @return the current reference count, as maintained by
     *         {@link #incrementReferenceCount()} and
     *         {@link #decrementReferenceCount()}; read without synchronization
     */
    public int getReferenceCount() {
        return referenceCount;
    }
577
    /**
     * @return this message itself
     */
    public Message getMessageHardRef() {
        return this;
    }

    /**
     * @return this message itself
     */
    public Message getMessage() {
        return this;
    }
585
    /**
     * @return the region destination associated with this message (held in a
     *         transient field), or {@code null} when none has been set
     */
    public org.apache.activemq.broker.region.Destination getRegionDestination() {
        return regionDestination;
    }
589
590    public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
591        this.regionDestination = destination;
592        if(this.memoryUsage==null) {
593            this.memoryUsage=regionDestination.getMemoryUsage();
594        }
595    }
596
    /**
     * @return the memory usage tracker of this message, or {@code null} when
     *         none has been assigned
     */
    public MemoryUsage getMemoryUsage() {
        return this.memoryUsage;
    }

    /**
     * Assigns the memory usage tracker that
     * {@link #incrementReferenceCount()} and
     * {@link #decrementReferenceCount()} report to.
     *
     * @param usage the tracker, stored as given
     */
    public void setMemoryUsage(MemoryUsage usage) {
        this.memoryUsage=usage;
    }
604
    /**
     * @return always {@code true}
     */
    @Override
    public boolean isMarshallAware() {
        return true;
    }
609
610    public int incrementReferenceCount() {
611        int rc;
612        int size;
613        synchronized (this) {
614            rc = ++referenceCount;
615            size = getSize();
616        }
617
618        if (rc == 1 && getMemoryUsage() != null) {
619            getMemoryUsage().increaseUsage(size);
620            //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
621
622        }
623
624        //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
625        return rc;
626    }
627
628    public int decrementReferenceCount() {
629        int rc;
630        int size;
631        synchronized (this) {
632            rc = --referenceCount;
633            size = getSize();
634        }
635
636        if (rc == 0 && getMemoryUsage() != null) {
637            getMemoryUsage().decreaseUsage(size);
638            //Thread.dumpStack();
639            //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
640        }
641
642        //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
643
644        return rc;
645    }
646
647    public int getSize() {
648        int minimumMessageSize = getMinimumMessageSize();
649        if (size < minimumMessageSize || size == 0) {
650            size = minimumMessageSize;
651            if (marshalledProperties != null) {
652                size += marshalledProperties.getLength();
653            }
654            if (content != null) {
655                size += content.getLength();
656            }
657        }
658        return size;
659    }
660
661    protected int getMinimumMessageSize() {
662        int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
663        //let destination override
664        Destination dest = regionDestination;
665        if (dest != null) {
666            result=dest.getMinimumMessageSize();
667        }
668        return result;
669    }
670
    /**
     * Returns the recievedByDFBridge flag, which is held in a transient field.
     *
     * @openwire:property version=1
     * @return Returns the recievedByDFBridge.
     */
    public boolean isRecievedByDFBridge() {
        return recievedByDFBridge;
    }

    /**
     * Sets the recievedByDFBridge flag.
     *
     * @param recievedByDFBridge The recievedByDFBridge to set.
     */
    public void setRecievedByDFBridge(boolean recievedByDFBridge) {
        this.recievedByDFBridge = recievedByDFBridge;
    }
685
    /**
     * Rollback callback: increases the redelivery counter by one.
     */
    public void onMessageRolledBack() {
        incrementRedeliveryCounter();
    }
689
    /**
     * Returns the droppable flag of this message.
     *
     * @openwire:property version=2 cache=true
     */
    public boolean isDroppable() {
        return droppable;
    }

    /**
     * Sets the droppable flag of this message.
     *
     * @param droppable the new value of the flag
     */
    public void setDroppable(boolean droppable) {
        this.droppable = droppable;
    }
700
    /**
     * If a message is stored in multiple nodes on a cluster, all the cluster
     * members will be listed here. Otherwise, it will be null. The internal
     * array is returned directly, not a copy.
     *
     * @openwire:property version=3 cache=true
     */
    public BrokerId[] getCluster() {
        return cluster;
    }

    /**
     * Sets the list of cluster members holding this message.
     *
     * @param cluster the cluster members; the array is stored by reference
     */
    public void setCluster(BrokerId[] cluster) {
        this.cluster = cluster;
    }
714
    /**
     * @return always {@code true}
     */
    @Override
    public boolean isMessage() {
        return true;
    }
719
    /**
     * Returns the broker-in time recorded on this message.
     *
     * @openwire:property version=3
     */
    public long getBrokerInTime() {
        return this.brokerInTime;
    }

    /**
     * Records the broker-in time on this message.
     *
     * @param brokerInTime the time value, stored as given
     */
    public void setBrokerInTime(long brokerInTime) {
        this.brokerInTime = brokerInTime;
    }
730
    /**
     * Returns the broker-out time recorded on this message.
     *
     * @openwire:property version=3
     */
    public long getBrokerOutTime() {
        return this.brokerOutTime;
    }

    /**
     * Records the broker-out time on this message.
     *
     * @param brokerOutTime the time value, stored as given
     */
    public void setBrokerOutTime(long brokerOutTime) {
        this.brokerOutTime = brokerOutTime;
    }
741
    /**
     * @return always {@code false} in this class
     */
    public boolean isDropped() {
        return false;
    }
745
    /**
     * Renders this message as a string by delegating to
     * {@link #toString(Map)} without any override fields.
     */
    @Override
    public String toString() {
        return toString(null);
    }
750
751    @Override
752    public String toString(Map<String, Object>overrideFields) {
753        try {
754            getProperties();
755        } catch (IOException e) {
756        }
757        return super.toString(overrideFields);
758    }
759}