Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] 分层存储激活时,当向新创建的 Topic 中生产消息,能否正常转储取决于任一批消息生产的时间点是否正确 #8065

Open
3 tasks done
bxfjb opened this issue Apr 24, 2024 · 2 comments

Comments

@bxfjb
Copy link
Contributor

bxfjb commented Apr 24, 2024

Before Creating the Bug Report

  • I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

CentOS 7

RocketMQ version

develop

JDK Version

OpenJDK 8

Describe the Bug

表现

  • 分层存储激活时,当向新创建的 Topic 中生产消息,后续消息能否正常转储取决于任一批消息生产的时间点;
  • 日志:
2024-04-24 10:38:03 INFO ReputMessageService - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:38:12 INFO MessageStoreDispatcher - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/CONSUME_QUEUE/cfcd208400000000000000000000
2024-04-24 10:38:31 INFO TieredCommonExecutor_1 - Destroy Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:38:31 INFO TieredCommonExecutor_1 - Destroy Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/CONSUME_QUEUE/cfcd208400000000000000000000
2024-04-24 10:38:31 INFO TieredCommonExecutor_1 - FlatFileStore destroy file, topic=order-test1, queueId=0

2024-04-24 10:40:21 INFO ReputMessageService - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:40:31 INFO TieredCommonExecutor_1 - Destroy Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:40:31 INFO TieredCommonExecutor_1 - FlatFileStore destroy file, topic=order-test1, queueId=0

2024-04-24 10:41:40 INFO ReputMessageService - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:41:52 INFO MessageStoreDispatcher - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/CONSUME_QUEUE/cfcd208400000000000000000000
2024-04-24 10:42:12 INFO MessageStoreDispatcher - MessageDispatcher#dispatch, topic=order-test1, queueId=0, offset=0-3000, current=0, remain=3000
  • 共生产了三批消息,每批 1000 条
  1. 10:38:03 第一批消息生产到 broker,冷存中创建了 commitlog 文件,稍后 10:38:12 冷存中创建了 consumequeue 文件,但FlatFileStore中对应的的FlatMessageFile在10:38:31被删除,所以转储没有进行;
  2. 10:40:21 第二批消息生产到 broker,冷存中创建了 commitlog 文件,没有创建 consumequeue,FlatMessageFile在10:40:31被删除,转储同样没有进行;
  3. 10:41:40 第三批消息生产到 broker,冷存中创建了 commitlog 文件,稍后 10:41:52 冷存中创建了 consumequeue 文件,并在 10:42:12 进行了正常转储,共 3000 条,没有消息丢失;

分析

经分析,问题抽象如下:

  • 分层存储中存在两个定时任务:dispatch 任务与destroyExpiredFile任务,定时周期分别为 20s 与 60s,也就是说两次destroyExpiredFile之间存在三次dispatch
  • Bug 的具体表现如下时间轴:
    timeline drawio
  1. 如果冷存中Topic没有写入数据且消息的生产时间在时间段2、3,那么这个Topic的转储不能正常进行;
  2. 如果任意一批消息的生产时间在时间段1,那么包括之前卡住的转储都能够正常进行,也就是说不会丢消息;

dispatch 任务

  • 生产消息时:
    MessageStoreDispatcherImpl#dispatch()中,初始化了要转储的 MessageQueue 对应的 FlatMessageFile,也包括其中的 commitlog 与 consumequeue,二者的构造函数如下:
public FlatCommitLogFile(FileSegmentFactory fileSegmentFactory, String filePath) {
    super(fileSegmentFactory, FileSegmentType.COMMIT_LOG, filePath);
    this.initOffset(0L);
}

initOffset() 实际上在冷存中创建了文件,实际上是向FlatMessageFile.commitLog.fileSegmentTable 中 add 了一个空的 FileSegment

public FlatConsumeQueueFile(FileSegmentFactory fileSegmentFactory, String filePath) {
    super(fileSegmentFactory, FileSegmentType.CONSUME_QUEUE, filePath);
}

可以看到没有调用initOffset(),也就是说此时consumeQueue.fileSegmentTable 长度为 0,解释了为什么第一次生产消息时只创建了 commitlog 没有创建 consumequeue;

  • consumequeue 的创建在 FlatMessageFile 初始化后的第一次 dispatchWithSemaphore 中:
// If set to max offset here, some written messages may be lost
if (!flatFile.isFlatFileInit()) {  // return !this.consumeQueue.fileSegmentTable.isEmpty();
    currentOffset = Math.max(minOffsetInQueue,
        maxOffsetInQueue - storeConfig.getTieredStoreGroupCommitSize());
    flatFile.initOffset(currentOffset);
    // this.commitLog.initOffset(0L);
    // this.consumeQueue.initOffset(offset * MessageFormatUtil.CONSUME_QUEUE_UNIT_SIZE);
    return CompletableFuture.completedFuture(true);
}

