001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt; 018 019import java.io.IOException; 020import java.security.cert.X509Certificate; 021 022import javax.jms.JMSException; 023import org.apache.activemq.broker.BrokerContext; 024import org.apache.activemq.command.Command; 025import org.apache.activemq.transport.Transport; 026import org.apache.activemq.transport.TransportFilter; 027import org.apache.activemq.transport.TransportListener; 028import org.apache.activemq.transport.tcp.SslTransport; 029import org.apache.activemq.util.IOExceptionSupport; 030import org.apache.activemq.wireformat.WireFormat; 031import org.fusesource.mqtt.codec.MQTTFrame; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * The MQTTTransportFilter normally sits on top of a TcpTransport that has been 037 * configured with the StompWireFormat and is used to convert MQTT commands to 038 * ActiveMQ commands. All of the conversion work is done by delegating to the 039 * MQTTProtocolConverter 040 */ 041public class MQTTTransportFilter extends TransportFilter implements MQTTTransport { 042 private static final Logger LOG = LoggerFactory.getLogger(MQTTTransportFilter.class); 043 private static final Logger TRACE = LoggerFactory.getLogger(MQTTTransportFilter.class.getPackage().getName() + ".MQTTIO"); 044 private final MQTTProtocolConverter protocolConverter; 045 private MQTTInactivityMonitor monitor; 046 private MQTTWireFormat wireFormat; 047 048 private boolean trace; 049 050 public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) { 051 super(next); 052 this.protocolConverter = new MQTTProtocolConverter(this, brokerContext); 053 054 if (wireFormat instanceof MQTTWireFormat) { 055 this.wireFormat = (MQTTWireFormat) wireFormat; 056 } 057 } 058 059 public void oneway(Object o) throws IOException { 060 try { 061 final Command command = (Command) o; 062 protocolConverter.onActiveMQCommand(command); 063 } catch (Exception e) { 064 throw IOExceptionSupport.create(e); 065 } 066 } 067 068 public void onCommand(Object command) { 069 try { 070 if (trace) { 071 TRACE.trace("Received: \n" + command); 072 } 073 074 protocolConverter.onMQTTCommand((MQTTFrame) command); 075 } catch (IOException e) { 076 handleException(e); 077 } catch (JMSException e) { 078 onException(IOExceptionSupport.create(e)); 079 } 080 } 081 082 public void sendToActiveMQ(Command command) { 083 TransportListener l = transportListener; 084 if (l != null) { 085 l.onCommand(command); 086 } 087 } 088 089 public void sendToMQTT(MQTTFrame command) throws IOException { 090 if (trace) { 091 TRACE.trace("Sending: \n" + command); 092 } 093 Transport n = next; 094 if (n != null) { 095 n.oneway(command); 096 } 097 } 098 099 public X509Certificate[] getPeerCertificates() { 100 if (next instanceof SslTransport) { 101 X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates(); 102 if (trace && peerCerts != null) { 103 LOG.debug("Peer Identity has been verified\n"); 104 } 105 return peerCerts; 106 } 107 return null; 108 } 109 110 public boolean isTrace() { 111 return trace; 112 } 113 114 public void setTrace(boolean trace) { 115 this.trace = trace; 116 } 117 118 @Override 119 public MQTTInactivityMonitor getInactivityMonitor() { 120 return monitor; 121 } 122 123 public void setInactivityMonitor(MQTTInactivityMonitor monitor) { 124 this.monitor = monitor; 125 } 126 127 @Override 128 public MQTTWireFormat getWireFormat() { 129 return this.wireFormat; 130 } 131 132 public void handleException(IOException e) { 133 protocolConverter.onTransportError(); 134 super.onException(e); 135 } 136 137 138}