Colleagues from the systems group reported a problem: an internal Cassandra-style scenario uses NIO for communication with a one-client-one-thread model. Setting aside the issues with that threading model itself, they observed that whenever a large amount of data is transferred, memory grows sharply and looks like a memory leak. In fact this memory is not JVM heap memory; it is off-heap (direct) memory, which is not easy to reclaim. Below we first analyze why the memory keeps growing, and then give a better solution.
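To keep the later analysis concrete, here is a minimal sketch of the one-client-one-thread pattern in question. The class name, port, and buffer size are made up for illustration; this is not the system team's actual code:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class OneThreadPerClientServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9000)); // illustrative port
        while (true) {
            final SocketChannel client = server.accept(); // blocking accept
            new Thread(new Runnable() {                   // one thread per client
                public void run() {
                    try {
                        // Each client thread writes large heap buffers back to its peer;
                        // as we will see, this is the pattern that makes off-heap memory grow.
                        ByteBuffer reply = ByteBuffer.allocate(32 * 1024 * 1024); // 32 MB, illustrative
                        while (reply.hasRemaining()) {
                            client.write(reply);
                        }
                    } catch (IOException ignored) {
                        // demo only: swallow the error and let the thread exit
                    }
                }
            }).start();
        }
    }
}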
Let's trace the call path step by step through the source code. Data is sent on a socket with SocketChannel's public abstract int write(ByteBuffer src) throws IOException method; its implementation in OpenJDK 7u (sun.nio.ch.SocketChannelImpl) looks like this:
public int write(ByteBuffer buf) throws IOException {
    if (buf == null)
        throw new NullPointerException();
    synchronized (writeLock) {
        ensureWriteOpen();
        int n = 0;
        Object traceContext = IoTrace.socketWriteBegin();
        try {
            begin();
            synchronized (stateLock) {
                if (!isOpen())
                    return 0;
                writerThread = NativeThread.current();
            }
            for (;;) {
                n = IOUtil.write(fd, buf, -1, nd);
                if ((n == IOStatus.INTERRUPTED) && isOpen())
                    continue;
                return IOStatus.normalize(n);
            }
        } finally {
            writerCleanup();
            IoTrace.socketWriteEnd(traceContext, remoteAddress.getAddress(),
                                   remoteAddress.getPort(), n > 0 ? n : 0);
            end(n > 0 || (n == IOStatus.UNAVAILABLE));
            synchronized (stateLock) {
                if ((n <= 0) && (!isOutputOpen))
                    throw new AsynchronousCloseException();
            }
            assert IOStatus.check(n);
        }
    }
}
Next, step into the call n = IOUtil.write(fd, buf, -1, nd); inside the loop above. Its source is as follows:
static int write(FileDescriptor fd, ByteBuffer src, long position,
                 NativeDispatcher nd) throws IOException {
    if (src instanceof DirectBuffer)
        return writeFromNativeBuffer(fd, src, position, nd);

    // Substitute a native buffer
    int pos = src.position();
    int lim = src.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);
    ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
    try {
        bb.put(src);
        bb.flip();
        // Do not update src until we see how many bytes were written
        src.position(pos);

        int n = writeFromNativeBuffer(fd, bb, position, nd);
        if (n > 0) {
            // now update src
            src.position(pos + n);
        }
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}
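Note the branch at the top: if the caller already hands in a DirectBuffer, no copy happens at all; only heap buffers take the temporary-buffer path, and rem is the entire remaining payload. A hedged illustration of the two paths (a sketch with made-up sizes, not Cassandra's code):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

class WritePathDemo {
    // Illustrates the two branches in IOUtil.write: a heap buffer is copied into a
    // per-thread temporary direct buffer first, a direct buffer is written as-is.
    static void writeBoth(SocketChannel channel) throws IOException {
        ByteBuffer heapBuf = ByteBuffer.allocate(32 * 1024 * 1024);         // not a DirectBuffer
        ByteBuffer directBuf = ByteBuffer.allocateDirect(32 * 1024 * 1024); // a DirectBuffer

        channel.write(heapBuf);   // -> getTemporaryDirectBuffer(rem) with rem == 32 MB, then copy
        channel.write(directBuf); // -> writeFromNativeBuffer directly, no extra copy
    }
}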
Now look at ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);, which obtains a DirectBuffer for the current thread. This is where the memory gets allocated, and also where the problem arises. The source:
/**
 * Returns a temporary buffer of at least the given size
 */
static ByteBuffer getTemporaryDirectBuffer(int size) {
    BufferCache cache = bufferCache.get();
    ByteBuffer buf = cache.get(size);
    if (buf != null) {
        return buf;
    } else {
        // No suitable buffer in the cache so we need to allocate a new
        // one. To avoid the cache growing then we remove the first
        // buffer from the cache and free it.
        if (!cache.isEmpty()) {
            buf = cache.removeFirst();
            free(buf);
        }
        return ByteBuffer.allocateDirect(size);
    }
}
bufferCache is a static ThreadLocal variable, so each thread has its own BufferCache. The lookup first tries to find a cached direct buffer in the current thread's cache that is large enough for the request; if none is found, it evicts and frees the first cached buffer (if any) and allocates a brand-new direct buffer of the requested size, which is offered back to the cache after the write (see the finally block in IOUtil.write above). When that cached memory is actually released again is uncertain. In other words, every thread ends up holding a cached DirectBuffer roughly as large as the largest heap buffer it has ever written, so with one thread per client and large transfers, off-heap memory keeps climbing. That is exactly where our problem comes from.
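The effect can be observed from inside the JVM via the "direct" buffer pool MXBean. Below is a minimal sketch using a Pipe (whose sink channel also goes through IOUtil.write); the 64 MB size is illustrative, and the exact numbers reported will vary by JDK version and platform:

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class DirectCacheGrowthDemo {
    public static void main(String[] args) throws Exception {
        Pipe pipe = Pipe.open();
        pipe.sink().configureBlocking(false); // partial write is fine for the demo

        printDirectPool("before");

        // A large heap buffer; the write path copies it into a per-thread
        // temporary direct buffer of the same size, which is then cached
        // in the thread-local BufferCache.
        ByteBuffer big = ByteBuffer.allocate(64 * 1024 * 1024); // 64 MB, illustrative
        pipe.sink().write(big);

        printDirectPool("after one 64 MB heap-buffer write");
    }

    private static void printDirectPool(String label) {
        for (BufferPoolMXBean pool :
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                System.out.printf("%s: direct buffers=%d, bytes=%d%n",
                        label, pool.getCount(), pool.getMemoryUsed());
            }
        }
    }
}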
How do we fix it? As long as each individual write request never exceeds a fixed, small size, the temporary DirectBuffer cached per thread stays at that small size and never has to be reallocated at ever-larger sizes. I had actually seen this discussed while reading 董西成's《Hadoop技术内幕》, though it had slipped my mind over time. The book states explicitly that the data to be written can be split into fixed-size chunks (for example 8 KB) and written to the DirectBuffer one chunk at a time. The book's example code serves as a reference:
final static int NIO_BUFFER_LIMIT = 8 * 1024; // chunk size: 8 KB

// Write the contents of the buffer to the channel (the channel is in non-blocking mode).
// If the buffer holds no more than 8 KB, write it directly; otherwise write it chunk by chunk.
public int channelWrite(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
    return (buffer.remaining() <= NIO_BUFFER_LIMIT)
            ? channel.write(buffer)
            : channelIO(null, channel, buffer);
}

private static int channelIO(ReadableByteChannel readCh,
                             WritableByteChannel writeCh,
                             ByteBuffer buf) throws IOException {
    int originalLimit = buf.limit();
    int initialRemaining = buf.remaining();
    int ret = 0;
    while (buf.remaining() > 0) {
        try {
            int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
            buf.limit(buf.position() + ioSize);
            ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
            // In non-blocking mode, write/read returns as soon as the socket buffer
            // fills up; the return value is the number of bytes actually written/read.
            if (ret < ioSize) {
                break;
            }
        } finally {
            buf.limit(originalLimit);
        }
    }
    int nBytes = initialRemaining - buf.remaining();
    return nBytes > 0 ? nBytes : ret;
}
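For completeness, here is a hedged sketch of how the helper might be driven from a selector loop. onWritable is a hypothetical method assumed to live in the same class as channelWrite above (imports for SelectionKey and SocketChannel omitted):

// Hypothetical write handler: drain the pending response in chunks of at most
// NIO_BUFFER_LIMIT bytes, then drop interest in OP_WRITE once everything is flushed.
void onWritable(SelectionKey key, ByteBuffer response) throws IOException {
    SocketChannel ch = (SocketChannel) key.channel();
    channelWrite(ch, response);
    if (!response.hasRemaining()) {
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
    }
}

The trade-off is a few more write calls for large payloads; in exchange, the temporary DirectBuffer cached per thread by the JDK never grows beyond NIO_BUFFER_LIMIT (8 KB).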
refer:
1. 《Hadoop技术内幕-深入解析MapReduce架构设计与实现原理》