Skip to content

Commit

Permalink
Merge #123228
Browse files Browse the repository at this point in the history
123228: changefeedccl: make mock pulsar sink synchronous r=asg0451 a=rharding6373

Although the comments in mock pulsar sink say that emitting messages and flushing are synchronous, the use of a buffered channel to transmit messages from the mock producer to the test feed makes it asynchronous. This leads to problems where the consumer (the test) may not have stored the data in a "durable" way before the changefeed job completes. As a result, tests can be flaky as it does not always look like all rows have been emitted.

This PR makes the mock pulsar sink synchronous by using an unbuffered channel.

Fixes: #119289
Informs: #118899

Epic: CRDB-9180

Release note: None

Co-authored-by: rharding6373 <harding@cockroachlabs.com>
  • Loading branch information
craig[bot] and rharding6373 committed May 6, 2024
2 parents 367b681 + 97dcb3d commit e6ade03
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
14 changes: 12 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5941,6 +5941,9 @@ func TestChangefeedContinuousTelemetry(t *testing.T) {
for i := 0; i < 5; i++ {
beforeCreate := timeutil.Now()
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO foo VALUES (%d) RETURNING cluster_logical_timestamp()`, i))
// Read the event from the sink.
_, err := foo.Next()
require.NoError(t, err)
verifyLogsWithEmittedBytesAndMessages(t, jobID, beforeCreate.UnixNano(), interval.Nanoseconds(), false)
}
}
Expand Down Expand Up @@ -6007,12 +6010,20 @@ func TestChangefeedContinuousTelemetryDifferentJobs(t *testing.T) {
beforeInsert := timeutil.Now()
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
sqlDB.Exec(t, `INSERT INTO foo2 VALUES (1)`)
// Read the events from the sinks.
_, err := foo.Next()
require.NoError(t, err)
_, err = foo2.Next()
require.NoError(t, err)
verifyLogsWithEmittedBytesAndMessages(t, job1, beforeInsert.UnixNano(), interval.Nanoseconds(), false)
verifyLogsWithEmittedBytesAndMessages(t, job2, beforeInsert.UnixNano(), interval.Nanoseconds(), false)
require.NoError(t, foo.Close())

beforeInsert = timeutil.Now()
sqlDB.Exec(t, `INSERT INTO foo2 VALUES (2)`)
// Read the events from the sinks.
_, err = foo2.Next()
require.NoError(t, err)
verifyLogsWithEmittedBytesAndMessages(t, job2, beforeInsert.UnixNano(), interval.Nanoseconds(), false)
require.NoError(t, foo2.Close())
}
Expand Down Expand Up @@ -7539,8 +7550,7 @@ func TestChangefeedOnlyInitialScanCSV(t *testing.T) {
}
}

// TODO(#119289): re-enable pulsar
cdcTest(t, testFn, feedTestEnterpriseSinks, feedTestOmitSinks("pulsar"))
cdcTest(t, testFn, feedTestEnterpriseSinks)
}

func TestChangefeedOnlyInitialScanCSVSinkless(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2584,7 +2584,8 @@ type mockPulsarServer struct {

func makeMockPulsarServer() *mockPulsarServer {
return &mockPulsarServer{
msgCh: make(chan *pulsar.ProducerMessage, 2048),
// TODO(#118899): Make msgCh a buffered channel for async message sending.
msgCh: make(chan *pulsar.ProducerMessage),
}
}

Expand Down Expand Up @@ -2638,12 +2639,14 @@ func (p *mockPulsarProducer) SendAsync(
m *pulsar.ProducerMessage,
f func(pulsar.MessageID, *pulsar.ProducerMessage, error),
) {
log.Infof(ctx, "SendAsync start %d", len(p.pulsarServer.msgCh))
select {
case <-ctx.Done():
f(nil, m, ctx.Err())
case p.pulsarServer.msgCh <- m:
f(nil, m, nil)
}
log.Infof(ctx, "SendAsync end")
}

func (p *mockPulsarProducer) LastSequenceID() int64 {
Expand Down

0 comments on commit e6ade03

Please sign in to comment.