001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.command; 018 019 import org.apache.activemq.state.CommandVisitor; 020 021 /** 022 * @openwire:marshaller code="22" 023 * 024 */ 025 public class MessageAck extends BaseCommand { 026 027 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK; 028 029 /** 030 * Used to let the broker know that the message has been delivered to the 031 * client. Message will still be retained until an standard ack is received. 032 * This is used get the broker to send more messages past prefetch limits 033 * when an standard ack has not been sent. 034 */ 035 public static final byte DELIVERED_ACK_TYPE = 0; 036 037 /** 038 * The standard ack case where a client wants the message to be discarded. 039 */ 040 public static final byte STANDARD_ACK_TYPE = 2; 041 042 /** 043 * In case the client want's to explicitly let the broker know that a 044 * message was not processed and the message was considered a poison 045 * message. 046 */ 047 public static final byte POSION_ACK_TYPE = 1; 048 049 /** 050 * In case the client want's to explicitly let the broker know that a 051 * message was not processed and it was re-delivered to the consumer 052 * but it was not yet considered to be a poison message. The messageCount 053 * field will hold the number of times the message was re-delivered. 054 */ 055 public static final byte REDELIVERED_ACK_TYPE = 3; 056 057 /** 058 * The ack case where a client wants only an individual message to be discarded. 059 */ 060 public static final byte INDIVIDUAL_ACK_TYPE = 4; 061 062 /** 063 * The ack case where a durable topic subscription does not match a selector. 064 */ 065 public static final byte UNMATCHED_ACK_TYPE = 5; 066 067 protected byte ackType; 068 protected ConsumerId consumerId; 069 protected MessageId firstMessageId; 070 protected MessageId lastMessageId; 071 protected ActiveMQDestination destination; 072 protected TransactionId transactionId; 073 protected int messageCount; 074 protected Throwable poisonCause; 075 076 protected transient String consumerKey; 077 078 public MessageAck() { 079 } 080 081 public MessageAck(MessageDispatch md, byte ackType, int messageCount) { 082 this.ackType = ackType; 083 this.consumerId = md.getConsumerId(); 084 this.destination = md.getDestination(); 085 this.lastMessageId = md.getMessage().getMessageId(); 086 this.messageCount = messageCount; 087 } 088 089 public MessageAck(Message message, byte ackType, int messageCount) { 090 this.ackType = ackType; 091 this.destination = message.getDestination(); 092 this.lastMessageId = message.getMessageId(); 093 this.messageCount = messageCount; 094 } 095 096 public void copy(MessageAck copy) { 097 super.copy(copy); 098 copy.firstMessageId = firstMessageId; 099 copy.lastMessageId = lastMessageId; 100 copy.destination = destination; 101 copy.transactionId = transactionId; 102 copy.ackType = ackType; 103 copy.consumerId = consumerId; 104 } 105 106 public byte getDataStructureType() { 107 return DATA_STRUCTURE_TYPE; 108 } 109 110 public boolean isMessageAck() { 111 return true; 112 } 113 114 public boolean isPoisonAck() { 115 return ackType == POSION_ACK_TYPE; 116 } 117 118 public boolean isStandardAck() { 119 return ackType == STANDARD_ACK_TYPE; 120 } 121 122 public boolean isDeliveredAck() { 123 return ackType == DELIVERED_ACK_TYPE; 124 } 125 126 public boolean isRedeliveredAck() { 127 return ackType == REDELIVERED_ACK_TYPE; 128 } 129 130 public boolean isIndividualAck() { 131 return ackType == INDIVIDUAL_ACK_TYPE; 132 } 133 134 public boolean isUnmatchedAck() { 135 return ackType == UNMATCHED_ACK_TYPE; 136 } 137 138 /** 139 * @openwire:property version=1 cache=true 140 */ 141 public ActiveMQDestination getDestination() { 142 return destination; 143 } 144 145 public void setDestination(ActiveMQDestination destination) { 146 this.destination = destination; 147 } 148 149 /** 150 * @openwire:property version=1 cache=true 151 */ 152 public TransactionId getTransactionId() { 153 return transactionId; 154 } 155 156 public void setTransactionId(TransactionId transactionId) { 157 this.transactionId = transactionId; 158 } 159 160 public boolean isInTransaction() { 161 return transactionId != null; 162 } 163 164 /** 165 * @openwire:property version=1 cache=true 166 */ 167 public ConsumerId getConsumerId() { 168 return consumerId; 169 } 170 171 public void setConsumerId(ConsumerId consumerId) { 172 this.consumerId = consumerId; 173 } 174 175 /** 176 * @openwire:property version=1 177 */ 178 public byte getAckType() { 179 return ackType; 180 } 181 182 public void setAckType(byte ackType) { 183 this.ackType = ackType; 184 } 185 186 /** 187 * @openwire:property version=1 188 */ 189 public MessageId getFirstMessageId() { 190 return firstMessageId; 191 } 192 193 public void setFirstMessageId(MessageId firstMessageId) { 194 this.firstMessageId = firstMessageId; 195 } 196 197 /** 198 * @openwire:property version=1 199 */ 200 public MessageId getLastMessageId() { 201 return lastMessageId; 202 } 203 204 public void setLastMessageId(MessageId lastMessageId) { 205 this.lastMessageId = lastMessageId; 206 } 207 208 /** 209 * The number of messages being acknowledged in the range. 210 * 211 * @openwire:property version=1 212 */ 213 public int getMessageCount() { 214 return messageCount; 215 } 216 217 public void setMessageCount(int messageCount) { 218 this.messageCount = messageCount; 219 } 220 221 /** 222 * The cause of a poison ack, if a message listener 223 * throws an exception it will be recorded here 224 * 225 * @openwire:property version=7 226 */ 227 public Throwable getPoisonCause() { 228 return poisonCause; 229 } 230 231 public void setPoisonCause(Throwable poisonCause) { 232 this.poisonCause = poisonCause; 233 } 234 235 public Response visit(CommandVisitor visitor) throws Exception { 236 return visitor.processMessageAck(this); 237 } 238 239 /** 240 * A helper method to allow a single message ID to be acknowledged 241 */ 242 public void setMessageID(MessageId messageID) { 243 setFirstMessageId(messageID); 244 setLastMessageId(messageID); 245 setMessageCount(1); 246 } 247 248 }