001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.command; 018 019 import java.io.IOException; 020 import java.io.UnsupportedEncodingException; 021 import java.util.Enumeration; 022 import java.util.HashMap; 023 import java.util.Iterator; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Vector; 027 import javax.jms.DeliveryMode; 028 import javax.jms.Destination; 029 import javax.jms.JMSException; 030 import javax.jms.MessageFormatException; 031 import javax.jms.MessageNotWriteableException; 032 import org.apache.activemq.ActiveMQConnection; 033 import org.apache.activemq.ScheduledMessage; 034 import org.apache.activemq.broker.scheduler.CronParser; 035 import org.apache.activemq.filter.PropertyExpression; 036 import org.apache.activemq.state.CommandVisitor; 037 import org.apache.activemq.util.Callback; 038 import org.apache.activemq.util.JMSExceptionSupport; 039 import org.apache.activemq.util.TypeConversionSupport; 040 041 /** 042 * 043 * @openwire:marshaller code="23" 044 */ 045 public class ActiveMQMessage extends Message implements org.apache.activemq.Message, ScheduledMessage { 046 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE; 047 public static final String DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause"; 048 private static final Map<String, PropertySetter> JMS_PROPERTY_SETERS = new HashMap<String, PropertySetter>(); 049 050 protected transient Callback acknowledgeCallback; 051 052 public byte getDataStructureType() { 053 return DATA_STRUCTURE_TYPE; 054 } 055 056 057 @Override 058 public Message copy() { 059 ActiveMQMessage copy = new ActiveMQMessage(); 060 copy(copy); 061 return copy; 062 } 063 064 protected void copy(ActiveMQMessage copy) { 065 super.copy(copy); 066 copy.acknowledgeCallback = acknowledgeCallback; 067 } 068 069 @Override 070 public int hashCode() { 071 MessageId id = getMessageId(); 072 if (id != null) { 073 return id.hashCode(); 074 } else { 075 return super.hashCode(); 076 } 077 } 078 079 @Override 080 public boolean equals(Object o) { 081 if (this == o) { 082 return true; 083 } 084 if (o == null || o.getClass() != getClass()) { 085 return false; 086 } 087 088 ActiveMQMessage msg = (ActiveMQMessage) o; 089 MessageId oMsg = msg.getMessageId(); 090 MessageId thisMsg = this.getMessageId(); 091 return thisMsg != null && oMsg != null && oMsg.equals(thisMsg); 092 } 093 094 public void acknowledge() throws JMSException { 095 if (acknowledgeCallback != null) { 096 try { 097 acknowledgeCallback.execute(); 098 } catch (JMSException e) { 099 throw e; 100 } catch (Throwable e) { 101 throw JMSExceptionSupport.create(e); 102 } 103 } 104 } 105 106 @Override 107 public void clearBody() throws JMSException { 108 setContent(null); 109 readOnlyBody = false; 110 } 111 112 public String getJMSMessageID() { 113 MessageId messageId = this.getMessageId(); 114 if (messageId == null) { 115 return null; 116 } 117 return messageId.toString(); 118 } 119 120 /** 121 * Seems to be invalid because the parameter doesn't initialize MessageId 122 * instance variables ProducerId and ProducerSequenceId 123 * 124 * @param value 125 * @throws JMSException 126 */ 127 public void setJMSMessageID(String value) throws JMSException { 128 if (value != null) { 129 try { 130 MessageId id = new MessageId(value); 131 this.setMessageId(id); 132 } catch (NumberFormatException e) { 133 // we must be some foreign JMS provider or strange user-supplied 134 // String 135 // so lets set the IDs to be 1 136 MessageId id = new MessageId(); 137 id.setTextView(value); 138 this.setMessageId(messageId); 139 } 140 } else { 141 this.setMessageId(null); 142 } 143 } 144 145 /** 146 * This will create an object of MessageId. For it to be valid, the instance 147 * variable ProducerId and producerSequenceId must be initialized. 148 * 149 * @param producerId 150 * @param producerSequenceId 151 * @throws JMSException 152 */ 153 public void setJMSMessageID(ProducerId producerId, long producerSequenceId) throws JMSException { 154 MessageId id = null; 155 try { 156 id = new MessageId(producerId, producerSequenceId); 157 this.setMessageId(id); 158 } catch (Throwable e) { 159 throw JMSExceptionSupport.create("Invalid message id '" + id + "', reason: " + e.getMessage(), e); 160 } 161 } 162 163 public long getJMSTimestamp() { 164 return this.getTimestamp(); 165 } 166 167 public void setJMSTimestamp(long timestamp) { 168 this.setTimestamp(timestamp); 169 } 170 171 public String getJMSCorrelationID() { 172 return this.getCorrelationId(); 173 } 174 175 public void setJMSCorrelationID(String correlationId) { 176 this.setCorrelationId(correlationId); 177 } 178 179 public byte[] getJMSCorrelationIDAsBytes() throws JMSException { 180 return encodeString(this.getCorrelationId()); 181 } 182 183 public void setJMSCorrelationIDAsBytes(byte[] correlationId) throws JMSException { 184 this.setCorrelationId(decodeString(correlationId)); 185 } 186 187 public String getJMSXMimeType() { 188 return "jms/message"; 189 } 190 191 protected static String decodeString(byte[] data) throws JMSException { 192 try { 193 if (data == null) { 194 return null; 195 } 196 return new String(data, "UTF-8"); 197 } catch (UnsupportedEncodingException e) { 198 throw new JMSException("Invalid UTF-8 encoding: " + e.getMessage()); 199 } 200 } 201 202 protected static byte[] encodeString(String data) throws JMSException { 203 try { 204 if (data == null) { 205 return null; 206 } 207 return data.getBytes("UTF-8"); 208 } catch (UnsupportedEncodingException e) { 209 throw new JMSException("Invalid UTF-8 encoding: " + e.getMessage()); 210 } 211 } 212 213 public Destination getJMSReplyTo() { 214 return this.getReplyTo(); 215 } 216 217 public void setJMSReplyTo(Destination destination) throws JMSException { 218 this.setReplyTo(ActiveMQDestination.transform(destination)); 219 } 220 221 public Destination getJMSDestination() { 222 return this.getDestination(); 223 } 224 225 public void setJMSDestination(Destination destination) throws JMSException { 226 this.setDestination(ActiveMQDestination.transform(destination)); 227 } 228 229 public int getJMSDeliveryMode() { 230 return this.isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; 231 } 232 233 public void setJMSDeliveryMode(int mode) { 234 this.setPersistent(mode == DeliveryMode.PERSISTENT); 235 } 236 237 public boolean getJMSRedelivered() { 238 return this.isRedelivered(); 239 } 240 241 public void setJMSRedelivered(boolean redelivered) { 242 this.setRedelivered(redelivered); 243 } 244 245 public String getJMSType() { 246 return this.getType(); 247 } 248 249 public void setJMSType(String type) { 250 this.setType(type); 251 } 252 253 public long getJMSExpiration() { 254 return this.getExpiration(); 255 } 256 257 public void setJMSExpiration(long expiration) { 258 this.setExpiration(expiration); 259 } 260 261 public int getJMSPriority() { 262 return this.getPriority(); 263 } 264 265 public void setJMSPriority(int priority) { 266 this.setPriority((byte) priority); 267 } 268 269 @Override 270 public void clearProperties() { 271 super.clearProperties(); 272 readOnlyProperties = false; 273 } 274 275 public boolean propertyExists(String name) throws JMSException { 276 try { 277 return (this.getProperties().containsKey(name) || getObjectProperty(name)!= null); 278 } catch (IOException e) { 279 throw JMSExceptionSupport.create(e); 280 } 281 } 282 283 public Enumeration getPropertyNames() throws JMSException { 284 try { 285 Vector<String> result = new Vector<String>(this.getProperties().keySet()); 286 return result.elements(); 287 } catch (IOException e) { 288 throw JMSExceptionSupport.create(e); 289 } 290 } 291 292 /** 293 * return all property names, including standard JMS properties and JMSX properties 294 * @return Enumeration of all property names on this message 295 * @throws JMSException 296 */ 297 public Enumeration getAllPropertyNames() throws JMSException { 298 try { 299 Vector<String> result = new Vector<String>(this.getProperties().keySet()); 300 result.addAll(JMS_PROPERTY_SETERS.keySet()); 301 return result.elements(); 302 } catch (IOException e) { 303 throw JMSExceptionSupport.create(e); 304 } 305 } 306 307 interface PropertySetter { 308 309 void set(Message message, Object value) throws MessageFormatException; 310 } 311 312 static { 313 JMS_PROPERTY_SETERS.put("JMSXDeliveryCount", new PropertySetter() { 314 public void set(Message message, Object value) throws MessageFormatException { 315 Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class); 316 if (rc == null) { 317 throw new MessageFormatException("Property JMSXDeliveryCount cannot be set from a " + value.getClass().getName() + "."); 318 } 319 message.setRedeliveryCounter(rc.intValue() - 1); 320 } 321 }); 322 JMS_PROPERTY_SETERS.put("JMSXGroupID", new PropertySetter() { 323 public void set(Message message, Object value) throws MessageFormatException { 324 String rc = (String) TypeConversionSupport.convert(value, String.class); 325 if (rc == null) { 326 throw new MessageFormatException("Property JMSXGroupID cannot be set from a " + value.getClass().getName() + "."); 327 } 328 message.setGroupID(rc); 329 } 330 }); 331 JMS_PROPERTY_SETERS.put("JMSXGroupSeq", new PropertySetter() { 332 public void set(Message message, Object value) throws MessageFormatException { 333 Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class); 334 if (rc == null) { 335 throw new MessageFormatException("Property JMSXGroupSeq cannot be set from a " + value.getClass().getName() + "."); 336 } 337 message.setGroupSequence(rc.intValue()); 338 } 339 }); 340 JMS_PROPERTY_SETERS.put("JMSCorrelationID", new PropertySetter() { 341 public void set(Message message, Object value) throws MessageFormatException { 342 String rc = (String) TypeConversionSupport.convert(value, String.class); 343 if (rc == null) { 344 throw new MessageFormatException("Property JMSCorrelationID cannot be set from a " + value.getClass().getName() + "."); 345 } 346 ((ActiveMQMessage) message).setJMSCorrelationID(rc); 347 } 348 }); 349 JMS_PROPERTY_SETERS.put("JMSDeliveryMode", new PropertySetter() { 350 public void set(Message message, Object value) throws MessageFormatException { 351 Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class); 352 if (rc == null) { 353 Boolean bool = (Boolean) TypeConversionSupport.convert(value, Boolean.class); 354 if (bool == null) { 355 throw new MessageFormatException("Property JMSDeliveryMode cannot be set from a " + value.getClass().getName() + "."); 356 } 357 else { 358 rc = bool.booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; 359 } 360 } 361 ((ActiveMQMessage) message).setJMSDeliveryMode(rc); 362 } 363 }); 364 JMS_PROPERTY_SETERS.put("JMSExpiration", new PropertySetter() { 365 public void set(Message message, Object value) throws MessageFormatException { 366 Long rc = (Long) TypeConversionSupport.convert(value, Long.class); 367 if (rc == null) { 368 throw new MessageFormatException("Property JMSExpiration cannot be set from a " + value.getClass().getName() + "."); 369 } 370 ((ActiveMQMessage) message).setJMSExpiration(rc.longValue()); 371 } 372 }); 373 JMS_PROPERTY_SETERS.put("JMSPriority", new PropertySetter() { 374 public void set(Message message, Object value) throws MessageFormatException { 375 Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class); 376 if (rc == null) { 377 throw new MessageFormatException("Property JMSPriority cannot be set from a " + value.getClass().getName() + "."); 378 } 379 ((ActiveMQMessage) message).setJMSPriority(rc.intValue()); 380 } 381 }); 382 JMS_PROPERTY_SETERS.put("JMSRedelivered", new PropertySetter() { 383 public void set(Message message, Object value) throws MessageFormatException { 384 Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class); 385 if (rc == null) { 386 throw new MessageFormatException("Property JMSRedelivered cannot be set from a " + value.getClass().getName() + "."); 387 } 388 ((ActiveMQMessage) message).setJMSRedelivered(rc.booleanValue()); 389 } 390 }); 391 JMS_PROPERTY_SETERS.put("JMSReplyTo", new PropertySetter() { 392 public void set(Message message, Object value) throws MessageFormatException { 393 ActiveMQDestination rc = (ActiveMQDestination) TypeConversionSupport.convert(value, ActiveMQDestination.class); 394 if (rc == null) { 395 throw new MessageFormatException("Property JMSReplyTo cannot be set from a " + value.getClass().getName() + "."); 396 } 397 ((ActiveMQMessage) message).setReplyTo(rc); 398 } 399 }); 400 JMS_PROPERTY_SETERS.put("JMSTimestamp", new PropertySetter() { 401 public void set(Message message, Object value) throws MessageFormatException { 402 Long rc = (Long) TypeConversionSupport.convert(value, Long.class); 403 if (rc == null) { 404 throw new MessageFormatException("Property JMSTimestamp cannot be set from a " + value.getClass().getName() + "."); 405 } 406 ((ActiveMQMessage) message).setJMSTimestamp(rc.longValue()); 407 } 408 }); 409 JMS_PROPERTY_SETERS.put("JMSType", new PropertySetter() { 410 public void set(Message message, Object value) throws MessageFormatException { 411 String rc = (String) TypeConversionSupport.convert(value, String.class); 412 if (rc == null) { 413 throw new MessageFormatException("Property JMSType cannot be set from a " + value.getClass().getName() + "."); 414 } 415 ((ActiveMQMessage) message).setJMSType(rc); 416 } 417 }); 418 } 419 420 public void setObjectProperty(String name, Object value) throws JMSException { 421 setObjectProperty(name, value, true); 422 } 423 424 public void setObjectProperty(String name, Object value, boolean checkReadOnly) throws JMSException { 425 426 if (checkReadOnly) { 427 checkReadOnlyProperties(); 428 } 429 if (name == null || name.equals("")) { 430 throw new IllegalArgumentException("Property name cannot be empty or null"); 431 } 432 433 checkValidObject(value); 434 value = convertScheduled(name, value); 435 PropertySetter setter = JMS_PROPERTY_SETERS.get(name); 436 437 if (setter != null && value != null) { 438 setter.set(this, value); 439 } else { 440 try { 441 this.setProperty(name, value); 442 } catch (IOException e) { 443 throw JMSExceptionSupport.create(e); 444 } 445 } 446 } 447 448 public void setProperties(Map properties) throws JMSException { 449 for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) { 450 Map.Entry entry = (Map.Entry) iter.next(); 451 452 // Lets use the object property method as we may contain standard 453 // extension headers like JMSXGroupID 454 setObjectProperty((String) entry.getKey(), entry.getValue()); 455 } 456 } 457 458 protected void checkValidObject(Object value) throws MessageFormatException { 459 460 boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long; 461 valid = valid || value instanceof Float || value instanceof Double || value instanceof Character || value instanceof String || value == null; 462 463 if (!valid) { 464 465 ActiveMQConnection conn = getConnection(); 466 // conn is null if we are in the broker rather than a JMS client 467 if (conn == null || conn.isNestedMapAndListEnabled()) { 468 if (!(value instanceof Map || value instanceof List)) { 469 throw new MessageFormatException("Only objectified primitive objects, String, Map and List types are allowed but was: " + value + " type: " + value.getClass()); 470 } 471 } else { 472 throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass()); 473 } 474 } 475 } 476 477 protected void checkValidScheduled(String name, Object value) throws MessageFormatException { 478 if (AMQ_SCHEDULED_DELAY.equals(name) || AMQ_SCHEDULED_PERIOD.equals(name) || AMQ_SCHEDULED_REPEAT.equals(name)) { 479 if (value instanceof Long == false && value instanceof Integer == false) { 480 throw new MessageFormatException(name + " should be long or int value"); 481 } 482 } 483 if (AMQ_SCHEDULED_CRON.equals(name)) { 484 CronParser.validate(value.toString()); 485 } 486 } 487 488 protected Object convertScheduled(String name, Object value) throws MessageFormatException { 489 Object result = value; 490 if (AMQ_SCHEDULED_DELAY.equals(name)){ 491 result = TypeConversionSupport.convert(value, Long.class); 492 } 493 else if (AMQ_SCHEDULED_PERIOD.equals(name)){ 494 result = TypeConversionSupport.convert(value, Long.class); 495 } 496 else if (AMQ_SCHEDULED_REPEAT.equals(name)){ 497 result = TypeConversionSupport.convert(value, Integer.class); 498 } 499 return result; 500 } 501 502 public Object getObjectProperty(String name) throws JMSException { 503 if (name == null) { 504 throw new NullPointerException("Property name cannot be null"); 505 } 506 507 // PropertyExpression handles converting message headers to properties. 508 PropertyExpression expression = new PropertyExpression(name); 509 return expression.evaluate(this); 510 } 511 512 public boolean getBooleanProperty(String name) throws JMSException { 513 Object value = getObjectProperty(name); 514 if (value == null) { 515 return false; 516 } 517 Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class); 518 if (rc == null) { 519 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a boolean"); 520 } 521 return rc.booleanValue(); 522 } 523 524 public byte getByteProperty(String name) throws JMSException { 525 Object value = getObjectProperty(name); 526 if (value == null) { 527 throw new NumberFormatException("property " + name + " was null"); 528 } 529 Byte rc = (Byte) TypeConversionSupport.convert(value, Byte.class); 530 if (rc == null) { 531 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a byte"); 532 } 533 return rc.byteValue(); 534 } 535 536 public short getShortProperty(String name) throws JMSException { 537 Object value = getObjectProperty(name); 538 if (value == null) { 539 throw new NumberFormatException("property " + name + " was null"); 540 } 541 Short rc = (Short) TypeConversionSupport.convert(value, Short.class); 542 if (rc == null) { 543 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a short"); 544 } 545 return rc.shortValue(); 546 } 547 548 public int getIntProperty(String name) throws JMSException { 549 Object value = getObjectProperty(name); 550 if (value == null) { 551 throw new NumberFormatException("property " + name + " was null"); 552 } 553 Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class); 554 if (rc == null) { 555 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as an integer"); 556 } 557 return rc.intValue(); 558 } 559 560 public long getLongProperty(String name) throws JMSException { 561 Object value = getObjectProperty(name); 562 if (value == null) { 563 throw new NumberFormatException("property " + name + " was null"); 564 } 565 Long rc = (Long) TypeConversionSupport.convert(value, Long.class); 566 if (rc == null) { 567 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a long"); 568 } 569 return rc.longValue(); 570 } 571 572 public float getFloatProperty(String name) throws JMSException { 573 Object value = getObjectProperty(name); 574 if (value == null) { 575 throw new NullPointerException("property " + name + " was null"); 576 } 577 Float rc = (Float) TypeConversionSupport.convert(value, Float.class); 578 if (rc == null) { 579 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a float"); 580 } 581 return rc.floatValue(); 582 } 583 584 public double getDoubleProperty(String name) throws JMSException { 585 Object value = getObjectProperty(name); 586 if (value == null) { 587 throw new NullPointerException("property " + name + " was null"); 588 } 589 Double rc = (Double) TypeConversionSupport.convert(value, Double.class); 590 if (rc == null) { 591 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a double"); 592 } 593 return rc.doubleValue(); 594 } 595 596 public String getStringProperty(String name) throws JMSException { 597 Object value = null; 598 if (name.equals("JMSXUserID")) { 599 value = getUserID(); 600 if (value == null) { 601 value = getObjectProperty(name); 602 } 603 } else { 604 value = getObjectProperty(name); 605 } 606 if (value == null) { 607 return null; 608 } 609 String rc = (String) TypeConversionSupport.convert(value, String.class); 610 if (rc == null) { 611 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a String"); 612 } 613 return rc; 614 } 615 616 public void setBooleanProperty(String name, boolean value) throws JMSException { 617 setBooleanProperty(name, value, true); 618 } 619 620 public void setBooleanProperty(String name, boolean value, boolean checkReadOnly) throws JMSException { 621 setObjectProperty(name, Boolean.valueOf(value), checkReadOnly); 622 } 623 624 public void setByteProperty(String name, byte value) throws JMSException { 625 setObjectProperty(name, Byte.valueOf(value)); 626 } 627 628 public void setShortProperty(String name, short value) throws JMSException { 629 setObjectProperty(name, Short.valueOf(value)); 630 } 631 632 public void setIntProperty(String name, int value) throws JMSException { 633 setObjectProperty(name, Integer.valueOf(value)); 634 } 635 636 public void setLongProperty(String name, long value) throws JMSException { 637 setObjectProperty(name, Long.valueOf(value)); 638 } 639 640 public void setFloatProperty(String name, float value) throws JMSException { 641 setObjectProperty(name, new Float(value)); 642 } 643 644 public void setDoubleProperty(String name, double value) throws JMSException { 645 setObjectProperty(name, new Double(value)); 646 } 647 648 public void setStringProperty(String name, String value) throws JMSException { 649 setObjectProperty(name, value); 650 } 651 652 private void checkReadOnlyProperties() throws MessageNotWriteableException { 653 if (readOnlyProperties) { 654 throw new MessageNotWriteableException("Message properties are read-only"); 655 } 656 } 657 658 protected void checkReadOnlyBody() throws MessageNotWriteableException { 659 if (readOnlyBody) { 660 throw new MessageNotWriteableException("Message body is read-only"); 661 } 662 } 663 664 public Callback getAcknowledgeCallback() { 665 return acknowledgeCallback; 666 } 667 668 public void setAcknowledgeCallback(Callback acknowledgeCallback) { 669 this.acknowledgeCallback = acknowledgeCallback; 670 } 671 672 /** 673 * Send operation event listener. Used to get the message ready to be sent. 674 */ 675 public void onSend() throws JMSException { 676 setReadOnlyBody(true); 677 setReadOnlyProperties(true); 678 } 679 680 public Response visit(CommandVisitor visitor) throws Exception { 681 return visitor.processMessage(this); 682 } 683 }