An Analysis of Java NIO SocketChannel write and the DirectByteBuffer Implementation

A colleague on the systems team reported a problem. In a Cassandra-internal scenario they used NIO for communication with a one-client-one-thread model. Setting aside the issues with that threading model itself, the colleague observed that every large data transfer caused substantial memory growth, which looked like a memory leak. In fact this memory is not JVM heap memory but so-called off-heap (direct) memory, which is not easily reclaimed. Below we first analyze why the memory keeps growing, and then present a reasonably good solution.
Let's trace the call path through the source code. Data is sent on a socket via SocketChannel's public abstract int write(ByteBuffer src) throws IOException; method, whose implementation (OpenJDK 7u) is shown below:

public int write(ByteBuffer buf) throws IOException {
        if (buf == null)
            throw new NullPointerException();
        synchronized (writeLock) {
            ensureWriteOpen();
            int n = 0;
            Object traceContext =
                IoTrace.socketWriteBegin();

            try {
                begin();
                synchronized (stateLock) {
                    if (!isOpen())
                        return 0;
                    writerThread = NativeThread.current();
                }
                for (;;) {
                    n = IOUtil.write(fd, buf, -1, nd);
                    if ((n == IOStatus.INTERRUPTED) && isOpen())
                        continue;
                    return IOStatus.normalize(n);
                }
            } finally {
                writerCleanup();
                IoTrace.socketWriteEnd(traceContext, remoteAddress.getAddress(),
                                       remoteAddress.getPort(), n > 0 ? n : 0);
                end(n > 0 || (n == IOStatus.UNAVAILABLE));
                synchronized (stateLock) {
                    if ((n <= 0) && (!isOutputOpen))
                        throw new AsynchronousCloseException();
                }
                assert IOStatus.check(n);
            }
        }
    }

Next we step into line 18, n = IOUtil.write(fd, buf, -1, nd);, whose source is as follows:

static int write(FileDescriptor fd, ByteBuffer src, long position,
                     NativeDispatcher nd)
        throws IOException
    {
        if (src instanceof DirectBuffer)
            return writeFromNativeBuffer(fd, src, position, nd);

        // Substitute a native buffer
        int pos = src.position();
        int lim = src.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);
        ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
        try {
            bb.put(src);
            bb.flip();
            // Do not update src until we see how many bytes were written
            src.position(pos);

            int n = writeFromNativeBuffer(fd, bb, position, nd);
            if (n > 0) {
                // now update src
                src.position(pos + n);
            }
            return n;
        } finally {
            Util.offerFirstTemporaryDirectBuffer(bb);
        }
    }

Look at line 13, ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);, which fetches a DirectBuffer for the current thread. This is where the memory gets allocated, and where the problem arises. The source is as follows:

/**
     * Returns a temporary buffer of at least the given size
     */
    static ByteBuffer getTemporaryDirectBuffer(int size) {
        BufferCache cache = bufferCache.get();
        ByteBuffer buf = cache.get(size);
        if (buf != null) {
            return buf;
        } else {
            // No suitable buffer in the cache so we need to allocate a new
            // one. To avoid the cache growing then we remove the first
            // buffer from the cache and free it.
            if (!cache.isEmpty()) {
                buf = cache.removeFirst();
                free(buf);
            }
            return ByteBuffer.allocateDirect(size);
        }
    }

bufferCache is a static ThreadLocal variable: the code looks in the current thread's BufferCache for a cached buffer large enough for the request, and if none is found it allocates a new direct buffer (freeing one cached buffer so the cache does not grow without bound). After the write, the buffer is put back into the thread's cache, and when it is actually freed is indeterminate. With one thread per client, every thread therefore ends up holding on to a direct buffer as large as its largest write. There lies the problem.
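To make the growth visible, here is a minimal sketch (the class and method names are my own, not from any JDK API) that writes one large heap ByteBuffer through a loopback SocketChannel and reports how much the JVM's "direct" buffer pool grew, as observed through BufferPoolMXBean. It assumes a HotSpot-style JDK that exposes the pool under the name "direct"; the exact numbers vary by JDK version.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class DirectGrowthDemo {
    // Bytes currently held by the JVM's "direct" buffer pool.
    static long directBytes() {
        for (BufferPoolMXBean p :
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(p.getName())) return p.getMemoryUsed();
        }
        return -1;
    }

    // Write one large *heap* buffer through a loopback socket and return how
    // much the direct pool grew. IOUtil.write copies the whole remaining
    // payload into a temporary direct buffer of equal size, which the calling
    // thread then keeps in its ThreadLocal cache.
    static long growthAfterLargeWrite(int size) throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) {
                client.configureBlocking(false); // a partial write is fine here
                long before = directBytes();
                client.write(ByteBuffer.allocate(size)); // heap buffer, not direct
                return directBytes() - before;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        long grown = growthAfterLargeWrite(32 * 1024 * 1024);
        System.out.println("direct pool grew by " + grown + " bytes");
    }
}
```

Even though the non-blocking write only sends as much as the socket buffer accepts, the whole 32 MB is copied into a temporary direct buffer first, and that buffer stays cached on the writing thread afterwards.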

How do we fix it? As long as each individual write never requests more than a fixed amount, the cached DirectBuffer never has to be re-allocated at a larger size. Dong Xicheng's book 《Hadoop技术内幕》 (Hadoop Internals) discusses exactly this point, though I had forgotten about it until now: the book explicitly suggests splitting the data into fixed-size chunks (for example 8 KB) and writing it to the DirectBuffer chunk by chunk. The book's sample code is given below for reference:

final static int NIO_BUFFER_LIMIT = 8 * 1024; // chunk size: 8 KB

// Write the buffer's contents to the channel; the channel is in
// non-blocking mode.
public int channelWrite(WritableByteChannel channel,
                        ByteBuffer buffer) throws IOException {
    // If the buffer holds no more than 8 KB, write it directly;
    // otherwise write it chunk by chunk.
    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
               channel.write(buffer) : channelIO(null, channel, buffer);
}

private static int channelIO(ReadableByteChannel readCh,
                             WritableByteChannel writeCh,
                             ByteBuffer buf) throws IOException {
    int originalLimit = buf.limit();
    int initialRemaining = buf.remaining();
    int ret = 0;
    while (buf.remaining() > 0) {
        try {
            int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
            buf.limit(buf.position() + ioSize);
            ret = (readCh == null) ?
                      writeCh.write(buf) : readCh.read(buf);
            // In non-blocking mode, write or read returns as soon as the
            // socket buffer is full; the return value is the number of
            // bytes actually written or read.
            if (ret < ioSize) {
                break;
            }
        }
        finally {
            buf.limit(originalLimit);
        }
    }
    int nBytes = initialRemaining - buf.remaining();
    return nBytes > 0 ? nBytes : ret;
}
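The chunking logic above can be sanity-checked with a small harness (class and method names are hypothetical): an in-memory WritableByteChannel records the largest single write it receives, so for a 100 KB payload we can confirm that no underlying write ever sees more than 8 KB at a time, and hence getTemporaryDirectBuffer is never asked for more than 8 KB.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

public class ChunkedWriteDemo {
    static final int NIO_BUFFER_LIMIT = 8 * 1024;

    // Same structure as the book's code above, made static for the harness.
    static int channelWrite(WritableByteChannel channel, ByteBuffer buffer)
            throws IOException {
        return (buffer.remaining() <= NIO_BUFFER_LIMIT)
                ? channel.write(buffer) : channelIO(null, channel, buffer);
    }

    static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh,
                         ByteBuffer buf) throws IOException {
        int originalLimit = buf.limit();
        int initialRemaining = buf.remaining();
        int ret = 0;
        while (buf.remaining() > 0) {
            try {
                int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
                buf.limit(buf.position() + ioSize);
                ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
                if (ret < ioSize) break;
            } finally {
                buf.limit(originalLimit);
            }
        }
        int nBytes = initialRemaining - buf.remaining();
        return nBytes > 0 ? nBytes : ret;
    }

    // In-memory sink that records the largest single write() it receives.
    static class RecordingChannel implements WritableByteChannel {
        int maxChunk = 0;
        public int write(ByteBuffer src) {
            int n = src.remaining();
            maxChunk = Math.max(maxChunk, n);
            src.position(src.limit()); // pretend everything was accepted
            return n;
        }
        public boolean isOpen() { return true; }
        public void close() {}
    }

    public static void main(String[] args) throws IOException {
        RecordingChannel ch = new RecordingChannel();
        int written = channelWrite(ch, ByteBuffer.allocate(100 * 1024));
        // written is 102400 and maxChunk is 8192: no single write exceeds 8 KB
        System.out.println("written=" + written + " maxChunk=" + ch.maxChunk);
    }
}
```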

References:
1. 《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》(Hadoop Internals: An In-Depth Analysis of MapReduce Architecture Design and Implementation), Dong Xicheng
