Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: adding best effort and consistent mode. #843

Merged
merged 2 commits into from
May 17, 2024

Conversation

sravotto
Copy link
Contributor

@sravotto sravotto commented May 2, 2024

This change adds transactional support to the Kafka connector for changefeeds.

This change uses the conveyor package to deliver mutation to the target database in any of the supported modes of operations.

The conveyor tracks resolved timestamps across all the partitions with a topic, and ensures that the checkpoint used by cdc-sink is advanced based on the minimal resolved timestamp received on all the partitions.

A second commit, refactors the configuration for better readability.


This change is Reviewable

@codecov-commenter
Copy link

codecov-commenter commented May 2, 2024

Codecov Report

Attention: Patch coverage is 76.34409% with 22 lines in your changes are missing coverage. Please review.

Project coverage is 75.78%. Comparing base (4dcfcdd) to head (c6c4efb).

Files Patch % Lines
internal/source/kafka/consumer.go 68.62% 10 Missing and 6 partials ⚠️
internal/source/kafka/conn.go 57.14% 3 Missing ⚠️
internal/source/kafka/config.go 91.30% 1 Missing and 1 partial ⚠️
internal/source/kafka/provider.go 90.90% 0 Missing and 1 partial ⚠️

❗ Your organization needs to install the Codecov GitHub app to enable full functionality.

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #843      +/-   ##
==========================================
- Coverage   75.91%   75.78%   -0.13%     
==========================================
  Files         226      226              
  Lines       10938    10962      +24     
==========================================
+ Hits         8304     8308       +4     
- Misses       1902     1917      +15     
- Partials      732      737       +5     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@sravotto sravotto changed the title kafka: adding best effort and consistent mode. [DRAFT] kafka: adding best effort and consistent mode. May 2, 2024
@sravotto sravotto force-pushed the sr8_kafka_transactional branch 2 times, most recently from c6c4efb to de7bf09 Compare May 9, 2024 14:58
Copy link
Member

@bobvawter bobvawter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 6 files at r3, 13 of 13 files at r6, all commit messages.
Reviewable status: all files reviewed, 6 unresolved discussions (waiting on @BramGruneir and @sravotto)


internal/source/kafka/consumer.go line 47 at r6 (raw file):

	schema    ident.Schema      // The target schema.
	timeRange hlc.Range         // The time range for incoming mutations.
	fromState []*partitionState // The initial offsets for each partitions.

Sort order.


internal/source/kafka/consumer.go line 69 at r6 (raw file):

		session.MarkOffset(marker.topic, marker.partition, marker.offset, "start")
	}
	c.mu.done = make(map[string]bool)

Not knowing how Setup is called, and from which goroutine(s), you should go ahead and lock the mutex to avoid any surprises later on.


internal/source/kafka/consumer.go line 110 at r6 (raw file):

			payload, err := c.accumulate(toProcess, message)
			if err != nil {
				log.Error(err)

Use log.WithError(err).Error("message") Otherwise, the error object isn't going to be presented to the logging hooks as an error object.


internal/source/kafka/injector.go line 27 at r6 (raw file):

	"github.com/cockroachdb/replicator/internal/conveyor"
	scriptRuntime "github.com/cockroachdb/replicator/internal/script"
	"github.com/cockroachdb/replicator/internal/sequencer/retire"

Did you run crlfmt? I think you'd get a code-quality warning if the imports aren't sorted.

go run github.com/cockroachdb/crlfmt -w -ignore _gen.go .


internal/source/kafka/injector.go line 53 at r6 (raw file):

		tgt.Set,
		retire.Set,
		conveyor.Set,

Alphabetize.


internal/source/kafka/provider.go line 30 at r6 (raw file):

	ProvideConn,
	ProvideEagerConfig,
	ProvideConveyorConfig,

Sort.

@sravotto sravotto force-pushed the sr8_kafka_transactional branch 2 times, most recently from aadec92 to c569c3a Compare May 15, 2024 02:14
Copy link
Contributor Author

@sravotto sravotto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 10 of 13 files reviewed, 6 unresolved discussions (waiting on @bobvawter and @BramGruneir)


internal/source/kafka/consumer.go line 47 at r6 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Sort order.

Done.


internal/source/kafka/consumer.go line 69 at r6 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Not knowing how Setup is called, and from which goroutine(s), you should go ahead and lock the mutex to avoid any surprises later on.

Done.


internal/source/kafka/consumer.go line 110 at r6 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Use log.WithError(err).Error("message") Otherwise, the error object isn't going to be presented to the logging hooks as an error object.

Done.


internal/source/kafka/injector.go line 27 at r6 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Did you run crlfmt? I think you'd get a code-quality warning if the imports aren't sorted.

go run github.com/cockroachdb/crlfmt -w -ignore _gen.go .

it seems to me that they are properly sorted.


internal/source/kafka/injector.go line 53 at r6 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Alphabetize.

Done.


internal/source/kafka/provider.go line 30 at r6 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Sort.

Done.

@sravotto sravotto changed the title [DRAFT] kafka: adding best effort and consistent mode. kafka: adding best effort and consistent mode. May 15, 2024
@sravotto sravotto marked this pull request as ready for review May 15, 2024 14:39
Copy link
Member

@bobvawter bobvawter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewed 3 of 3 files at r7, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @BramGruneir and @sravotto)


internal/source/kafka/conn_test.go line 40 at r7 (raw file):

		done       bool
		timestamps ident.Map[hlc.Time]
		ensure     ident.Map[bool]

Sort.

This change adds transactional support to the Kafka connector for changefeeds.

This change uses the conveyor package to deliver mutation to the target database
in any of the supported modes of operations.

The conveyor tracks resolved timestamps across all the partitions with a topic,
and ensures that the checkpoint used by cdc-sink is advanced  based on the minimal
resolved timestamp received on all the partitions.
This change refactors the Kafka configuration, moving all the parameters required
by SASL into a substructure.
Copy link
Contributor Author

@sravotto sravotto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Reviewable status: 12 of 13 files reviewed, 1 unresolved discussion (waiting on @bobvawter and @BramGruneir)


internal/source/kafka/conn_test.go line 40 at r7 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Sort.

Done.

@sravotto sravotto added this pull request to the merge queue May 17, 2024
Merged via the queue into master with commit 071d3fe May 17, 2024
48 of 49 checks passed
@sravotto sravotto deleted the sr8_kafka_transactional branch May 17, 2024 01:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants