001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt;
018
019import java.io.DataInput;
020import java.io.DataInputStream;
021import java.io.DataOutput;
022import java.io.DataOutputStream;
023import java.io.IOException;
024
025import org.apache.activemq.util.ByteArrayInputStream;
026import org.apache.activemq.util.ByteArrayOutputStream;
027import org.apache.activemq.util.ByteSequence;
028import org.apache.activemq.wireformat.WireFormat;
029import org.fusesource.hawtbuf.Buffer;
030import org.fusesource.mqtt.codec.MQTTFrame;
031
032/**
033 * Implements marshalling and unmarsalling the <a
034 * href="http://mqtt.org/">MQTT</a> protocol.
035 */
036public class MQTTWireFormat implements WireFormat {
037
038
039    static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
040
041    private boolean encodingEnabled = false;
042    private int version = 1;
043
044    public ByteSequence marshal(Object command) throws IOException {
045        ByteArrayOutputStream baos = new ByteArrayOutputStream();
046        DataOutputStream dos = new DataOutputStream(baos);
047        marshal(command, dos);
048        dos.close();
049        return baos.toByteSequence();
050    }
051
052    public Object unmarshal(ByteSequence packet) throws IOException {
053        ByteArrayInputStream stream = new ByteArrayInputStream(packet);
054        DataInputStream dis = new DataInputStream(stream);
055        return unmarshal(dis);
056    }
057
058    public void marshal(Object command, DataOutput dataOut) throws IOException {
059        MQTTFrame frame = (MQTTFrame) command;
060        dataOut.write(frame.header());
061
062        int remaining = 0;
063        for (Buffer buffer : frame.buffers) {
064            remaining += buffer.length;
065        }
066        do {
067            byte digit = (byte) (remaining & 0x7F);
068            remaining >>>= 7;
069            if (remaining > 0) {
070                digit |= 0x80;
071            }
072            dataOut.write(digit);
073        } while (remaining > 0);
074        for (Buffer buffer : frame.buffers) {
075            dataOut.write(buffer.data, buffer.offset, buffer.length);
076        }
077    }
078
079    public Object unmarshal(DataInput dataIn) throws IOException {
080        byte header = dataIn.readByte();
081
082        byte digit;
083        int multiplier = 1;
084        int length = 0;
085        do {
086            digit = dataIn.readByte();
087            length += (digit & 0x7F) * multiplier;
088            multiplier <<= 7;
089        }
090        while ((digit & 0x80) != 0);
091
092        if (length >= 0) {
093            if (length > MAX_MESSAGE_LENGTH) {
094                throw new IOException("The maximum message length was exceeded");
095            }
096
097            if (length > 0) {
098                byte[] data = new byte[length];
099                dataIn.readFully(data);
100                Buffer body = new Buffer(data);
101                return new MQTTFrame(body).header(header);
102            } else {
103                return new MQTTFrame().header(header);
104            }
105        }
106        return null;
107    }
108
109    /**
110     * @param the version of the wire format
111     */
112    public void setVersion(int version) {
113        this.version = version;
114    }
115
116    /**
117     * @return the version of the wire format
118     */
119    public int getVersion() {
120        return this.version;
121    }
122
123
124}