001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt;
018
019import java.io.IOException;
020import java.security.cert.X509Certificate;
021
022import javax.jms.JMSException;
023import org.apache.activemq.broker.BrokerContext;
024import org.apache.activemq.command.Command;
025import org.apache.activemq.transport.Transport;
026import org.apache.activemq.transport.TransportFilter;
027import org.apache.activemq.transport.TransportListener;
028import org.apache.activemq.transport.tcp.SslTransport;
029import org.apache.activemq.util.IOExceptionSupport;
030import org.apache.activemq.wireformat.WireFormat;
031import org.fusesource.mqtt.codec.MQTTFrame;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * The MQTTTransportFilter normally sits on top of a TcpTransport that has been
037 * configured with the StompWireFormat and is used to convert MQTT commands to
038 * ActiveMQ commands. All of the conversion work is done by delegating to the
039 * MQTTProtocolConverter
040 */
041public class MQTTTransportFilter extends TransportFilter implements MQTTTransport {
042    private static final Logger LOG = LoggerFactory.getLogger(MQTTTransportFilter.class);
043    private static final Logger TRACE = LoggerFactory.getLogger(MQTTTransportFilter.class.getPackage().getName() + ".MQTTIO");
044    private final MQTTProtocolConverter protocolConverter;
045    private MQTTInactivityMonitor monitor;
046    private MQTTWireFormat wireFormat;
047
048    private boolean trace;
049
050    public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
051        super(next);
052        this.protocolConverter = new MQTTProtocolConverter(this, brokerContext);
053
054        if (wireFormat instanceof MQTTWireFormat) {
055            this.wireFormat = (MQTTWireFormat) wireFormat;
056        }
057    }
058
059    public void oneway(Object o) throws IOException {
060        try {
061            final Command command = (Command) o;
062            protocolConverter.onActiveMQCommand(command);
063        } catch (Exception e) {
064            throw IOExceptionSupport.create(e);
065        }
066    }
067
068    public void onCommand(Object command) {
069        try {
070            if (trace) {
071                TRACE.trace("Received: \n" + command);
072            }
073
074            protocolConverter.onMQTTCommand((MQTTFrame) command);
075        } catch (IOException e) {
076            handleException(e);
077        } catch (JMSException e) {
078            onException(IOExceptionSupport.create(e));
079        }
080    }
081
082    public void sendToActiveMQ(Command command) {
083        TransportListener l = transportListener;
084        if (l != null) {
085            l.onCommand(command);
086        }
087    }
088
089    public void sendToMQTT(MQTTFrame command) throws IOException {
090        if (trace) {
091            TRACE.trace("Sending: \n" + command);
092        }
093        Transport n = next;
094        if (n != null) {
095            n.oneway(command);
096        }
097    }
098
099    public X509Certificate[] getPeerCertificates() {
100        if (next instanceof SslTransport) {
101            X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
102            if (trace && peerCerts != null) {
103                LOG.debug("Peer Identity has been verified\n");
104            }
105            return peerCerts;
106        }
107        return null;
108    }
109
110    public boolean isTrace() {
111        return trace;
112    }
113
114    public void setTrace(boolean trace) {
115        this.trace = trace;
116    }
117
118    @Override
119    public MQTTInactivityMonitor getInactivityMonitor() {
120        return monitor;
121    }
122
123    public void setInactivityMonitor(MQTTInactivityMonitor monitor) {
124        this.monitor = monitor;
125    }
126
127    @Override
128    public MQTTWireFormat getWireFormat() {
129        return this.wireFormat;
130    }
131
132    public void handleException(IOException e) {
133        protocolConverter.onTransportError();
134        super.onException(e);
135    }
136
137
138}