001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.nio;
018
019import java.io.EOFException;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.io.OutputStream;
023import java.nio.ByteBuffer;
024import java.nio.channels.WritableByteChannel;
025
026import org.apache.activemq.transport.tcp.TimeStampStream;
027
028import javax.net.ssl.SSLEngine;
029
030/**
031 * An optimized buffered outputstream for Tcp
032 */
033public class NIOOutputStream extends OutputStream implements TimeStampStream {
034
035    private static final int BUFFER_SIZE = 8192;
036
037    private final WritableByteChannel out;
038    private final byte[] buffer;
039    private final ByteBuffer byteBuffer;
040
041    private int count;
042    private boolean closed;
043    private volatile long writeTimestamp = -1;//concurrent reads of this value
044
045    private SSLEngine engine;
046
047    /**
048     * Constructor
049     *
050     * @param out
051     */
052    public NIOOutputStream(WritableByteChannel out) {
053        this(out, BUFFER_SIZE);
054    }
055
056    /**
057     * Creates a new buffered output stream to write data to the specified
058     * underlying output stream with the specified buffer size.
059     *
060     * @param out the underlying output stream.
061     * @param size the buffer size.
062     * @throws IllegalArgumentException if size <= 0.
063     */
064    public NIOOutputStream(WritableByteChannel out, int size) {
065        this.out = out;
066        if (size <= 0) {
067            throw new IllegalArgumentException("Buffer size <= 0");
068        }
069        buffer = new byte[size];
070        byteBuffer = ByteBuffer.wrap(buffer);
071    }
072
073    /**
074     * write a byte on to the stream
075     *
076     * @param b - byte to write
077     * @throws IOException
078     */
079    public void write(int b) throws IOException {
080        checkClosed();
081        if (availableBufferToWrite() < 1) {
082            flush();
083        }
084        buffer[count++] = (byte)b;
085    }
086
087    /**
088     * write a byte array to the stream
089     *
090     * @param b the byte buffer
091     * @param off the offset into the buffer
092     * @param len the length of data to write
093     * @throws IOException
094     */
095    public void write(byte b[], int off, int len) throws IOException {
096        checkClosed();
097        if (availableBufferToWrite() < len) {
098            flush();
099        }
100        if (buffer.length >= len) {
101            System.arraycopy(b, off, buffer, count, len);
102            count += len;
103        } else {
104            write(ByteBuffer.wrap(b, off, len));
105        }
106    }
107
108    /**
109     * flush the data to the output stream This doesn't call flush on the
110     * underlying outputstream, because Tcp is particularly efficent at doing
111     * this itself ....
112     *
113     * @throws IOException
114     */
115    public void flush() throws IOException {
116        if (count > 0 && out != null) {
117            byteBuffer.position(0);
118            byteBuffer.limit(count);
119            write(byteBuffer);
120            count = 0;
121        }
122    }
123
124    /**
125     * close this stream
126     *
127     * @throws IOException
128     */
129    public void close() throws IOException {
130        super.close();
131        if (engine != null) {
132            engine.closeOutbound();
133        }
134        closed = true;
135    }
136
137    /**
138     * Checks that the stream has not been closed
139     *
140     * @throws IOException
141     */
142    protected void checkClosed() throws IOException {
143        if (closed) {
144            throw new EOFException("Cannot write to the stream any more it has already been closed");
145        }
146    }
147
148    /**
149     * @return the amount free space in the buffer
150     */
151    private int availableBufferToWrite() {
152        return buffer.length - count;
153    }
154
155    protected void write(ByteBuffer data) throws IOException {
156        ByteBuffer plain;
157        if (engine != null) {
158            plain = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
159            plain.clear();
160            engine.wrap(data, plain);
161            plain.flip();
162        }  else {
163            plain = data;
164        }
165
166        int remaining = plain.remaining();
167        int lastRemaining = remaining - 1;
168        long delay = 1;
169        try {
170            writeTimestamp = System.currentTimeMillis();
171            while (remaining > 0) {
172
173                // We may need to do a little bit of sleeping to avoid a busy loop.
174                // Slow down if no data was written out..
175                if (remaining == lastRemaining) {
176                    try {
177                        // Use exponential rollback to increase sleep time.
178                        Thread.sleep(delay);
179                        delay *= 2;
180                        if (delay > 1000) {
181                            delay = 1000;
182                        }
183                    } catch (InterruptedException e) {
184                        throw new InterruptedIOException();
185                    }
186                } else {
187                    delay = 1;
188                }
189                lastRemaining = remaining;
190
191                // Since the write is non-blocking, all the data may not have been
192                // written.
193                out.write(plain);
194                remaining = data.remaining();
195            }
196        } finally {
197            writeTimestamp = -1;
198        }
199    }
200
201
202    /* (non-Javadoc)
203     * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
204     */
205    public boolean isWriting() {
206        return writeTimestamp > 0;
207    }
208
209    /* (non-Javadoc)
210     * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
211     */
212    public long getWriteTimestamp() {
213        return writeTimestamp;
214    }
215
216    public void setEngine(SSLEngine engine) {
217        this.engine = engine;
218    }
219}