
[Bug] Memory leakage detected when IndexStoreService is enabled and TieredMessageStore's FileSegment is PosixFileSegment #8017

Open · bxfjb opened this issue Apr 11, 2024 · 14 comments · May be fixed by #8036

bxfjb (Contributor) commented Apr 11, 2024

Before Creating the Bug Report

  • I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

CentOS 7

RocketMQ version

branch: develop

JDK Version

OpenJDK 8

Describe the Bug

  • We deployed a RocketMQ cluster with TieredStore and offloaded cold data to HDD with PosixFileSegment. After about 16 hours of producing messages, the shutdown hook was invoked and there was no other error log.

2024-04-11 09:57:22 INFO ShutdownHook - Shutdown hook was invoked, 1

  • By checking our monitoring, we found that the direct memory usage kept increasing at a stable rate,

(screenshot: direct memory usage rising steadily)

  • which prompted us to check for a memory leak. The increase rate itself was a direct clue: about 570 MB each time, exactly the size of a compacted IndexFile, which led us to IndexStoreService.java:
public boolean doCompactThenUploadFile(IndexFile indexFile) {
        ...  
        boolean result = flatAppendFile.commitAsync().join();
        ...
}
  • Adding a watch on direct memory while debugging the unit test doCompactionTest in IndexStoreServiceTest.java confirms that doCompactThenUploadFile may cause a direct memory leak (one way to watch this programmatically is sketched below).
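
For reference (not part of the original report), a small sketch of how the same numbers can be watched programmatically via the JDK's BufferPoolMXBean; the "direct" pool is the one that grows by roughly one compacted IndexFile (~570 MB) per upload when the leak is present:

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectMemoryWatch {
    // Prints the JVM's direct buffer pool statistics; call it before and
    // after doCompactThenUploadFile() to see the cached buffer accumulate.
    public static void print() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                System.out.printf("direct buffers: count=%d, used=%d bytes%n",
                    pool.getCount(), pool.getMemoryUsed());
            }
        }
    }
}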

  • Diving into the code, the root cause is an inappropriate use of FileChannel in PosixFileSegment.java:

@Override
    @SuppressWarnings("ResultOfMethodCallIgnored")
    public CompletableFuture<Boolean> commit0(
        FileSegmentInputStream inputStream, long position, int length, boolean append) {

        return CompletableFuture.supplyAsync(() -> {
            try {
                byte[] byteArray = ByteStreams.toByteArray(inputStream);
                writeFileChannel.position(position);
                ByteBuffer buffer = ByteBuffer.wrap(byteArray);
                while (buffer.hasRemaining()) {
                    writeFileChannel.write(buffer);
                }
                writeFileChannel.force(true);
            } catch (Exception e) {
                return false;
            }
            return true;
        }, MessageStoreExecutor.getInstance().bufferCommitExecutor);
    }
  • The line writeFileChannel.write(buffer); allocates a DirectByteBuffer of the same size as buffer, which means that when uploading an IndexFile the DirectByteBuffer will be about 570 MB. This DirectByteBuffer is an element of a ByteBuffer array held by a ThreadLocal variable in sun.nio.ch.Util:
private static class BufferCache {
    private ByteBuffer[] buffers;
    private int count;
    private int start;
    ...
}

private static ThreadLocal<BufferCache> bufferCache;
  • The actual process of writeFileChannel.write(buffer) is:
  1. find a DirectByteBuffer in the thread's BufferCache that is big enough to hold the data in buffer; if there is none, allocate one;
  2. copy the data from buffer into the DirectByteBuffer and write it to the page cache;
  3. return the DirectByteBuffer to the BufferCache without releasing it;
  • which means the allocated direct memory is not released until the thread dies, which never happens here because the thread running commit0 belongs to a thread pool in MessageStoreExecutor.

  • Reproducing the bug with standalone code is simple: write a class like the one below, call FileWrite.run(), and open jvisualvm; the growth of direct buffers should show up under Buffer Pools.

import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;

public class FileWrite {
    private FileChannel readFileChannel;
    private FileChannel writeFileChannel;
    private ExecutorService executor;
    private final String placeHolder = new String(new char[60000]).replace('\0', '1');

    public FileWrite(String path) throws Exception {
        this.readFileChannel = new RandomAccessFile(new File(path), "r").getChannel();
        this.writeFileChannel = new RandomAccessFile(new File(path), "rwd").getChannel();
        this.executor = new ThreadPoolExecutor(
                Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
                Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
                60 * 1000,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(100));
    }

    public void run() throws InterruptedException {
        for (int i = 0; i < 100; ++i) {
            writeAsync();
            Thread.sleep(5 * 1000);
        }
    }

    public CompletableFuture<Boolean> writeAsync() {
        return CompletableFuture.supplyAsync(this::write, executor);
    }

    // Minimal write() implementation (not shown in the original snippet): each
    // FileChannel.write() call borrows a thread-local temporary DirectByteBuffer
    // as large as the wrapped array and caches it instead of releasing it.
    private Boolean write() {
        try {
            ByteBuffer buffer = ByteBuffer.wrap(placeHolder.getBytes(StandardCharsets.UTF_8));
            while (buffer.hasRemaining()) {
                writeFileChannel.write(buffer);
            }
            writeFileChannel.force(true);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}

(screenshot: jvisualvm Buffer Pools view showing direct buffer usage increasing)

  • Note that the size of the thread pool limits the growth of buffer usage: since each thread's BufferCache is reusable, the growth should stop once every thread in the pool has run writeAsync() at least once.

  • For example, reducing the size of the thread pool to 4:

(screenshot: direct buffer usage plateaus with a 4-thread pool)

Steps to Reproduce

Two options:

  1. Deploy a RocketMQ cluster with TieredStore, offload cold data to HDD with PosixFileSegment, and keep producing new messages; the direct memory usage should increase without bound.
  2. Or run IndexStoreServiceTest.doCompactionTest and keep an eye on the direct buffer usage.

What Did You Expect to See?

The direct memory usage should not grow without bound.

What Did You See Instead?

The direct memory usage keeps increasing until OOM, or until every thread in the commit executor has run commit0 at least once.

Additional Context

There might be several solutions to this bug:

  1. Reduce the size of bufferCommitExecutor;
  2. Limit the size of BufferCache via the JVM option -Djdk.nio.maxCachedBufferSize;
  3. Limit the size of the inputStream passed to PosixFileSegment.commit0 (a chunked-write sketch of this idea follows below).
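
A minimal sketch of the idea behind option 3, not the project's actual code (the chunk size and helper name are illustrative assumptions): writing in bounded slices keeps any temporary DirectByteBuffer that the JDK caches per thread small.

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class ChunkedWriter {
    // Illustrative chunk size; FileChannel.write() caches a temporary direct
    // buffer no larger than the ByteBuffer handed to it.
    private static final int CHUNK_SIZE = 4 * 1024 * 1024;

    // Copies the stream to the channel starting at 'position' in bounded
    // slices, so the per-thread cached direct buffer stays at CHUNK_SIZE.
    public static void writeInChunks(FileChannel channel, InputStream in, long position) throws IOException {
        byte[] chunk = new byte[CHUNK_SIZE];
        long offset = position;
        int read;
        while ((read = in.read(chunk)) != -1) {
            ByteBuffer buffer = ByteBuffer.wrap(chunk, 0, read);
            while (buffer.hasRemaining()) {
                offset += channel.write(buffer, offset);
            }
        }
        channel.force(true);
    }
}

Option 2 works from the JVM side: on JDKs that support it, setting -Djdk.nio.maxCachedBufferSize makes temporary direct buffers larger than the limit be freed after use instead of being cached in the thread-local BufferCache.
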
bxfjb (Contributor, Author) commented Apr 12, 2024

We may introduce a DirectByteBuffer pool like TransientStorePool, or use MappedByteBuffer.

caigy (Contributor) commented Apr 12, 2024

ByteStreams.toByteArray() creates a large array in memory and is unnecessary. It seems FileChannel.transferFrom() with an NIO channel would be more efficient for writing streams to a file.

bxfjb (Contributor, Author) commented Apr 12, 2024

> ByteStreams.toByteArray() creates a large array in memory and is unnecessary. It seems FileChannel.transferFrom() with an NIO channel would be more efficient for writing streams to a file.

Thanks for the reply. I tried FileChannel.transferFrom() but found that a unit test failed:

@Test
public void consumeQueueTest() throws ClassNotFoundException, NoSuchMethodException {
    ...
    ByteBuffer cqItem1 = fileSegment.read(initPosition, unitSize);
    Assert.assertEquals(baseOffset, cqItem1.getLong());  // failed
    ...
}

java.lang.AssertionError:
Expected :1000
Actual :0

Debugging shows the consume queue data was not flushed correctly; I wonder if the way I use FileChannel.transferFrom() is wrong:
Before:

                byte[] byteArray = ByteStreams.toByteArray(inputStream);
                writeFileChannel.position(position);
                ByteBuffer buffer = ByteBuffer.wrap(byteArray);
                while (buffer.hasRemaining()) {
                    writeFileChannel.write(buffer);
                }
                writeFileChannel.force(true);

After:

                ReadableByteChannel readableByteChannel = Channels.newChannel(inputStream);
                writeFileChannel.transferFrom(readableByteChannel, position, length);
                writeFileChannel.force(true);

caigy (Contributor) commented Apr 15, 2024

> I tried FileChannel.transferFrom() but found that a unit test failed: […] I wonder if the way I use FileChannel.transferFrom() is wrong.

It seems the modification is not equivalent. writeFileChannel.position(position) has a side effect: it makes subsequent writes to writeFileChannel start from position. But writeFileChannel.transferFrom() returns without transferring anything when position is greater than the current size of writeFileChannel.

You might extend the file behind writeFileChannel to position, and also check that the number of transferred bytes equals length (a sketch of both checks follows below).
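
A rough, untested sketch of that suggestion, reusing the writeFileChannel, inputStream, position and length variables from commit0:

                // Extend the file so that 'position' is within its size; otherwise
                // transferFrom() would return without transferring anything.
                if (writeFileChannel.size() < position) {
                    writeFileChannel.write(ByteBuffer.wrap(new byte[]{0}), position - 1);
                }

                ReadableByteChannel source = Channels.newChannel(inputStream);
                long transferred = 0;
                // transferFrom() may transfer fewer bytes than requested, so loop until
                // 'length' bytes have been copied or the stream is exhausted.
                while (transferred < length) {
                    long n = writeFileChannel.transferFrom(source, position + transferred, length - transferred);
                    if (n <= 0) {
                        break;
                    }
                    transferred += n;
                }
                writeFileChannel.force(true);
                // Treat transferred != length as a failed commit.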

bxfjb (Contributor, Author) commented Apr 15, 2024

> It seems the modification is not equivalent. […] You might extend the file behind writeFileChannel to position, and also check that the number of transferred bytes equals length.

Agreed. But then I was wondering whether the failing unit test is necessary; it seems to act like this:

  • set an empty file's commitPosition to 100
  • append some mocked ConsumeQueue buffer to it
  • since commitPosition is larger than the size of writeFileChannel, the commit fails

Is it acceptable to remove step 1 in consumeQueueTest, @lizhimins? The scenario seems nonexistent in production.

Furthermore, FileChannel.read() has the same problem as FileChannel.write(), but since it has to return a ByteBuffer, calling it looks unavoidable. I did not find a way to replace it completely with FileChannel.transferTo(), so I would like to use MappedByteBuffer as an alternative.

caigy (Contributor) commented Apr 16, 2024

@bxfjb If you want to extend the size of writeFileChannel to make FileChannel.transferFrom() successful, you may try writing a byte at position - 1:

writeFileChannel.write(ByteBuffer.wrap(new byte[]{0}), position - 1);

bxfjb (Contributor, Author) commented Apr 18, 2024

> @bxfjb If you want to extend the size of writeFileChannel to make FileChannel.transferFrom() successful, you may try writing a byte at position - 1:
>
> writeFileChannel.write(ByteBuffer.wrap(new byte[]{0}), position - 1);

Eventually I decided to use MappedByteBuffer because of FileChannel.transferFrom()'s worse I/O performance in tests. Could you please review #8036?
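
For illustration only (not the actual change in #8036), a minimal sketch of what a MappedByteBuffer-based commit could look like, assuming the same byteArray, position and length as in the original commit0 and that byteArray holds exactly length bytes:

                // Map only the region being committed; unlike FileChannel.write(),
                // no thread-local temporary DirectByteBuffer is borrowed.
                MappedByteBuffer mapped = writeFileChannel.map(
                    FileChannel.MapMode.READ_WRITE, position, length);
                mapped.put(byteArray, 0, length);
                mapped.force(); // flush the mapped region to disk
                // Note: the mapping itself is only released when the buffer is
                // garbage collected (or via an explicit cleaner on JDK 8).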

lizhimins (Member) commented Apr 18, 2024

The default implementations in the current code are DefaultMetadataStore and PosixFileSegment.
In the original design, PosixFileSegment was intended only for test cases and validation. In our production we use a RocksDB-based metadata store and FileSegment implementations backed by object storage or HDFS. To achieve higher performance and lower storage costs, this part of the code will be contributed in a future PR.

lizhimins (Member) commented

Adding a new file type for index looks good

caigy (Contributor) commented Apr 18, 2024

> Eventually I decided to use MappedByteBuffer because of FileChannel.transferFrom()'s worse I/O performance in tests. Could you please review #8036?

In general, mmap has a greater impact on the page cache and may cause resource contention with the normal commit log. We need to evaluate the impact on overall performance carefully.

lizhimins (Member) commented

> In general, mmap has a greater impact on the page cache and may cause resource contention with the normal commit log. We need to evaluate the impact on overall performance carefully.

If the target storage is based on the POSIX protocol, using directIO is a good choice.

bxfjb (Contributor, Author) commented Apr 18, 2024

> In general, mmap has a greater impact on the page cache and may cause resource contention with the normal commit log. We need to evaluate the impact on overall performance carefully.

Got it. I am running stress tests to measure the impact as best I can. So far it looks good at about 60k producer TPS.

bxfjb (Contributor, Author) commented Apr 18, 2024

> If the target storage is based on the POSIX protocol, using directIO is a good choice.

I chose MappedByteBuffer because:

  • FileChannel.write()/read() is ruled out due to the memory leak;
  • FileChannel.transferFrom()'s performance is unacceptably low; more specifically, only about 200 messages are written per second.

I wonder what you mean by direct IO; does it mean DirectByteBuffer or something else?

lizhimins (Member) commented

For the SSD-to-HDD offload scenario, PosixFileSegment in the current design is mainly for testing and can be improved. If you use MappedByteBuffer to copy data from the CommitLog to the FileSegment, there will also be the problem of the page cache wasting memory. A Java direct I/O library can be used to solve this (a rough illustration follows below).
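
As a hedged illustration (not code from this thread): on JDK 10+ the standard library itself supports direct I/O via ExtendedOpenOption.DIRECT, which bypasses the page cache but requires block-aligned buffers, offsets and lengths; on JDK 8, as used here, a third-party direct I/O library would be needed instead. The path and block size below are assumptions.

import com.sun.nio.file.ExtendedOpenOption; // JDK 10+
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class DirectIoSketch {
    public static void main(String[] args) throws Exception {
        Path path = Paths.get("/tmp/segment.data"); // hypothetical target file
        int blockSize = 4096;                       // must match the filesystem block size
        try (FileChannel channel = FileChannel.open(path,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, ExtendedOpenOption.DIRECT)) {
            // Direct I/O requires the buffer address, the byte count and the
            // file offset to all be aligned to the block size.
            ByteBuffer buffer = ByteBuffer.allocateDirect(blockSize * 2).alignedSlice(blockSize);
            buffer.put("index data".getBytes());
            buffer.position(0).limit(blockSize);
            channel.write(buffer, 0); // file offset 0 is block-aligned
        }
    }
}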
