001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.io.OutputStream;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.Map;
024
025import javax.jms.InvalidDestinationException;
026import javax.jms.JMSException;
027
028import org.apache.activemq.command.ActiveMQBytesMessage;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ActiveMQMessage;
031import org.apache.activemq.command.MessageId;
032import org.apache.activemq.command.ProducerId;
033import org.apache.activemq.command.ProducerInfo;
034import org.apache.activemq.util.IOExceptionSupport;
035import org.apache.activemq.util.IntrospectionSupport;
036
037/**
038 *
039 */
040public class ActiveMQOutputStream extends OutputStream implements Disposable {
041
042    protected int count;
043
044    final byte buffer[];
045
046    private final ActiveMQConnection connection;
047    private final Map<String, Object> properties;
048    private final ProducerInfo info;
049
050    private long messageSequence;
051    private boolean closed;
052    private final int deliveryMode;
053    private final int priority;
054    private final long timeToLive;
055    private boolean alwaysSyncSend = false;
056
057    /**
058     * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
059     */
060    public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE";
061
062    public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
063                                long timeToLive) throws JMSException {
064        this.connection = connection;
065        this.deliveryMode = deliveryMode;
066        this.priority = priority;
067        this.timeToLive = timeToLive;
068        this.properties = properties == null ? null : new HashMap<String, Object>(properties);
069
070        Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE);
071        if (chunkSize == null) {
072            chunkSize = 64 * 1024;
073        } else {
074            if (chunkSize < 1) {
075                throw new IllegalArgumentException("Chunk size must be greater then 0");
076            } else {
077                chunkSize *= 1024;
078            }
079        }
080
081        buffer = new byte[chunkSize];
082
083        if (destination == null) {
084            throw new InvalidDestinationException("Don't understand null destinations");
085        }
086
087        this.info = new ProducerInfo(producerId);
088
089        // Allows the options on the destination to configure the stream
090        if (destination.getOptions() != null) {
091            Map<String, String> options = new HashMap<String, String>(destination.getOptions());
092            IntrospectionSupport.setProperties(this, options, "producer.");
093            IntrospectionSupport.setProperties(this.info, options, "producer.");
094        }
095
096        this.info.setDestination(destination);
097
098        this.connection.addOutputStream(this);
099        this.connection.asyncSendPacket(info);
100    }
101
102    public void close() throws IOException {
103        if (!closed) {
104            flushBuffer();
105            try {
106                // Send an EOS style empty message to signal EOS.
107                send(new ActiveMQMessage(), true);
108                dispose();
109                this.connection.asyncSendPacket(info.createRemoveCommand());
110            } catch (JMSException e) {
111                IOExceptionSupport.create(e);
112            }
113        }
114    }
115
116    public void dispose() {
117        if (!closed) {
118            this.connection.removeOutputStream(this);
119            closed = true;
120        }
121    }
122
123    public synchronized void write(int b) throws IOException {
124        buffer[count++] = (byte) b;
125        if (count == buffer.length) {
126            flushBuffer();
127        }
128    }
129
130    public synchronized void write(byte b[], int off, int len) throws IOException {
131        while (len > 0) {
132            int max = Math.min(len, buffer.length - count);
133            System.arraycopy(b, off, buffer, count, max);
134
135            len -= max;
136            count += max;
137            off += max;
138
139            if (count == buffer.length) {
140                flushBuffer();
141            }
142        }
143    }
144
145    public synchronized void flush() throws IOException {
146        flushBuffer();
147    }
148
149    private void flushBuffer() throws IOException {
150        if (count != 0) {
151            try {
152                ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
153                msg.writeBytes(buffer, 0, count);
154                send(msg, false);
155            } catch (JMSException e) {
156                throw IOExceptionSupport.create(e);
157            }
158            count = 0;
159        }
160    }
161
162    /**
163     * @param msg
164     * @throws JMSException
165     */
166    private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
167        if (properties != null) {
168            for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) {
169                String key = iter.next();
170                Object value = properties.get(key);
171                msg.setObjectProperty(key, value);
172            }
173        }
174        msg.setType("org.apache.activemq.Stream");
175        msg.setGroupID(info.getProducerId().toString());
176        if (eosMessage) {
177            msg.setGroupSequence(-1);
178        } else {
179            msg.setGroupSequence((int) messageSequence);
180        }
181        MessageId id = new MessageId(info.getProducerId(), messageSequence++);
182        connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend());
183    }
184
185    public String toString() {
186        return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
187    }
188
189    public boolean isAlwaysSyncSend() {
190        return alwaysSyncSend;
191    }
192
193    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
194        this.alwaysSyncSend = alwaysSyncSend;
195    }
196
197}