001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt; 018 019import java.io.IOException; 020import java.util.zip.DataFormatException; 021 022import javax.jms.JMSException; 023 024import org.apache.activemq.command.ActiveMQDestination; 025import org.apache.activemq.command.ActiveMQMessage; 026import org.apache.activemq.command.ConsumerInfo; 027import org.apache.activemq.command.MessageAck; 028import org.apache.activemq.command.MessageDispatch; 029import org.fusesource.mqtt.client.QoS; 030import org.fusesource.mqtt.codec.PUBLISH; 031 032/** 033 * Keeps track of the MQTT client subscription so that acking is correctly done. 034 */ 035public class MQTTSubscription { 036 037 private final MQTTProtocolConverter protocolConverter; 038 039 private final ConsumerInfo consumerInfo; 040 private final String topicName; 041 private final QoS qos; 042 043 public MQTTSubscription(MQTTProtocolConverter protocolConverter, String topicName, QoS qos, ConsumerInfo consumerInfo) { 044 this.protocolConverter = protocolConverter; 045 this.consumerInfo = consumerInfo; 046 this.qos = qos; 047 this.topicName = topicName; 048 } 049 050 /** 051 * Create a {@link MessageAck} that will acknowledge the given {@link MessageDispatch}. 052 * 053 * @param md 054 * the {@link MessageDispatch} to acknowledge. 055 * 056 * @return a new {@link MessageAck} command to acknowledge the message. 057 */ 058 public MessageAck createMessageAck(MessageDispatch md) { 059 return new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); 060 } 061 062 /** 063 * Creates a PUBLISH command that can be sent to a remote client from an 064 * incoming {@link ActiveMQMessage} instance. 065 * 066 * @param message 067 * the message to convert to a PUBLISH command. 068 * 069 * @return a new PUBLISH command that is populated from the {@link ActiveMQMessage}. 070 * 071 * @throws DataFormatException 072 * @throws IOException 073 * @throws JMSException 074 */ 075 public PUBLISH createPublish(ActiveMQMessage message) throws DataFormatException, IOException, JMSException { 076 PUBLISH publish = protocolConverter.convertMessage(message); 077 if (publish.qos().ordinal() > this.qos.ordinal()) { 078 publish.qos(this.qos); 079 } 080 switch (publish.qos()) { 081 case AT_LEAST_ONCE: 082 case EXACTLY_ONCE: 083 // set packet id, and optionally dup flag 084 protocolConverter.getPacketIdGenerator().setPacketId(protocolConverter.getClientId(), this, message, publish); 085 case AT_MOST_ONCE: 086 } 087 return publish; 088 } 089 090 /** 091 * Given a PUBLISH command determine if it will expect an ACK based on the 092 * QoS of the Publish command and the QoS of this subscription. 093 * 094 * @param publish 095 * The publish command to inspect. 096 * 097 * @return true if the client will expect an PUBACK for this PUBLISH. 098 */ 099 public boolean expectAck(PUBLISH publish) { 100 QoS publishQoS = publish.qos(); 101 if (publishQoS.compareTo(this.qos) > 0){ 102 publishQoS = this.qos; 103 } 104 return !publishQoS.equals(QoS.AT_MOST_ONCE); 105 } 106 107 /** 108 * @returns the original topic name value the client used when subscribing. 109 */ 110 public String getTopicName() { 111 return this.topicName; 112 } 113 114 /** 115 * The real {@link ActiveMQDestination} that this subscription is assigned. 116 * 117 * @return the real {@link ActiveMQDestination} assigned to this subscription. 118 */ 119 public ActiveMQDestination getDestination() { 120 return consumerInfo.getDestination(); 121 } 122 123 /** 124 * Gets the {@link ConsumerInfo} that describes the subscription sent to ActiveMQ. 125 * 126 * @return the {@link ConsumerInfo} used to create this subscription. 127 */ 128 public ConsumerInfo getConsumerInfo() { 129 return consumerInfo; 130 } 131 132 /** 133 * @return the assigned QoS value for this subscription. 134 */ 135 public QoS getQoS() { 136 return qos; 137 } 138 139 @Override 140 public String toString() { 141 return "MQTT Sub: topic[" + topicName + "] -> [" + consumerInfo.getDestination() + "]"; 142 } 143}