// If set to max offset here, some written messages may be lost
if (!flatFile.isFlatFileInit()) { // return !this.consumeQueue.fileSegmentTable.isEmpty();
    currentOffset = Math.max(minOffsetInQueue,
        maxOffsetInQueue - storeConfig.getTieredStoreGroupCommitSize());
    flatFile.initOffset(currentOffset);
        // this.commitLog.initOffset(0L);
        // this.consumeQueue.initOffset(offset * MessageFormatUtil.CONSUME_QUEUE_UNIT_SIZE);
    return CompletableFuture.completedFuture(true);
}
Before Creating the Bug Report
I found a bug, not just asking a question, which should be created in GitHub Discussions.
I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
Runtime platform environment
CentOS 7
RocketMQ version
develop
JDK Version
OpenJDK 8
Describe the Bug
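Symptoms
The corresponding FlatMessageFile in FlatFileStore was deleted at 10:38:31, so the messages were not dispatched to tiered storage.
The FlatMessageFile was deleted at 10:40:31, and again the messages were not dispatched.
Analysis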
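After analysis, the problem can be abstracted as follows:
The dispatch task and the destroyExpiredFile task run periodically, every 20s and every 60s respectively, so three dispatch runs fall between two consecutive destroyExpiredFile runs.
In MessageStoreDispatcherImpl#dispatch(), the dispatch task initializes the FlatMessageFile for the MessageQueue to be dispatched, including its commitlog and consumequeue. Looking at the constructors of the two: for the commitlog, initOffset() actually creates the file in cold storage, that is, it adds an empty FileSegment to FlatMessageFile.commitLog.fileSegmentTable. For the consumequeue, initOffset() is not called, so at this point consumeQueue.fileSegmentTable has length 0. This explains why only the commitlog, and not the consumequeue, is created when the first message is produced.
In the first dispatchWithSemaphore after the FlatMessageFile is initialized, the line
flatFile.initOffset(currentOffset);
creates the consumequeue file in cold storage and then returns immediately, leaving the actual dispatch to the next dispatch run 20s later.
The destroyExpiredFile task is implemented in FlatAppendFile#destroyExpiredFile():
1. The main problem is the line
if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE && fileSegment.getMaxTimestamp() > expireTimestamp)
because the maxTimestamp of the commitlog and consumequeue defaults to Long.MAX_VALUE while no message has been written (@lizhimins, is there a particular intent behind this condition?). As a result, at least two dispatch runs must happen between the initialization of a FlatMessageFile and the next expired-file cleanup: one to create the consumequeue and one to set maxTimestamp to a real value. Otherwise a FlatMessageFile that has not been dispatched yet is cleaned up.
2. If only the timestamp condition is changed, at least one dispatch run is still required between the initialization of a FlatMessageFile and the next expired-file cleanup, in order to create the consumequeue. Otherwise the FlatMessageFile is deleted by the following code in FlatStore#load():
Solution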
Delete the line
if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE
and create the commitlog and consumequeue files immediately when the FlatMessageFile is initialized.
Steps to Reproduce
Enable tiered storage and, following the timeline, produce messages at different points in time.
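Why the timing matters can be illustrated with a toy model. This is only a sketch under stated assumptions, not RocketMQ code: ExpirySketch, FlatFile, init, dispatch, kept, destroyExpired and survives are hypothetical names, the 72h retention is an arbitrary value, and the model assumes that a segment failing the quoted check is destroyed and that a flat file whose commitlog or consumequeue has no segment left is the one that ends up deleted. It counts how many dispatch runs must complete between initialization and the next cleanup for the flat file to survive.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the behaviour described in this issue; none of these types are RocketMQ classes.
final class ExpirySketch {
    // Per the issue: a segment with no messages reports Long.MAX_VALUE as its maxTimestamp.
    static final long EMPTY = Long.MAX_VALUE;

    // Hypothetical flat file: each list holds one maxTimestamp per segment.
    static final class FlatFile {
        final List<Long> commitLog = new ArrayList<>();
        final List<Long> consumeQueue = new ArrayList<>();
    }

    // Initialization creates an empty commitlog segment; the consumequeue
    // segment is created here only if the proposed fix is applied.
    static FlatFile init(boolean createBothAtInit) {
        FlatFile f = new FlatFile();
        f.commitLog.add(EMPTY);
        if (createBothAtInit) {
            f.consumeQueue.add(EMPTY);
        }
        return f;
    }

    // One dispatch run: the first run only creates the consumequeue segment
    // and returns; a later run writes messages and sets real timestamps.
    static void dispatch(FlatFile f, long now) {
        if (f.consumeQueue.isEmpty()) {
            f.consumeQueue.add(EMPTY);
            return;
        }
        f.commitLog.set(0, now);
        f.consumeQueue.set(0, now);
    }

    // The check quoted in the issue; dropEmptyClause removes the
    // "!= Long.MAX_VALUE" part, as the proposed solution suggests.
    static boolean kept(long maxTs, long expireTs, boolean dropEmptyClause) {
        boolean passesEmptyClause = dropEmptyClause || maxTs != EMPTY;
        return passesEmptyClause && maxTs > expireTs;
    }

    // Assumed cleanup behaviour: destroy head segments until one is kept.
    static void destroyExpired(List<Long> segments, long expireTs, boolean dropEmptyClause) {
        while (!segments.isEmpty() && !kept(segments.get(0), expireTs, dropEmptyClause)) {
            segments.remove(0);
        }
    }

    // True if both files still have a segment after one cleanup that follows
    // the given number of dispatch runs.
    static boolean survives(int dispatchRuns, boolean dropEmptyClause, boolean createBothAtInit) {
        long now = 1_700_000_000_000L;
        long expireTs = now - 72L * 3600 * 1000; // hypothetical 72h retention
        FlatFile f = init(createBothAtInit);
        for (int i = 0; i < dispatchRuns; i++) {
            dispatch(f, now);
        }
        destroyExpired(f.commitLog, expireTs, dropEmptyClause);
        destroyExpired(f.consumeQueue, expireTs, dropEmptyClause);
        return !f.commitLog.isEmpty() && !f.consumeQueue.isEmpty();
    }

    public static void main(String[] args) {
        for (int runs = 0; runs <= 2; runs++) {
            System.out.println(runs + " dispatch run(s): current check=" + survives(runs, false, false)
                + ", clause removed=" + survives(runs, true, false)
                + ", clause removed and files created at init=" + survives(runs, true, true));
        }
    }
}
```

In this model the current check needs two dispatch runs before the cleanup, removing only the Long.MAX_VALUE clause still needs one, and combining it with file creation at initialization needs none. With dispatch every 20s and cleanup every 60s, a FlatMessageFile initialized late in a cleanup window may see fewer than two dispatch runs before the next cleanup, which is why producing messages at different points in time triggers the deletion only some of the time.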
What Did You Expect to See?
Messages are dispatched to tiered storage normally and the FlatMessageFile is not deleted.
What Did You See Instead?
The FlatMessageFile is deleted before the messages are dispatched.
Additional Context
No response