
[Bug] parseMessageMetadata error when broker entry metadata enable with high loading #22601

Open
2 of 3 tasks
semistone opened this issue Apr 26, 2024 · 53 comments · May be fixed by #22760
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@semistone

semistone commented Apr 26, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

  • 3.2.2
  • 3.1.2

Minimal reproduce step

Publish events at about 6k QPS and 100 Mbit/s,
with broker entry metadata enabled,
in BatcherBuilder.KEY_BASED mode,
sending messages from highly concurrent/parallel producer processes.
It happens only with near-real-time consumers (almost zero backlog).

What did you expect to see?

No lost events.

What did you see instead?

An error appears in the broker log:
Failed to peek sticky key from the message metadata

It looks like a thread-safety issue, because it happens randomly.
In 1M events it only happens a few times, but the consumer loses a few events.

Anything else?

The error is similar to
#10967, but I think it's a different issue.

The data in BookKeeper is correct:
I can download the entry from BookKeeper and parse it successfully,
or consume the same event a few minutes later and it is consumed successfully.
But all subscriptions get the same error on the same event when consuming in real time (zero backlog).

I have traced the source code.
It happens in
PersistentDispatcherMultipleConsumers.readEntriesComplete -> AbstractBaseDispatcher.filterEntriesForConsumer
-> Commands.peekAndCopyMessageMetadata

I also printed the ByteBuf contents,
and I can clearly see the data is not the same as what is stored in BookKeeper.

In a normal event, the hex dump usually starts with 010e (magicCrc32c):

0000000      010e    9529    5fbc    0000    0a03    3a0a    6e69    7267

In one of our error events, the ByteBuf has about 48 bytes of strange data, then continues with the normal data:

0000000      0000    a610    0000    0000    0200    7239    0000    0000 <== garbage starts here
0000020      0200    1339    0000    0000    ea17    a8b0    8b8e    fa5e
0000040      2af0    2675    f645    1623    d17e    dc34    526d    ef44 <== garbage ends here
0000060      010e    9529    5fbc    0000    0a03    3a0a    6e69    7267 <== normal data starts here

This is just one example; sometimes the first few bytes are correct and the corruption appears a few bytes later.
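
For reference, here is a minimal standalone sketch (my illustration of the kind of check involved, not the actual Pulsar parsing code) of the header layout the broker expects before it can peek the metadata: a 2-byte magic 0x0e01, a 4-byte CRC32C checksum, then the 4-byte metadata size. A garbage prefix like the one above fails the magic check, or yields a nonsensical size, which then surfaces as the "Failed to peek sticky key" error.

import io.netty.buffer.ByteBuf;

final class HeaderCheck {
    static final short MAGIC_CRC32C = 0x0e01;

    // Returns true when the buffer starts with a plausible header; never moves the reader index.
    static boolean looksLikeValidHeader(ByteBuf headersAndPayload) {
        int readerIndex = headersAndPayload.readerIndex();
        try {
            if (headersAndPayload.readableBytes() < 2 + 4 + 4) {
                return false;
            }
            short magic = headersAndPayload.readShort();     // expect 0x0e01 (magicCrc32c)
            if (magic != MAGIC_CRC32C) {
                return false;                                // the corrupted entries land here
            }
            headersAndPayload.skipBytes(4);                  // CRC32C checksum
            int metadataSize = headersAndPayload.readInt();  // serialized MessageMetadata size
            return metadataSize >= 0 && metadataSize <= headersAndPayload.readableBytes();
        } finally {
            headersAndPayload.readerIndex(readerIndex);      // restore the original position
        }
    }
}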

I am still trying to debug when and how the ByteBuf returns incorrect data, and why it only happens during stress testing. It is still not easy to reproduce using the perf tool, but we can 100% reproduce it in our producer code.

Does anyone have any idea what could be causing this issue, and any suggestions on which library or class may have potential issues? Additionally, any suggestions on how to debug this issue or if I need to print any specific information to help identify the root cause would be appreciated. Thank you.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@semistone semistone added the type/bug The PR fixed a bug or issue reported a bug label Apr 26, 2024
@lhotari
Member

lhotari commented Apr 26, 2024

@semistone Just wondering if this could be related to apache/bookkeeper#4196?
There are also other recent ByteBuf retain/release fixes, such as #22393.
In Bookkeeper, there's apache/bookkeeper#4289 pending release and apache/bookkeeper#4293 is pending review.

@semistone
Author

We are still comparing what's different between our producer and the perf tool;
I will follow up once we have a conclusion.

@semistone
Author

semistone commented May 2, 2024

@lhotari
We ran many tests.
The current broker setting is

maxMessageSize=5242880

and the producer settings (small batches, large max bytes) are

batchingMaxMessages: 500
batchingMaxBytes: 3145728
batchingMaxPublishDelayMicros: 500

Payload sizes:
98% < 3 KB
2% between 10-20 KB

With these settings, that error shows up and the publish throughput isn't good.

but if we change to

batchingMaxMessages: 1000
batchingMaxBytes: 3145728
batchingMaxPublishDelayMicros: 1000

and filter out all data bigger than 15 KB,
then that error disappears.

So we decided to create
one batching publisher to publish data < 15000 bytes
and one chunking publisher to publish data >= 15000 bytes (see the sketch below).
That worked, and the performance is also better than in the previous test.

We still don't know why,
but at least we have a workaround now.
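
A minimal sketch of this split (the class name, the 15000-byte routing check and the chunking settings are illustrative assumptions, not our exact production code; error handling omitted):

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

final class SplitPublisher {
    private final Producer<byte[]> batchProducer;   // small payloads, key-based batching
    private final Producer<byte[]> chunkProducer;   // large payloads, chunking, no batching

    SplitPublisher(PulsarClient client, String topic) throws PulsarClientException {
        batchProducer = client.newProducer()
                .topic(topic)
                .batcherBuilder(BatcherBuilder.KEY_BASED)
                .batchingMaxMessages(1000)
                .batchingMaxBytes(3145728)
                .batchingMaxPublishDelay(1000, TimeUnit.MICROSECONDS)
                .create();
        chunkProducer = client.newProducer()
                .topic(topic)
                .enableBatching(false)   // chunking requires batching to be disabled
                .enableChunking(true)
                .create();
    }

    void publish(String key, byte[] payload) {
        // route by payload size: < 15000 bytes goes to the batching producer
        Producer<byte[]> p = payload.length < 15000 ? batchProducer : chunkProducer;
        p.newMessage().key(key).value(payload).sendAsync();
    }
}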

I don't know which batch producer configuration could fix these errors;
if you have any suggestions, we will try them.

We also publish from multi-threaded programs.
It seems not directly related to load but to payload size,
though with a low publish rate it may be harder to reproduce.

We also tried to reproduce it with the perf tool, but it didn't always happen.

thanks

@semistone
Author

semistone commented May 8, 2024

I tried upgrading to BookKeeper 4.17.0,
but I still have the same issue :(

[pulsar@cockroach308 lib]$ ls |grep bookkeeper
org.apache.bookkeeper-bookkeeper-benchmark-4.17.0.jar
org.apache.bookkeeper-bookkeeper-common-4.17.0.jar
org.apache.bookkeeper-bookkeeper-common-allocator-4.17.0.jar
org.apache.bookkeeper-bookkeeper-perf-4.17.0.jar
org.apache.bookkeeper-bookkeeper-proto-4.17.0.jar
org.apache.bookkeeper-bookkeeper-server-4.17.0.jar
org.apache.bookkeeper-bookkeeper-slogger-api-4.17.0.jar
org.apache.bookkeeper-bookkeeper-slogger-slf4j-4.17.0.jar
org.apache.bookkeeper-bookkeeper-tools-4.17.0.jar
org.apache.bookkeeper-bookkeeper-tools-framework-4.17.0.jar
org.apache.bookkeeper-bookkeeper-tools-ledger-4.17.0.jar
org.apache.bookkeeper-circe-checksum-4.17.0.jar
org.apache.bookkeeper-cpu-affinity-4.17.0.jar
org.apache.bookkeeper.http-http-server-4.17.0.jar
org.apache.bookkeeper.http-vertx-http-server-4.17.0.jar
org.apache.bookkeeper-native-io-4.17.0.jar
org.apache.bookkeeper-statelib-4.17.0.jar
org.apache.bookkeeper.stats-bookkeeper-stats-api-4.17.0.jar
org.apache.bookkeeper.stats-codahale-metrics-provider-4.17.0.jar
org.apache.bookkeeper.stats-otel-metrics-provider-4.17.0.jar
org.apache.bookkeeper.stats-prometheus-metrics-provider-4.17.0.jar
org.apache.bookkeeper-stream-storage-cli-4.17.0.jar
org.apache.bookkeeper-stream-storage-java-client-4.17.0.jar
org.apache.bookkeeper-stream-storage-server-4.17.0.jar
org.apache.bookkeeper-stream-storage-service-api-4.17.0.jar
org.apache.bookkeeper-stream-storage-service-impl-4.17.0.jar
org.apache.bookkeeper.tests-stream-storage-tests-common-4.17.0.jar
org.apache.pulsar-pulsar-package-bookkeeper-storage-3.2.2.jar

@lhotari
Member

lhotari commented May 8, 2024

We also tried to reproduce it with the perf tool, but it didn't always happen.

@semistone Please share a way to reproduce it. It's not a problem if it's not always consistent; fixing this issue will be a lot easier if there's at least some way to reproduce it.

@lhotari
Member

lhotari commented May 8, 2024

I tried upgrading to BookKeeper 4.17.0,
but I still have the same issue :(

@semistone Thanks for testing this.

@semistone
Author

We also tried to reproduce it with the perf tool, but it didn't always happen.

@semistone Please share a way to reproduce it. It's not a problem if it's not always consistent; fixing this issue will be a lot easier if there's at least some way to reproduce it.

I will try to reproduce it with the perf tool.

@lhotari
Member

lhotari commented May 8, 2024

@semistone since you have some way to reproduce this in your own tests, would you be able to test if this can be reproduced with dispatcherDispatchMessagesInSubscriptionThread=false?

pulsar/conf/broker.conf

Lines 435 to 436 in 80d4675

# Dispatch messages and execute broker side filters in a per-subscription thread
dispatcherDispatchMessagesInSubscriptionThread=true

It impacts this code:

// dispatch messages to a separate thread, but still in order for this subscription
// sendMessagesToConsumers is responsible for running broker-side filters
// that may be quite expensive
if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
    // setting sendInProgress here, because sendMessagesToConsumers will be executed
    // in a separate thread, and we want to prevent more reads
    acquireSendInProgress();
    dispatchMessagesThread.execute(() -> {
        if (sendMessagesToConsumers(readType, entries, false)) {
            updatePendingBytesToDispatch(-size);
            readMoreEntries();
        } else {
            updatePendingBytesToDispatch(-size);
        }
    });
} else {
    if (sendMessagesToConsumers(readType, entries, true)) {
        updatePendingBytesToDispatch(-size);
        readMoreEntriesAsync();
    } else {
        updatePendingBytesToDispatch(-size);
    }
}

@semistone
Author

I can almost reproduce it with the perf tool
when a very small fraction of payloads are > 30 KB and the others are 3 KB.
The error happens when messageKeyGenerationMode=random;
without messageKeyGenerationMode, the error disappears.

I guess that in batch mode the payload size has some restriction.

Let me confirm again tomorrow to make sure I didn't make any mistake during my test.

@semistone
Author

Hi @lhotari,
I updated the perf tool in
https://github.com/semistone/pulsar/tree/debug_ssues_22601

It only includes one commit, which modifies PerformanceProducer.java to add
a big-payload option (-bp 5 means 5 percent big payloads)
and BatcherBuilder.KEY_BASED (-kb).

My consumer command is:

bin/pulsar-perf  consume persistent://my-tenant/my-namespace/my-topic-1   --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationTls --auth-params '{"tlsCertFile":"conf/superuser.cer","tlsKeyFile":"conf/superuser.key.pem"}' -n 10 -sp Latest -ss angus_test --batch-index-ack   -st Key_Shared 

and my producer command is:

bin/pulsar-perf produce persistent://my-tenant/my-namespace/my-topic-1 -r 6000 -kb -s 2000 -bp 5  -bm 1000  -b 1 -mk random  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationTls --auth-params '{"tlsCertFile":"conf/superuser.cer","tlsKeyFile":"conf/superuser.key.pem"}' 

The error happens when:

the batch builder is KEY_BASED,
with random event keys,
and a few big payloads (in my environment 3% can reproduce it; 10% will crash the producer).

In my test
I use a normal payload of 2 KB and a big payload of 20 KB.
If I remove any of the above conditions, the error is either reduced or disappears.

When it happens, there are WARN/ERROR messages in pulsar-broker.log:

2024-05-09T01:12:35,246+0000 [pulsar-io-3-31] WARN  org.apache.pulsar.broker.service.ServerCnx - [/100.96.184.253:39710] Got exception java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
or 
2024-05-09T01:12:35,260+0000 [broker-topic-workers-OrderedExecutor-15-0] ERROR org.apache.pulsar.common.protocol.Commands - [persistent://budas/budas-preprod-internal/bud_stream_input-partition-1] [angus_test] Failed to peek sticky key from the message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 4

Unfortunately I can't reproduce it in Docker; I guess the Docker standalone setup is different from my Pulsar cluster.
My Pulsar cluster uses
an almost default config, but with TLS auth for broker/bookkeeper/zookeeper.

Please help check it; if there is any problem reproducing this issue in your environment,
I will try to simplify my Pulsar cluster to reproduce it.

Thanks

@semistone
Author

@semistone since you have some way to reproduce this in your own tests, would you be able to test if this can be reproduced with dispatcherDispatchMessagesInSubscriptionThread=false?

I tested it; it's still the same.

@lhotari
Member

lhotari commented May 9, 2024

@semistone since you have some way to reproduce this in your own tests, would you be able to test if this can be reproduced with dispatcherDispatchMessagesInSubscriptionThread=false?

I tested it; it's still the same.

@semistone Thanks for testing. That tells us it's not related to switching the thread in

// dispatch messages to a separate thread, but still in order for this subscription
// sendMessagesToConsumers is responsible for running broker-side filters
// that may be quite expensive
if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
    // setting sendInProgress here, because sendMessagesToConsumers will be executed
    // in a separate thread, and we want to prevent more reads
    acquireSendInProgress();
    dispatchMessagesThread.execute(() -> {
        if (sendMessagesToConsumers(readType, entries, false)) {
            updatePendingBytesToDispatch(-size);
            readMoreEntries();
        } else {
            updatePendingBytesToDispatch(-size);
        }
    });
} else {
    if (sendMessagesToConsumers(readType, entries, true)) {
        updatePendingBytesToDispatch(-size);
        readMoreEntriesAsync();
    } else {
        updatePendingBytesToDispatch(-size);
    }
}
.

@semistone
Author

semistone commented May 17, 2024

@lhotari
I am checking when that ByteBuf goes wrong.
In
OpAddEntry.java
I verify the data when constructing the object and save the original data,

and during run(), I compare and print the ByteBuf:

        if (ml.hasActiveCursors()) {
            // Avoid caching entries if no cursor has been created
            EntryImpl entry = EntryImpl.create(ledgerId, entryId, data);
            // ======= print a log when the data has gone wrong
            if (!Commands.hasChecksum(data)) {  
                log.warn("no checksum in OpAddEntry1 origin {} data  {}, now {} data {}", dataString,
                        new String(Base64.encodeBase64(orgData))
                        , data.toString(), new String(Base64.encodeBase64(entry.getData())));
            }

It shows:

2024-05-17T07:29:16,476+0000 [BookKeeperClientWorker-OrderedExecutor-12-0] WARN  org.apache.bookkeeper.mledger.impl.OpAddEntry - no checksum in OpAddEntry1 
origin PooledSlicedByteBuf(ridx: 13, widx: 2066, cap: 2066/2066, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24211, widx: 24211, cap: 32768)) 

data  DgEJYVBqAAAAKwoMaHlicmlkLTQ0MC0wEI8cGPTz0av4MTILLTEyMDM1NzAyOTNI0A+IAQAxMjM0NTY3ODkxMDExMTIxMzE0MTUxNjE3MTgxOTIwMjEyMjIzMjQyNTI2MjcyODI5MzAzMTMyMzMzNDM1MzYzNzM4Mzk0MDQxNDI0MzQ0NDU0NjQ3NDg0OTUwNTE1MjUzNTQ1NTU2NTc1ODU5NjA2MTYyNjM2NDY1NjY2NzY4Njk3MDcxNzI3Mzc0NzU3Njc3Nzg3OTgwO... skip

,now PooledSlicedByteBuf(ridx: 13, widx: 2066, cap: 2066/2066, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24211, widx: 24211, cap: 32768)) 

data MzY5NDY5NTY5NjY5NzY5ODY5OTcwMDcwMTcwMjcw0av4MTILLTEyMDM1NzAyOTNI0A+IAQAxMjM0NTY3ODkxMDExMTIxMzE0MTUxNjE3MTgxOTIwMjEyMjIzMjQyNTI2MjcyODI5MzAzMTMyMzMzNDM1MzYzNzM4Mzk0MDQxNDI0MzQ0NDU0NjQ3NDg0OTUwNTE1MjUzNTQ1NTU2NTc1ODU5NjA2MTYyNjM2NDY1NjY2NzY4Njk3MDcxNzI3Mzc0NzU3Njc3Nzg3OT
... skip 

The ByteBuf object hasn't changed, but the data inside the ByteBuf has changed.
It seems the first few bytes (about 20) get overwritten and the rest of the data is still OK.

It's a PooledSlicedByteBuf wrapping a PooledUnsafeDirectByteBuf.

Do you have any idea how to find out who changes the data inside the ByteBuf?

@semistone
Author

semistone commented May 17, 2024

I also tested again.
If the published payload is always 20 KB, it doesn't happen.
It only happens when the normal payload is 2 KB but some data is bigger than 16 KB (which sounds like the Netty receive buffer size, but I also tried increasing that),
and at about 1000 QPS.

I also checked the data again:
the wrong data looks exactly as if it was overwritten by the tail of a previous payload.

And it's not related to batch mode;
it also happens with batch mode disabled.

@lhotari
Member

lhotari commented May 17, 2024

do you have any idea how to find who change the data inside bytebuf ?

There's some thought about this in #22110. That's a WIP PR and it might not make sense in the end. Since then, I have found a pattern used in Netty to detect issues. I'll look that up.
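
In the meantime, one low-overhead way to narrow it down (a rough sketch of the general idea, separate from #22110 and the Netty pattern mentioned above): record a checksum of the readable bytes when the OpAddEntry is created and verify it again later. If the checksum changes while the ByteBuf's indices stay the same, something wrote into the shared memory region rather than merely moving this view's indices.

import java.util.zip.CRC32;
import io.netty.buffer.ByteBuf;

final class BufIntegrity {
    // Checksums the readable bytes without moving the reader index.
    static long checksum(ByteBuf buf) {
        byte[] bytes = new byte[buf.readableBytes()];
        buf.getBytes(buf.readerIndex(), bytes);   // getBytes(index, dst) leaves the indices alone
        CRC32 crc = new CRC32();
        crc.update(bytes);
        return crc.getValue();
    }
}

// usage sketch:
//   at construction:  long expected = BufIntegrity.checksum(data);
//   later, in run():  if (BufIntegrity.checksum(data) != expected) { log.warn("data buffer was mutated"); }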

@lhotari
Member

lhotari commented May 18, 2024

Unfortunately I can't reproduce it in Docker; I guess the Docker standalone setup is different from my Pulsar cluster.
My Pulsar cluster uses
an almost default config, but with TLS auth for broker/bookkeeper/zookeeper.

@semistone do you also use Pulsar Proxy?

@semistone
Author

Unfortunately I can't reproduce it in Docker; I guess the Docker standalone setup is different from my Pulsar cluster.
My Pulsar cluster uses
an almost default config, but with TLS auth for broker/bookkeeper/zookeeper.

@semistone do you also use Pulsar Proxy?

@lhotari no, I didn't

@lhotari
Member

lhotari commented May 19, 2024

The error happens when messageKeyGenerationMode=random; without messageKeyGenerationMode, the error disappears.

This is a useful detail. When messageKeyGenerationMode is random and BatcherBuilder.KEY_BASED is used, each batch will have a size of 1. This could hint that the problem is related to the #16605 changes; the main difference is here:

if (messages.size() == 1) {
    messageMetadata.clear();
    messageMetadata.copyFrom(messages.get(0).getMessageBuilder());
    ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
    updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
    ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),
            1, null, messageMetadata, encryptedPayload);
    final OpSendMsg op;
    // Shouldn't call create(MessageImpl<?> msg, ByteBufPair cmd, long sequenceId, SendCallback callback),
    // otherwise it will bring message out of order problem.
    // Because when invoke `ProducerImpl.processOpSendMsg` on flush,
    // if `op.msg != null && isBatchMessagingEnabled()` checks true, it will call `batchMessageAndSend` to flush
    // messageContainers before publishing this one-batch message.
    op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(),
            firstCallback, batchAllocatedSizeBytes);
    // NumMessagesInBatch and BatchSizeByte will not be serialized to the binary cmd. It's just useful for the
    // ProducerStats
    op.setNumMessagesInBatch(1);
    op.setBatchSizeByte(encryptedPayload.readableBytes());
    // handle mgs size check as non-batched in `ProducerImpl.isMessageSizeExceeded`
    if (op.getMessageHeaderAndPayloadSize() > getMaxMessageSize()) {
        producer.semaphoreRelease(1);
        producer.client.getMemoryLimitController().releaseMemory(
                messages.get(0).getUncompressedSize() + batchAllocatedSizeBytes);
        discard(new PulsarClientException.InvalidMessageException(
                "Message size is bigger than " + getMaxMessageSize() + " bytes"));
        return null;
    }
    lowestSequenceId = -1L;
    return op;
}
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
.

@lhotari
Member

lhotari commented May 21, 2024

@semistone would it be possible to share your broker.conf customizations? That could help reproduce the issue. I noticed --batch-index-ack in the pulsar-perf command line. I assume that you have at least acknowledgmentAtBatchIndexLevelEnabled=true in broker.conf?

@semistone
Author

semistone commented May 21, 2024

@lhotari

acknowledgmentAtBatchIndexLevelEnabled

Yes, I enabled it,
and I disabled batch mode (-bd) in the producer after I found it doesn't seem related to batch mode.

After debugging, I found that
if I add this in OpAddEntry.java:

        op.data = data.asReadOnly(); // make it read-only

then the issue seems to disappear, but I'm not sure whether there are any side effects.
I also don't know who could be touching that ByteBuf.
There are OpAddEntry.getData and OpAddEntry.setData methods, but I don't see anyone calling them.

Here is our broker.conf, with all passwords removed:
broker.conf.zip

@semistone
Author

I also debugged in
PulsarDecoder.channelRead,
printing the ByteBuf object id and comparing it with the ByteBuf in OpAddEntry.

I don't see the same ByteBuf object being reused between OpAddEntry.createNoRetainBuffer and OpAddEntry.run.

@lhotari
Member

lhotari commented May 21, 2024

then the issue seems to disappear, but I'm not sure whether there are any side effects.
I also don't know who could be touching that ByteBuf.
There are OpAddEntry.getData and OpAddEntry.setData methods, but I don't see anyone calling them.

Interesting detail. Does the problem also go away with op.data = data.duplicate();?
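
For context, a tiny standalone illustration (plain Netty, not Pulsar code) of what those two calls change: both duplicate() and asReadOnly() share the underlying memory but give the new view its own reader/writer indices, so another component moving the indices no longer affects op.data, while content changes made through other views are still visible.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class DuplicateDemo {
    public static void main(String[] args) {
        ByteBuf original = Unpooled.buffer(16).writeBytes(new byte[]{1, 2, 3, 4});

        ByteBuf dup = original.duplicate();          // shared memory, independent indices
        dup.readByte();                              // moves dup's readerIndex only
        System.out.println(original.readerIndex());  // 0: the original view is unaffected

        original.setByte(0, 42);                     // content changes are visible through dup
        System.out.println(dup.getByte(0));          // 42

        ByteBuf ro = original.asReadOnly();          // additionally rejects writes through this view
        System.out.println(ro.isWritable());         // false
    }
}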

@semistone
Author

data.duplicate();

I tested it, and it seems to work as well.

I will repeat the success/failure tests later to confirm it again.

@lhotari
Member

lhotari commented May 21, 2024

I've been trying to reproduce the issue with local microk8s cluster by deploying Pulsar with Apache Pulsar Helm chart using this values file: https://github.com/lhotari/pulsar-playground/blob/master/test-env/issue22601.yaml
and these scripts: https://github.com/lhotari/pulsar-playground/tree/master/issues/issue22601 .
I haven't yet been able to reproduce.

@lhotari
Member

lhotari commented May 21, 2024

How many brokers and bookies do you have in the cluster where it reproduces?

@semistone
Author

semistone commented May 21, 2024

I have 6 bookies in 3 different data centers, and I left only one broker running for debugging,
but I tested with only 1 bookie before.

It runs on physical servers with CentOS Linux release 7.9.2009.

@semistone
Author

I've been trying to reproduce the issue with local microk8s cluster by deploying Pulsar with Apache Pulsar Helm chart using this values file: https://github.com/lhotari/pulsar-playground/blob/master/test-env/issue22601.yaml and these scripts: https://github.com/lhotari/pulsar-playground/tree/master/issues/issue22601 . I haven't yet been able to reproduce.

I added a big-payload option;
you need to use
-bp 2 to reproduce it.

@lhotari
Member

lhotari commented May 21, 2024

-bp 2 to reproduce it.

I was using -bp 5 before, updated that to -bp 2. lhotari/pulsar-playground@63035e9

What gets logged when the issue reproduces?

@lhotari
Member

lhotari commented May 21, 2024

Do you happen to run with the debug logging level when the issue reproduces?
(Just wondering if debug logging code like

if (log.isDebugEnabled()) {
    printSendCommandDebug(send, headersAndPayload);
}
has side effects, as it seems to have in
private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) {
    headersAndPayload.markReaderIndex();
    MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
    headersAndPayload.resetReaderIndex();
    if (log.isDebugEnabled()) {
        log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {},"
                        + " partition key is: {}, ordering key is {}, uncompressedSize is {}",
                remoteAddress, send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(),
                msgMetadata.getSequenceId(), headersAndPayload.readableBytes(),
                msgMetadata.hasPartitionKey() ? msgMetadata.getPartitionKey() : null,
                msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null,
                msgMetadata.getUncompressedSize());
    }
}
)

@semistone
Author

-bp 2 to reproduce it.

I was using -bp 5 before, updated that to -bp 2. lhotari/pulsar-playground@63035e9

What gets logged when the issue reproduces?

2024-05-21T08:31:49,202+0000 [broker-topic-workers-OrderedExecutor-5-0] ERROR org.apache.pulsar.common.protocol.Commands - [persistent://my-tenant/my-namespace/my-topic-1] [angus_test] Failed to peek sticky key from the message metadata
java.lang.IllegalStateException: Some required fields are missing

@semistone
Author

semistone commented May 21, 2024

do you happen to run with debug logging level when the issue reproduces? (just wondering if debug logging code like

if (log.isDebugEnabled()) {
    printSendCommandDebug(send, headersAndPayload);
}

has side effects, as it seems to have in

private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) {
    headersAndPayload.markReaderIndex();
    MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
    headersAndPayload.resetReaderIndex();
    if (log.isDebugEnabled()) {
        log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {},"
                        + " partition key is: {}, ordering key is {}, uncompressedSize is {}",
                remoteAddress, send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(),
                msgMetadata.getSequenceId(), headersAndPayload.readableBytes(),
                msgMetadata.hasPartitionKey() ? msgMetadata.getPartitionKey() : null,
                msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null,
                msgMetadata.getUncompressedSize());
    }
}

)

When I print bytebuf.toString(), the ridx doesn't seem to change.

I didn't turn on debug logging,
and it happens only when there is a consumer running and at higher QPS (on my server it happens at about 1000 QPS),
with -s 2000 (2 KB payload).

@lhotari
Member

lhotari commented May 21, 2024

and it happens only when there is a consumer running and at higher QPS (on my server it happens at about 1000 QPS),
with -s 2000 (2 KB payload)

consumer

2024-05-21T11:59:25,756+0300 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 12477226 msg --- 6003.690  msg/s --- 108.533 Mbit/s  --- Latency: mean: 14.833 ms - med: 14 - 95pct: 21 - 99pct: 40 - 99.9pct: 49 - 99.99pct: 53 - Max: 53

producer

2024-05-21T11:59:27,381+0300 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 12439999 msg ---   6007.4 msg/s ---    108.2 Mbit/s  --- failure      0.0 msg/s --- Latency: mean:   9.492 ms - med:   8.606 - 95pct:  12.098 - 99pct:  34.800 - 99.9pct:  42.303 - 99.99pct:  48.646 - Max:  48.649

I'm using the scripts in https://github.com/lhotari/pulsar-playground/tree/master/issues/issue22601; those have -s 2000 and -bp 2 and use the patched pulsar-perf. However, I haven't been able to reproduce it yet.

Which JVM version and parameters are you using when you are able to reproduce?

@semistone
Author

semistone commented May 21, 2024

java --version
java 17.0.10 2024-01-16 LTS
Java(TM) SE Runtime Environment (build 17.0.10+11-LTS-240)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.10+11-LTS-240, mixed mode, sharing)

Since changing it to read-only could fix it,
I will debug who touches that object tomorrow.

@lhotari
Member

lhotari commented May 21, 2024

Which JVM args do you run with? For example, the heap size, etc.

@lhotari
Member

lhotari commented May 21, 2024

I will debug who touch that object tomorrow.

It's also possible that nothing touches it, and it's due to a multithreading issue. One useful experiment would be to make the data field volatile in the org.apache.bookkeeper.mledger.impl.OpAddEntry class to see if that also fixes the issue.

Another experiment could be to temporarily disable the use of the RECYCLER in the OpAddEntry class (replacing RECYCLER.get() with new OpAddEntry(null) and then commenting out recyclerHandle.recycle(this); in the recycle method).

@lhotari
Member

lhotari commented May 21, 2024

I was able to reproduce a few issues with the test setup. However, these might be different issues.

Logs are at https://gist.github.com/lhotari/8302131cde5a0f0999e39f8fbd391f09.

[pulsar-testenv-deployment-broker-2] 2024-05-21T13:17:12,146+0000 [pulsar-io-3-8] ERROR org.apache.pulsar.broker.service.Consumer - [PersistentSubscription{topic=persistent://my-tenant/my-namespace/my-topic-1, name=angus_test}] [4] Received ack for corrupted message at 94:98341 - Reason: ChecksumMismatch
[pulsar-testenv-deployment-broker-2] 2024-05-21T13:17:12,146+0000 [pulsar-io-3-8] ERROR org.apache.pulsar.broker.service.Consumer - [PersistentSubscription{topic=persistent://my-tenant/my-namespace/my-topic-1, name=angus_test}] [8] Received ack for corrupted message at 94:98373 - Reason: ChecksumMismatch

also

[pulsar-testenv-deployment-broker-2] 2024-05-21T13:17:12,862+0000 [pulsar-io-3-1] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - backlog for persistent://my-tenant/
my-namespace/my-topic-1 - 1076
[pulsar-testenv-deployment-broker-2] 2024-05-21T13:17:12,862+0000 [pulsar-io-3-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/192.168.122.1:42264] Created subscription on topic persis
tent://my-tenant/my-namespace/my-topic-1 / lari_test3
[pulsar-testenv-deployment-broker-2] 2024-05-21T13:17:12,862+0000 [pulsar-io-3-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [[id: 0xaf5ded60, L:/10.1.179.90:6651 - R:/192.168.122.1:42
264]] Subscribing on topic persistent://my-tenant/my-namespace/my-topic-1 / lari_test3. consumerId: 5
[pulsar-testenv-deployment-broker-2] 2024-05-21T13:17:12,862+0000 [pulsar-io-3-1] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-tenant/my-namespace/my-to
pic-1] Disabled replicated subscriptions controller
[pulsar-testenv-deployment-broker-2] 2024-05-21T13:17:12,862+0000 [pulsar-io-3-1] INFO  org.apache.pulsar.broker.service.persistent.
PersistentSubscription - backlog for persistent://my-tenant/my-namespace/my-topic-1 - 1076
[pulsar-testenv-deployment-broker-2] 2024-05-21T13:17:12,862+0000 [pulsar-io-3-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/192.168.122.1:42264] Created subscription on topic persistent://my-tenant/my-namespace/my-topic-1 / lari_test3
[pulsar-testenv-deployment-broker-2] 2024-05-21T13:17:13,188+0000 [pulsar-io-3-1] WARN  org.apache.pulsar.broker.service.ServerCnx - [/192.168.122.1:42264] Got exception java.lang.IllegalArgumentException: newPosition > limit: (2097 > 91)
[pulsar-testenv-deployment-broker-2]    at java.base/java.nio.Buffer.createPositionException(Buffer.java:341)
[pulsar-testenv-deployment-broker-2]    at java.base/java.nio.Buffer.position(Buffer.java:316)
[pulsar-testenv-deployment-broker-2]    at java.base/java.nio.ByteBuffer.position(ByteBuffer.java:1516)
[pulsar-testenv-deployment-broker-2]    at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:185)
[pulsar-testenv-deployment-broker-2]    at io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:268)
[pulsar-testenv-deployment-broker-2]    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1113)
[pulsar-testenv-deployment-broker-2]    at io.netty.buffer.ReadOnlyByteBufferBuf.copy(ReadOnlyByteBufferBuf.java:431)
[pulsar-testenv-deployment-broker-2]    at io.netty.buffer.DuplicatedByteBuf.copy(DuplicatedByteBuf.java:210)
[pulsar-testenv-deployment-broker-2]    at io.netty.buffer.AbstractByteBuf.copy(AbstractByteBuf.java:1194)
[pulsar-testenv-deployment-broker-2]    at org.apache.pulsar.common.protocol.ByteBufPair$CopyingEncoder.write(ByteBufPair.java:149)
[pulsar-testenv-deployment-broker-2]    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:893)
[pulsar-testenv-deployment-broker-2]    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
[pulsar-testenv-deployment-broker-2]    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
[pulsar-testenv-deployment-broker-2]    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
[pulsar-testenv-deployment-broker-2]    at org.apache.pulsar.broker.service.PulsarCommandSenderImpl.lambda$sendMessagesToConsumer$1(PulsarCommandSenderImpl.java:277)
[pulsar-testenv-deployment-broker-2]    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
[pulsar-testenv-deployment-broker-2]    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
[pulsar-testenv-deployment-broker-2]    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
[pulsar-testenv-deployment-broker-2]    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
[pulsar-testenv-deployment-broker-2]    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
[pulsar-testenv-deployment-broker-2]    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
[pulsar-testenv-deployment-broker-2]    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[pulsar-testenv-deployment-broker-2]    at java.base/java.lang.Thread.run(Thread.java:840)

The way I reproduced this was with the scripts in https://github.com/lhotari/pulsar-playground/tree/master/issues/issue22601. I modified consume.sh to take a subscription name as a parameter.
I created multiple subscriptions (./consume.sh lari_test, ./consume.sh lari_test2 and ./consume.sh lari_test3), and after creating each consumer I killed it immediately to let some backlog accumulate while the producer kept producing.
After a few simultaneous restarts of the consumers, the problems reproduced immediately.

@lhotari
Member

lhotari commented May 21, 2024

I also got this type of exception:

[pulsar-testenv-deployment-broker-2] 2024-05-21T13:31:48,622+0000 [pulsar-io-3-10] WARN  org.apache.pulsar.broker.service.ServerCnx - [/192.168.122.1:48166] Got exception java.nio.Buf
ferUnderflowException
[pulsar-testenv-deployment-broker-2]    at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:183)
[pulsar-testenv-deployment-broker-2]    at io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:268)
[pulsar-testenv-deployment-broker-2]    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1113)
[pulsar-testenv-deployment-broker-2]    at io.netty.buffer.ReadOnlyByteBufferBuf.copy(ReadOnlyByteBufferBuf.java:431)
[pulsar-testenv-deployment-broker-2]    at io.netty.buffer.DuplicatedByteBuf.copy(DuplicatedByteBuf.java:210)
[pulsar-testenv-deployment-broker-2]    at io.netty.buffer.AbstractByteBuf.copy(AbstractByteBuf.java:1194)
[pulsar-testenv-deployment-broker-2]    at org.apache.pulsar.common.protocol.ByteBufPair$CopyingEncoder.write(ByteBufPair.java:149)
[pulsar-testenv-deployment-broker-2]    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:893)
[pulsar-testenv-deployment-broker-2]    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
[pulsar-testenv-deployment-broker-2]    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
[pulsar-testenv-deployment-broker-2]    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
[pulsar-testenv-deployment-broker-2]    at org.apache.pulsar.broker.service.PulsarCommandSenderImpl.lambda$sendMessagesToConsumer$1(PulsarCommandSenderImpl.java:277)
[pulsar-testenv-deployment-broker-2]    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
[pulsar-testenv-deployment-broker-2]    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
[pulsar-testenv-deployment-broker-2]    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
[pulsar-testenv-deployment-broker-2]    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
[pulsar-testenv-deployment-broker-2]    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
[pulsar-testenv-deployment-broker-2]    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
[pulsar-testenv-deployment-broker-2]    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[pulsar-testenv-deployment-broker-2]    at java.base/java.lang.Thread.run(Thread.java:840)

@lhotari
Member

lhotari commented May 21, 2024

This "Got exception java.lang.IllegalArgumentException: newPosition > limit: (2094 > 88)" issue also reproduces with Pulsar 3.2.3:

[pulsar-testenv-deployment-broker-0] 2024-05-21T16:11:09,314+0000 [pulsar-io-3-11] WARN  org.apache.pulsar.broker.service.ServerCnx - [/192.168.122.1:51732] Got exception java.lang.Il
legalArgumentException: newPosition > limit: (2094 > 88)
[pulsar-testenv-deployment-broker-0]    at java.base/java.nio.Buffer.createPositionException(Buffer.java:341)
[pulsar-testenv-deployment-broker-0]    at java.base/java.nio.Buffer.position(Buffer.java:316)
[pulsar-testenv-deployment-broker-0]    at java.base/java.nio.ByteBuffer.position(ByteBuffer.java:1516)
[pulsar-testenv-deployment-broker-0]    at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:185)
[pulsar-testenv-deployment-broker-0]    at io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:268)
[pulsar-testenv-deployment-broker-0]    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1113)
[pulsar-testenv-deployment-broker-0]    at io.netty.buffer.ReadOnlyByteBufferBuf.copy(ReadOnlyByteBufferBuf.java:431)
[pulsar-testenv-deployment-broker-0]    at io.netty.buffer.DuplicatedByteBuf.copy(DuplicatedByteBuf.java:210)
[pulsar-testenv-deployment-broker-0]    at io.netty.buffer.AbstractByteBuf.copy(AbstractByteBuf.java:1194)
[pulsar-testenv-deployment-broker-0]    at org.apache.pulsar.common.protocol.ByteBufPair$CopyingEncoder.write(ByteBufPair.java:149)
[pulsar-testenv-deployment-broker-0]    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:893)
[pulsar-testenv-deployment-broker-0]    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
[pulsar-testenv-deployment-broker-0]    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
[pulsar-testenv-deployment-broker-0]    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
[pulsar-testenv-deployment-broker-0]    at org.apache.pulsar.broker.service.PulsarCommandSenderImpl.lambda$sendMessagesToConsumer$1(PulsarCommandSenderImpl.java:277)
[pulsar-testenv-deployment-broker-0]    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
[pulsar-testenv-deployment-broker-0]    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
[pulsar-testenv-deployment-broker-0]    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
[pulsar-testenv-deployment-broker-0]    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
[pulsar-testenv-deployment-broker-0]    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
[pulsar-testenv-deployment-broker-0]    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
[pulsar-testenv-deployment-broker-0]    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[pulsar-testenv-deployment-broker-0]    at java.base/java.lang.Thread.run(Thread.java:840)

I can reproduce consistently:

  1. Run ./consume.sh lari_test, ./consume.sh lari_test2 and ./consume.sh lari_test3, and terminate them after the subscriptions are created.
  2. Start ./produce.sh and observe with ./logs.sh.
  3. Start ./consume.sh lari_test, ./consume.sh lari_test2 and ./consume.sh lari_test3 at about the same time.

This might be related to the parseMessageMetadata error, however it can also be another issue.

@semistone
Author

My JVM options are:

/usr/bin/java -Dlog4j.shutdownHookEnabled=false -cp /opt/pulsar/hybrid/conf:::/opt/pulsar/hybrid/lib/*: -Dlog4j.configurationFile=log4j2.yaml -Djute.maxbuffer=10485760 -Djava.net.preferIPv4Stack=true -Dzookeeper.clientTcpKeepAlive=true -Dio.netty.tryReflectionSetAccessible=true --add-opens java.base/java.io=ALL-UNNAMED --add-opens java.base/java.util.zip=ALL-UNNAMED --add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.misc=ALL-UNNAMED --add-opens java.base/sun.net=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED --add-opens java.base/jdk.internal.platform=ALL-UNNAMED -Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g -XX:+UseZGC -XX:+PerfDisableSharedMem -XX:+AlwaysPreTouch -Xlog:async -Xlog:gc*,safepoint:/opt/pulsar/hybrid/logs/pulsar_gc_%p.log:time,uptime,tags:filecount=10,filesize=20M -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -Dzookeeper.client.secure=true -Dzookeeper.ssl.keyStore.location=conf/keystore.jks -Dzookeeper.ssl.keyStore.password=xxxx  -Dzookeeper.ssl.trustStore.location=conf/truststore.jks -Dzookeeper.ssl.trustStore.password=xxx  -DsecureClientPort=2184 -Dpulsar.allocator.exit_on_oom=true -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024 -Dpulsar.log.appender=RollingFile -Dpulsar.log.dir=/opt/pulsar/hybrid/logs -Dpulsar.log.level=info -Dpulsar.log.root.level=info -Dpulsar.log.immediateFlush=false -Dpulsar.routing.appender.default=Console -Dlog4j2.is.webapp=false -Dpulsar.functions.process.container.log.dir=/opt/pulsar/hybrid/logs -Dpulsar.functions.java.instance.jar=/opt/pulsar/hybrid/instances/java-instance.jar -Dpulsar.functions.python.instance.file=/opt/pulsar/hybrid/instances/python-instance/python_instance_main.py -Dpulsar.functions.extra.dependencies.dir=/opt/pulsar/hybrid/instances/deps -Dpulsar.functions.instance.classpath=/opt/pulsar/hybrid/conf:::/opt/pulsar/hybrid/lib/*: -Dpulsar.functions.log.conf=/opt/pulsar/hybrid/conf/functions_log4j2.xml -Dbookkeeper.metadata.bookie.drivers=org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver -Dbookkeeper.metadata.client.drivers=org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver -Dpulsar.log.file=pulsar-broker.log org.apache.pulsar.PulsarBrokerStarter --broker-conf /opt/pulsar/hybrid/conf/broker.conf

My server has an
Intel(R) Xeon(R) E-2278G CPU @ 3.40GHz
CPUCORECOUNT: 8
CPUTHREADCOUNT: 16

I feel it may be related to CPU speed.

I will debug a little deeper to see if there's anything strange.

@lhotari
Member

lhotari commented May 22, 2024

I couldn't reproduce with Pulsar Standalone, but I have a way with a local Microk8s cluster where I could also attach a debugger. With break points in java.lang.IllegalArgumentException and java.nio.BufferUnderflowException, I can see the problem.

[screenshot: debugger stopped at the breakpoint]

This issue happens when .copy() is called on this line:

There's a quirk in Netty: .copy() isn't thread-safe for this buffer type. If it's called from multiple threads at the same time, there will be a race condition. This happens here in the Netty code:

https://github.com/netty/netty/blob/243de91df2e9a9bf0ad938f54f76063c14ba6e3d/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBufferBuf.java#L412-L433

io.netty.buffer.ReadOnlyByteBufferBuf#internalNioBuffer() returns a shared instance which gets corrupted.
One could argue that this is a bug in ReadOnlyByteBufferBuf. At the very least, it is extremely surprising behavior.

.copy() was added in #2401. It looks like the root cause wasn't
properly fixed there and the problem moved to a different location.

In Netty, the SslHandler will access the underlying ByteBuffer instances directly. This leads to a similar multi-threading problem as the use of .copy().

I think it's now clear where the problem happens, but the solution to fix it isn't yet known.
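
A tiny standalone illustration of the sharing (plain Netty, not Pulsar code): internalNioBuffer() hands out one cached ByteBuffer view, so any two callers that adjust its position/limit at the same time step on each other, which is what concurrent .copy() calls end up doing.

import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class SharedInternalNioBufferDemo {
    public static void main(String[] args) {
        // Wrapping a read-only NIO buffer yields a ReadOnlyByteBufferBuf, the type in the stack traces.
        ByteBuf buf = Unpooled.wrappedBuffer(ByteBuffer.allocate(1024).asReadOnlyBuffer());

        ByteBuffer first = buf.internalNioBuffer(0, buf.capacity());
        ByteBuffer second = buf.internalNioBuffer(0, buf.capacity());

        // Both calls return the very same mutable view; concurrent position/limit updates
        // from different threads therefore corrupt each other, e.g. inside copy().
        System.out.println(first == second);   // true
    }
}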

@lhotari
Member

lhotari commented May 22, 2024

Looking this up in the Netty issue tracker,

I found these issues that provide a lot of context:

It seems that this is a long-standing issue and it has been partially fixed. However, it's not fixed in many locations in the Netty code base, and it's not safe to share ByteBuf instances in all cases. We need to find a workaround for the Pulsar use cases.
In Pulsar, the sharing in this case happens at least via the broker cache (RangeEntryCacheManagerImpl) and the pending reads manager (PendingReadsManager).

@eolivelli
Contributor

This is a great finding

@semistone
Author

semistone commented May 22, 2024

Thanks for helping check this issue.
I don't have any progress today :(
I can see that another IO thread is using that unwrapped ByteBuf,

but at least you could reproduce it on your side...

I also repeated the tests to double-check

op.data = data.asReadOnly();
op.data = data.duplicate();

these two behaviors.
I rolled back most of my debug code and changed only that one line.
I still see some errors when replacing it with data.duplicate();
but op.data = data.asReadOnly();
seems quite stable to me on both the client and server side,
and I could reach at least 6000 QPS without WARN/ERROR messages,
whereas before the error appeared at 1000 QPS.

Our cluster is for BCP, so the bookkeepers are spread across three data centers, and the network latency is about 10ms.

@lhotari
Member

lhotari commented May 22, 2024

@semistone I have created a PR #22760 to fix the problem. It's currently in draft state since I'm currently testing the solution to verify that it mitigates the problem.

@semistone
Author

@semistone I have created a PR #22760 to fix the problem. It's currently in draft state since I'm currently testing the solution to verify that it mitigates the problem.

It seems changing to read-only actually changes the behavior... :(
Please ignore my previous comments.

I will check and test it tomorrow.
Thanks

@lhotari
Member

lhotari commented May 22, 2024

I have tested #22760 with the repro case and scripts that are based on the modified pulsar-perf and other details provided by @semistone. The issue no longer reproduces in my microk8s test setup.

@lhotari
Member

lhotari commented May 22, 2024

It seems changing to read-only actually changes the behavior... :(
Please ignore my previous comments.

.duplicate() or .retainedDuplicate() should be sufficient to replace the use of .copy() in ByteBufPair. I'm asking Netty maintainers for some advice and keeping PR #22760 as draft until there's a reply.
.copy() itself has a bug which I've reported as netty/netty#14068, however I don't currently see a reason why a copy would be needed at all.
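
For illustration, here is a sketch of what that replacement amounts to in the encoder (simplified, and not necessarily the exact code in #22760): write retained duplicates of the two buffers instead of copies. Each duplicate has its own reader/writer indices over the shared memory, so the encoder no longer goes through the shared internal NIO buffer that copy() races on.

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import org.apache.pulsar.common.protocol.ByteBufPair;

@Sharable
public class DuplicatingByteBufPairEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof ByteBufPair) {
            ByteBufPair b = (ByteBufPair) msg;
            // retainedDuplicate(): independent indices over the shared memory, no byte copy
            ctx.write(b.getFirst().retainedDuplicate(), ctx.voidPromise());
            ctx.write(b.getSecond().retainedDuplicate(), promise);
            b.release();
        } else {
            ctx.write(msg, promise);
        }
    }
}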

@semistone
Author

semistone commented May 23, 2024

I tested with that patch:
I rolled back all libs to version 3.2.2 and only replaced pulsar-testclient.jar and pulsar-common.jar.
I also tried running my test in one data center only.
Unfortunately, it still seems reproducible.
I guess there might be two different issues, or something wrong in my test procedure :(
Let's wait and see.

I may try to recreate my cluster as a standalone server this week to check it again.

@lhotari
Member

lhotari commented May 23, 2024

I tested with that patch: I rolled back all libs to version 3.2.2 and only replaced pulsar-testclient.jar and pulsar-common.jar. I also tried running my test in one data center only. Unfortunately, it still seems reproducible. I guess there might be two different issues, or something wrong in my test procedure :( Let's wait and see.

I may try to recreate my cluster as a standalone server this week to check it again.

@semistone It's better to base it on 3.2.3, since there are also other issues which could result in the same symptoms.
I have been testing with 3.2.3 + patches, branch-3.2...lhotari:pulsar:lh-3.2.3-patched-pulsar-perf .

I'm also hitting other issues when running the repro scripts. The Key_Shared subscription gets stuck very often.
What I do is run ./produce.sh for a few seconds and then start ./start_consuming.sh, which runs 3 different ./consume.sh processes in parallel with different subscriptions. That is how I triggered the problems; however, I was never able to reproduce the parseMessageMetadata error, so it's possible that you are facing yet another issue.

@semistone would you be able to share more details of your repro sequence? I shared complete details of my setup and repro steps in https://github.com/lhotari/pulsar-playground/tree/master/issues/issue22601 so that others could build upon it if they wish. You could decide to share what you wish, but obviously having more details will be helpful.

@semistone
Author

I tested with that patch: I rolled back all libs to version 3.2.2 and only replaced pulsar-testclient.jar and pulsar-common.jar. I also tried running my test in one data center only. Unfortunately, it still seems reproducible. I guess there might be two different issues, or something wrong in my test procedure :( Let's wait and see.
I may try to recreate my cluster as a standalone server this week to check it again.

@semistone It's better to base it on 3.2.3, since there are also other issues which could result in the same symptoms. I have been testing with 3.2.3 + patches, branch-3.2...lhotari:pulsar:lh-3.2.3-patched-pulsar-perf .

I'm also hitting other issues when running the repro scripts. The Key_Shared subscription gets stuck very often. What I do is run ./produce.sh for a few seconds and then start ./start_consuming.sh, which runs 3 different ./consume.sh processes in parallel with different subscriptions. That is how I triggered the problems; however, I was never able to reproduce the parseMessageMetadata error, so it's possible that you are facing yet another issue.

@semistone would you be able to share more details of your repro sequence? I shared complete details of my setup and repro steps in https://github.com/lhotari/pulsar-playground/tree/master/issues/issue22601 so that others could build upon it if they wish. You could decide to share what you wish, but obviously having more details will be helpful.

I will do it later this week.

@lhotari
Member

lhotari commented May 23, 2024

It's likely that this is a BookKeeper client issue; PR #22760 might be fixing a different issue.
There's a fix, apache/bookkeeper#4289, which will be included in 4.16.6. Pulsar 3.0.5 and Pulsar 3.2.3 are on BookKeeper 4.16.5, so this fix isn't included.
There's also a pending PR apache/bookkeeper#4293 .

When using TLS between brokers and bookies, BookKeeper uses the V3 BookKeeper protocol. This implementation is different from the V2 protocol, which is used by default in Pulsar. That is one additional reason why certain bugs appear only when TLS is enabled.

@lhotari
Member

lhotari commented May 23, 2024

I finally found the root cause in the BookKeeper client. The client had several buffer leaks, since the ByteBufList lifecycle was incorrectly handled and write promises were dropped and ignored while authentication was in progress. The fix is in apache/bookkeeper#4293.

@semistone
Author

semistone commented May 25, 2024

I wrote up the reproduce steps and some investigation history in
https://github.com/semistone/personal_notes/blob/main/pulsar_issue_22601/Test.md

I tested from the install step and confirmed it only happens when BookKeeper TLS is enabled.

I will test that patch next week.
Thanks
