001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.nio; 018 019 import java.io.EOFException; 020 import java.io.IOException; 021 import java.io.InterruptedIOException; 022 import java.io.OutputStream; 023 import java.nio.ByteBuffer; 024 import java.nio.channels.WritableByteChannel; 025 026 import org.apache.activemq.transport.tcp.TimeStampStream; 027 028 import javax.net.ssl.SSLEngine; 029 030 /** 031 * An optimized buffered outputstream for Tcp 032 */ 033 public class NIOOutputStream extends OutputStream implements TimeStampStream { 034 035 private static final int BUFFER_SIZE = 8192; 036 037 private final WritableByteChannel out; 038 private final byte[] buffer; 039 private final ByteBuffer byteBuffer; 040 041 private int count; 042 private boolean closed; 043 private volatile long writeTimestamp = -1;//concurrent reads of this value 044 045 private SSLEngine engine; 046 047 /** 048 * Constructor 049 * 050 * @param out 051 */ 052 public NIOOutputStream(WritableByteChannel out) { 053 this(out, BUFFER_SIZE); 054 } 055 056 /** 057 * Creates a new buffered output stream to write data to the specified 058 * underlying output stream with the specified buffer size. 059 * 060 * @param out the underlying output stream. 061 * @param size the buffer size. 062 * @throws IllegalArgumentException if size <= 0. 063 */ 064 public NIOOutputStream(WritableByteChannel out, int size) { 065 this.out = out; 066 if (size <= 0) { 067 throw new IllegalArgumentException("Buffer size <= 0"); 068 } 069 buffer = new byte[size]; 070 byteBuffer = ByteBuffer.wrap(buffer); 071 } 072 073 /** 074 * write a byte on to the stream 075 * 076 * @param b - byte to write 077 * @throws IOException 078 */ 079 public void write(int b) throws IOException { 080 checkClosed(); 081 if (availableBufferToWrite() < 1) { 082 flush(); 083 } 084 buffer[count++] = (byte)b; 085 } 086 087 /** 088 * write a byte array to the stream 089 * 090 * @param b the byte buffer 091 * @param off the offset into the buffer 092 * @param len the length of data to write 093 * @throws IOException 094 */ 095 public void write(byte b[], int off, int len) throws IOException { 096 checkClosed(); 097 if (availableBufferToWrite() < len) { 098 flush(); 099 } 100 if (buffer.length >= len) { 101 System.arraycopy(b, off, buffer, count, len); 102 count += len; 103 } else { 104 write(ByteBuffer.wrap(b, off, len)); 105 } 106 } 107 108 /** 109 * flush the data to the output stream This doesn't call flush on the 110 * underlying outputstream, because Tcp is particularly efficent at doing 111 * this itself .... 112 * 113 * @throws IOException 114 */ 115 public void flush() throws IOException { 116 if (count > 0 && out != null) { 117 byteBuffer.position(0); 118 byteBuffer.limit(count); 119 write(byteBuffer); 120 count = 0; 121 } 122 } 123 124 /** 125 * close this stream 126 * 127 * @throws IOException 128 */ 129 public void close() throws IOException { 130 super.close(); 131 if (engine != null) { 132 engine.closeOutbound(); 133 } 134 closed = true; 135 } 136 137 /** 138 * Checks that the stream has not been closed 139 * 140 * @throws IOException 141 */ 142 protected void checkClosed() throws IOException { 143 if (closed) { 144 throw new EOFException("Cannot write to the stream any more it has already been closed"); 145 } 146 } 147 148 /** 149 * @return the amount free space in the buffer 150 */ 151 private int availableBufferToWrite() { 152 return buffer.length - count; 153 } 154 155 protected void write(ByteBuffer data) throws IOException { 156 ByteBuffer plain; 157 if (engine != null) { 158 plain = ByteBuffer.allocate(engine.getSession().getPacketBufferSize()); 159 plain.clear(); 160 engine.wrap(data, plain); 161 plain.flip(); 162 } else { 163 plain = data; 164 } 165 166 int remaining = plain.remaining(); 167 int lastRemaining = remaining - 1; 168 long delay = 1; 169 try { 170 writeTimestamp = System.currentTimeMillis(); 171 while (remaining > 0) { 172 173 // We may need to do a little bit of sleeping to avoid a busy loop. 174 // Slow down if no data was written out.. 175 if (remaining == lastRemaining) { 176 try { 177 // Use exponential rollback to increase sleep time. 178 Thread.sleep(delay); 179 delay *= 2; 180 if (delay > 1000) { 181 delay = 1000; 182 } 183 } catch (InterruptedException e) { 184 throw new InterruptedIOException(); 185 } 186 } else { 187 delay = 1; 188 } 189 lastRemaining = remaining; 190 191 // Since the write is non-blocking, all the data may not have been 192 // written. 193 out.write(plain); 194 remaining = data.remaining(); 195 } 196 } finally { 197 writeTimestamp = -1; 198 } 199 } 200 201 202 /* (non-Javadoc) 203 * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting() 204 */ 205 public boolean isWriting() { 206 return writeTimestamp > 0; 207 } 208 209 /* (non-Javadoc) 210 * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp() 211 */ 212 public long getWriteTimestamp() { 213 return writeTimestamp; 214 } 215 216 public void setEngine(SSLEngine engine) { 217 this.engine = engine; 218 } 219 }