Skip to content

Commit

Permalink
1.1.2 [IMPROVED] Defines the stream only if not already defined
Browse files Browse the repository at this point in the history
Also adds setting the stream's replica count
  • Loading branch information
jnmoyne committed Sep 15, 2023
1 parent 0593dcc commit a986fe9
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 5 deletions.
2 changes: 1 addition & 1 deletion load_balanced/build.sbt
@@ -1,6 +1,6 @@

name := "nats-spark-connector"
version := "1.1.1"
version := "1.1.2"
scalaVersion := "2.12.14"

val sparkVersion = "3.3.0"
Expand Down
24 changes: 22 additions & 2 deletions load_balanced/src/main/scala/natsconnector/NatsConfig.scala
Expand Up @@ -68,7 +68,7 @@ class NatsConfig(isSource: Boolean) {
// If zero messages then subsciber will wait messageReceiveWaitTime before giving up.

// ============== JetStream stream Config Values
// TODO: add replication configuration
var replicationCount = 1 // configurable
var streamName: Option[String] = None // configurable
var storageType: StorageType = StorageType.File // configurable
var streamSubjects: Option[String] = None // configurable
Expand Down Expand Up @@ -194,6 +194,14 @@ class NatsConfig(isSource: Boolean) {
case e: NoSuchElementException =>
}

try {
val numReplicas = parameters("nats.storage.replicas").toInt
// num of replicas is set to '1' by default
this.replicationCount = numReplicas
} catch {
case e: NoSuchElementException =>
}

this.nc = {
this.server = Some(s"nats://${this.host}:${this.port}")
this.options = Some(createConnectionOptions(this.server.get, this.allowReconnect))
Expand Down Expand Up @@ -256,6 +264,7 @@ class NatsConfig(isSource: Boolean) {
.builder()
.name(this.streamName.get)
.storageType(this.storageType)
.replicas(this.replicationCount)
// .subjects(subjects.asJava)
.subjects(this.streamSubjects.get.replace(" ", "").split(",").toList.asJava)
.retentionPolicy(this.retentionPolicy)
Expand All @@ -265,8 +274,19 @@ class NatsConfig(isSource: Boolean) {
val logger:Logger = NatsLogger.logger
logger.info(sc.toJson())
}

// Add or use an existing stream.
val si: StreamInfo = jsm.addStream(sc)
val si: StreamInfo = try {
jsm.getStreamInfo(this.streamName.get)
// if the stream is already defined, don't try to define it
} catch {
case e: JetStreamApiException =>
if (e.getApiErrorCode == 10059) {
// stream not found, define it
jsm.addStream(sc)
} else throw e
}

if(this.isLocal) {
val logger:Logger = NatsLogger.logger
logger.info(si.getConfiguration())
Expand Down
3 changes: 2 additions & 1 deletion load_balanced/src/test/scala/natstest/NatsTestDriver.scala
Expand Up @@ -19,7 +19,8 @@ object NatsTestDriver extends App {
"nats.host" -> "localhost",
"nats.port" -> "4222",
"nats.msg.ack.wait.secs" -> "10",
"nats.durable.name" -> "Durable"
"nats.durable.name" -> "Durable",
"nats.storage.replicas" -> "1"
)
NatsConfigSource.config.setConnection(parameters)
NatsConfigSink.config.streamName = Some("TestStream")
Expand Down
Binary file not shown.
2 changes: 1 addition & 1 deletion partitioned/build.sbt
@@ -1,6 +1,6 @@

name := "nats-spark-connector"
version := "1.1.1"
version := "1.1.2"
scalaVersion := "2.12.14"

val sparkVersion = "3.3.0"
Expand Down
Binary file not shown.

0 comments on commit a986fe9

Please sign in to comment.