Skip to content

Commit

Permalink
Added dynamic client id with namespace + shard (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
Helena Edelson authored and Helena Edelson committed Feb 20, 2018
1 parent 9d454fd commit 0ed3a31
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
14 changes: 12 additions & 2 deletions kafka/src/main/scala/filodb/kafka/KafkaSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class SourceConfig(conf: Config, shard: Int)
extends KafkaSettings(conf, ConsumerConfig.configNames.asScala.toSet) {

import ConsumerConfig._
import KafkaSettings.{ConsumerFilters => filterNot}

require(resolved.hasPath("filo-record-converter"),
"'record-converter' must not be empty. Configure a custom converter.")
Expand All @@ -34,15 +35,19 @@ class SourceConfig(conf: Config, shard: Int)
val id = resolved.as[Option[String]](GROUP_ID_CONFIG).getOrElse("filodb.consumer")
s"$id$shard"
}
val ClientId = {
val id = resolved.as[Option[String]](CLIENT_ID_CONFIG).getOrElse("filodb.client")
s"$id$shard"
}

override def config: Map[String, AnyRef] = {
val defaults = Map(
GROUP_ID_CONFIG -> GroupId,
CLIENT_ID_CONFIG -> ClientId,
AUTO_OFFSET_RESET_CONFIG -> AutoOffsetReset,
ENABLE_AUTO_COMMIT_CONFIG -> EnableAutoCommit.toString)

defaults ++ super.config.filterNot { case (k, _) =>
k == GROUP_ID_CONFIG || k == AUTO_OFFSET_RESET_CONFIG || k == ENABLE_AUTO_COMMIT_CONFIG }
defaults ++ super.config.filterNot { case (k, _) => filterNot contains k }
}
}

Expand Down Expand Up @@ -145,5 +150,10 @@ object KafkaSettings {
val StatusTimeout = moduleConfig.as[FiniteDuration]("tasks.status-timeout")
val GracefulStopTimeout = moduleConfig.as[FiniteDuration]("tasks.lifecycle.shutdown-timeout")

val ConsumerFilters = {
import ConsumerConfig._
Set(GROUP_ID_CONFIG, CLIENT_ID_CONFIG, AUTO_OFFSET_RESET_CONFIG, ENABLE_AUTO_COMMIT_CONFIG)
}

}

1 change: 1 addition & 0 deletions kafka/src/test/resources/sourceconfig.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ sourceconfig {

bootstrap.servers = "localhost:9092"
group.id = "org.example.cluster1.consumer"
client.id = "org.example.cluster1.client"
key.serializer = "org.apache.kafka.common.serialization.LongSerializer"
value.serializer = "org.example.CustomSerializer"
key.deserializer = "org.apache.kafka.common.serialization.LongDeserializer"
Expand Down
11 changes: 9 additions & 2 deletions kafka/src/test/scala/filodb/kafka/KafkaConsumerConfigSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ class KafkaConsumerConfigSpec extends KafkaSpec {
source.EnableAutoCommit shouldEqual false
source.AutoOffsetReset shouldEqual "latest"
source.GroupId shouldEqual "filodb.consumer0"
source.ClientId shouldEqual "filodb.client0"
source.asConfig.getString(AUTO_OFFSET_RESET_CONFIG) shouldEqual "latest"
source.asConfig.getBoolean(ENABLE_AUTO_COMMIT_CONFIG) shouldEqual false
source.asConfig.getString(GROUP_ID_CONFIG) shouldEqual "filodb.consumer0"
source.asConfig.getString(CLIENT_ID_CONFIG) shouldEqual "filodb.client0"
source.asProps.getProperty(AUTO_OFFSET_RESET_CONFIG) shouldEqual "latest"
source.asProps.getProperty(ENABLE_AUTO_COMMIT_CONFIG) shouldEqual "false"
source.asProps.getProperty(GROUP_ID_CONFIG) shouldEqual "filodb.consumer0"
source.asProps.getProperty(CLIENT_ID_CONFIG) shouldEqual "filodb.client0"
}

"have the expected Config" in {
Expand All @@ -67,7 +72,8 @@ class KafkaConsumerConfigSpec extends KafkaSpec {
props.getProperty("my.custom.client.namespace") shouldEqual "custom.value"
props.getProperty(AUTO_OFFSET_RESET_CONFIG) shouldEqual "latest"
props.getProperty(ENABLE_AUTO_COMMIT_CONFIG) shouldEqual "false"
props.getProperty(GROUP_ID_CONFIG).startsWith("org.example.cluster1.consumer1") shouldEqual true
props.getProperty(GROUP_ID_CONFIG) shouldEqual "org.example.cluster1.consumer1"
props.getProperty(CLIENT_ID_CONFIG) shouldEqual "org.example.cluster1.client1"
props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) shouldEqual "localhost:9092"
props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) shouldEqual classOf[LongDeserializer].getName
props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) shouldEqual classOf[CustomDeserializer].getName
Expand All @@ -79,7 +85,8 @@ class KafkaConsumerConfigSpec extends KafkaSpec {
consumerCfg.autoOffsetReset.toString.toLowerCase shouldEqual "latest"
consumerCfg.enableAutoCommit shouldEqual false
consumerCfg.bootstrapServers should be(List("localhost:9092"))
consumerCfg.groupId.contains("org.example.cluster1.consumer1") shouldEqual true
consumerCfg.groupId shouldEqual "org.example.cluster1.consumer1"
consumerCfg.clientId shouldEqual "org.example.cluster1.client1"
}
}
}

0 comments on commit 0ed3a31

Please sign in to comment.