Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafkamdm: make kafka version configurable #446

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cfg/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type Route struct {
Topic string // also used by Google PubSub
Codec string // also used by Google PubSub
PartitionBy string
KafkaVersion string
TLSEnabled bool
TLSSkipVerify bool
TLSClientCert string
Expand Down
2 changes: 2 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ topic | Y | string | N/A |
codec | Y | string | N/A | which compression to use. possible values: none, gzip, snappy
partitionBy | Y | string | N/A | which fields to shard by. possible values are: byOrg, bySeries, bySeriesWithTags
schemasFile | Y | string | N/A |
kafkaVersion | N | string | "" | Kafka version in semver format. All brokers must be this version or newer. (if unset or unparsable, falls back to sarama's default version — V0_8_2_0 in the pinned sarama v1.23.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go.mod pins sarama to v1.23.0
That version's ParseKafkaVersion function returns V0_8_2_0 if no version could be parsed

prefix | N | string | "" |
notPrefix | N | string | "" |
sub | N | string | "" |
Expand Down Expand Up @@ -248,6 +249,7 @@ brokers = ['kafka:9092']
topic = 'mdm'
codec = 'snappy'
partitionBy = 'bySeriesWithTags'
kafkaVersion = '2.0.0'
schemasFile = 'conf/storage-schemas.conf'
tlsEnabled = true
tlsSkipVerify = false
Expand Down
11 changes: 10 additions & 1 deletion imperatives/imperatives.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
optSASLEnabled
optSASLUsername
optSASLPassword
optKafkaVersion
optUnspoolSleep
optPickle
optSpool
Expand Down Expand Up @@ -131,6 +132,7 @@ var tokens = []toki.Def{
{Token: optSASLEnabled, Pattern: "saslEnabled="},
{Token: optSASLUsername, Pattern: "saslUsername="},
{Token: optSASLPassword, Pattern: "saslPassword="},
{Token: optKafkaVersion, Pattern: "kafkaVersion="},
{Token: optUnspoolSleep, Pattern: "unspoolsleep="},
{Token: optPickle, Pattern: "pickle="},
{Token: optSpool, Pattern: "spool="},
Expand Down Expand Up @@ -696,6 +698,7 @@ func readAddRouteKafkaMdm(s *toki.Scanner, table Table) error {
var blocking = false
var tlsEnabled, tlsSkipVerify bool
var tlsClientCert, tlsClientKey string
var kafkaVersion string
var saslEnabled bool
var saslUsername, saslPassword string

Expand Down Expand Up @@ -806,12 +809,18 @@ func readAddRouteKafkaMdm(s *toki.Scanner, table Table) error {
return errFmtAddRouteKafkaMdm
}
saslPassword = string(t.Value)
case optKafkaVersion:
t = s.Next()
if t.Token != word {
return errFmtAddRouteKafkaMdm
}
kafkaVersion = string(t.Value)
default:
return fmt.Errorf("unexpected token %d %q", t.Token, t.Value)
}
}

route, err := route.NewKafkaMdm(key, matcher, topic, codec, schemasFile, partitionBy, brokers, bufSize, orgId, flushMaxNum, flushMaxWait, timeout, blocking, tlsEnabled, tlsSkipVerify, tlsClientCert, tlsClientKey, saslEnabled, saslUsername, saslPassword)
route, err := route.NewKafkaMdm(key, matcher, topic, codec, schemasFile, partitionBy, kafkaVersion, brokers, bufSize, orgId, flushMaxNum, flushMaxWait, timeout, blocking, tlsEnabled, tlsSkipVerify, tlsClientCert, tlsClientKey, saslEnabled, saslUsername, saslPassword)
if err != nil {
return err
}
Expand Down
23 changes: 15 additions & 8 deletions route/kafkamdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type KafkaMdm struct {
schemas persister.WhisperSchemas
blocking bool
dispatch func(chan []byte, []byte, metrics.Gauge, metrics.Counter)

orgId int // organisation to publish data under
kafkaVersion sarama.KafkaVersion
orgId int // organisation to publish data under

bufSize int // amount of messages we can buffer up. each message is about 100B. so 1e7 is about 1GB.
flushMaxNum int
Expand All @@ -52,7 +52,7 @@ type KafkaMdm struct {

// NewKafkaMdm creates a special route that writes to a grafana.net datastore
// We will automatically run the route and the destination
func NewKafkaMdm(key string, matcher matcher.Matcher, topic, codec, schemasFile, partitionBy string, brokers []string, bufSize, orgId, flushMaxNum, flushMaxWait, timeout int, blocking bool, tlsEnabled, tlsSkipVerify bool, tlsClientCert, tlsClientKey string, saslEnabled bool, saslUsername, saslPassword string) (Route, error) {
func NewKafkaMdm(key string, matcher matcher.Matcher, topic, codec, schemasFile, partitionBy, kafkaVersion string, brokers []string, bufSize, orgId, flushMaxNum, flushMaxWait, timeout int, blocking bool, tlsEnabled, tlsSkipVerify bool, tlsClientCert, tlsClientKey string, saslEnabled bool, saslUsername, saslPassword string) (Route, error) {
schemas, err := getSchemas(schemasFile)
if err != nil {
return nil, err
Expand All @@ -69,10 +69,9 @@ func NewKafkaMdm(key string, matcher matcher.Matcher, topic, codec, schemasFile,
blocking: blocking,
orgId: orgId,

bufSize: bufSize,
flushMaxNum: flushMaxNum,
flushMaxWait: time.Duration(flushMaxWait) * time.Millisecond,

bufSize: bufSize,
flushMaxNum: flushMaxNum,
flushMaxWait: time.Duration(flushMaxWait) * time.Millisecond,
numErrFlush: stats.Counter("dest=" + cleanAddr + ".unit=Err.type=flush"),
numOut: stats.Counter("dest=" + cleanAddr + ".unit=Metric.direction=out"),
durationTickFlush: stats.Timer("dest=" + cleanAddr + ".what=durationFlush.type=ticker"),
Expand All @@ -96,11 +95,19 @@ func NewKafkaMdm(key string, matcher matcher.Matcher, topic, codec, schemasFile,
log.Fatalf("kafkaMdm %q: failed to initialize partitioner. %s", r.key, err)
}

kfkVersion, err := sarama.ParseKafkaVersion(kafkaVersion)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defaulting to an empty string and showing an error if the user doesn't specify a version seems messy.
Why don't we default to sarama.MinVersion.String()?

if err != nil {
log.Warnf("kafkaMdm %q: failed to parse kafka version fallback to default version. %s", r.key, err)
//ParseKafkaVersion() will return an error and sarama.DefaultVersion
//this means that we have the same behavior that we had before this was configurable.
}
r.kafkaVersion = kfkVersion

// We are looking for strong consistency semantics.
// Because we don't change the flush settings, sarama will try to produce messages
// as fast as possible to keep latency low.
config := sarama.NewConfig()

config.Version = r.kafkaVersion
if tlsEnabled {
tlsConfig, err := tls.NewConfig(tlsClientCert, tlsClientKey)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ func (table *Table) InitRoutes(config cfg.Config, meta toml.MetaData) error {
orgId = routeConfig.OrgId
}

route, err := route.NewKafkaMdm(routeConfig.Key, matcher, routeConfig.Topic, routeConfig.Codec, routeConfig.SchemasFile, routeConfig.PartitionBy, routeConfig.Brokers, bufSize, orgId, flushMaxNum, flushMaxWait, timeout, routeConfig.Blocking, routeConfig.TLSEnabled, routeConfig.TLSSkipVerify, routeConfig.TLSClientCert, routeConfig.TLSClientKey, routeConfig.SASLEnabled, routeConfig.SASLUsername, routeConfig.SASLPassword)
route, err := route.NewKafkaMdm(routeConfig.Key, matcher, routeConfig.Topic, routeConfig.Codec, routeConfig.SchemasFile, routeConfig.PartitionBy, routeConfig.KafkaVersion, routeConfig.Brokers, bufSize, orgId, flushMaxNum, flushMaxWait, timeout, routeConfig.Blocking, routeConfig.TLSEnabled, routeConfig.TLSSkipVerify, routeConfig.TLSClientCert, routeConfig.TLSClientKey, routeConfig.SASLEnabled, routeConfig.SASLUsername, routeConfig.SASLPassword)
if err != nil {
log.Error(err.Error())
return fmt.Errorf("error adding route '%s'", routeConfig.Key)
Expand Down