这行代码 flatFile.initOffset(currentOffset); 在冷存中创建了 consumequeue 文件,之后直接返回,等待 20s 后下一次的dispatch进行真正的转储;

destroyExpiredFile任务

  • 清理过期文件逻辑
    如下FlatAppendFile#destroyExpiredFile():
public void destroyExpiredFile(long expireTimestamp) {
    fileSegmentLock.writeLock().lock();
    try {
        while (!fileSegmentTable.isEmpty()) {

            // first remove expired file from fileSegmentTable
            // then close and delete expired file
            FileSegment fileSegment = fileSegmentTable.get(0);

            if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE &&
                fileSegment.getMaxTimestamp() > expireTimestamp) {
                log.debug("FileSegment has not expired, filePath={}, fileType={}, " +
                        "offset={}, expireTimestamp={}, maxTimestamp={}", filePath, fileType,
                    fileSegment.getBaseOffset(), expireTimestamp, fileSegment.getMaxTimestamp());
                break;
            }

            fileSegment.destroyFile();
            if (!fileSegment.exists()) {
                fileSegmentTable.remove(0);
                metadataStore.deleteFileSegment(filePath, fileType, fileSegment.getBaseOffset());
            }
        }
    } finally {
        fileSegmentLock.writeLock().unlock();
    }
}

1.主要问题在于 if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE && fileSegment.getMaxTimestamp() > expireTimestamp) 这一行,因为 commitlog 与 consumequeue 在没有写入消息时 maxTimestamp 默认为 Long.MAX_VALUE,(@lizhimins 这里这个判断条件有什么用意吗) 因此FlatMessageFile的初始化时间到下一次清理过期文件之间至少要执行两次dispatch,一次用于生成 consumequeue,一次用于修改 maxTimestamp 为正常值,否则尚未执行转储的 FlatMessageFile 会被清理;
2.但仅修改时间戳的限制这里的话,那么FlatMessageFile的初始化时间到下一次清理过期文件之间仍然至少要执行一次dispatch,用于生成 consumequeue,否则FlatMessageFile会被FlatStore#load()中的以下代码删除:

if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) {
    this.destroyFile(flatFile.getMessageQueue());
}

解决方案

删去 if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE 这一行,并在 FlatMessageFile 初始化时立即创建 commitlog 与 consumequeue文件

Steps to Reproduce

开启分层存储,按照时间轴,在不同的时间点生产消息

What Did You Expect to See?

消息正常转储,FlatMessageFile不被删除

What Did You See Instead?

消息被转储前FlatMessageFile被删除

Additional Context

No response

@lizhimins
Copy link
Member

分析的很具体,依次回复几个问题:

  1. 对于冷存储中的 CommitLog 可以从 0 开始,而 cq 必须与本地位点对齐,只能在 dispatch 时才能确定初始化的位点,因此不会在 factory 中初始化指定,这一点也是为了保证与旧存储格式的兼容。

  2. fileSegment.getMaxTimestamp() != Long.MAX_VALUE 这个条件还有一个原因是我发现之前同学实现的旧代码中有缺陷,没有修改 segment metadata 中的 time,导致整个 flatfile 无法过期删除。

  3. load 里面初始化的 destory 的间隔目前写的是1分钟,可以改为1小时。

@bxfjb
Copy link
Contributor Author

bxfjb commented Apr 26, 2024

分析的很具体,依次回复几个问题:

  1. 对于冷存储中的 CommitLog 可以从 0 开始,而 cq 必须与本地位点对齐,只能在 dispatch 时才能确定初始化的位点,因此不会在 factory 中初始化指定,这一点也是为了保证与旧存储格式的兼容。
  2. fileSegment.getMaxTimestamp() != Long.MAX_VALUE 这个条件还有一个原因是我发现之前同学实现的旧代码中有缺陷,没有修改 segment metadata 中的 time,导致整个 flatfile 无法过期删除。
  3. load 里面初始化的 destory 的间隔目前写的是1分钟,可以改为1小时。

修改 destroy 间隔应该可以有效规避这个问题
另外 2 提到的问题现在还存在吗,如果不存在的话是否可以去掉这个条件,因为按照 dispatch 的逻辑,如果一个 FlatMessageFile 存在,那么意味着应该有消息等待被转移到冷存,删除一个刚刚创建的 FlatMessageFile 似乎不太合理

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants