Skip to content

Commit

Permalink
Merge pull request #8 from nats-io/1.1.1
Browse files Browse the repository at this point in the history
1.1.1 [IMPROVED] Can use both NATS creds and client cert at the same time
  • Loading branch information
jnmoyne committed Sep 13, 2023
2 parents 044e54c + 338f73a commit 6884e30
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 21 deletions.
2 changes: 1 addition & 1 deletion load_balanced/build.sbt
@@ -1,6 +1,6 @@

name := "nats-spark-connector"
version := "1.1.0"
version := "1.1.1"
scalaVersion := "2.12.14"

val sparkVersion = "3.3.0"
Expand Down
19 changes: 8 additions & 11 deletions load_balanced/src/main/scala/natsconnector/NatsConfig.scala
Expand Up @@ -70,7 +70,7 @@ class NatsConfig(isSource: Boolean) {
// ============== JetStream stream Config Values
// TODO: add replication configuration
var streamName: Option[String] = None // configurable
var storageType: StorageType = StorageType.Memory // configurable
var storageType: StorageType = StorageType.File // configurable
var streamSubjects: Option[String] = None // configurable
var durable: Option[String] = None // configurable
val ackPolicy: AckPolicy =
Expand Down Expand Up @@ -187,9 +187,9 @@ class NatsConfig(isSource: Boolean) {

try {
val typeOfStorage: String = parameters("nats.storage.type")
// storage type default is set to 'memory'
if (typeOfStorage.trim().toLowerCase.equals("file"))
this.storageType = StorageType.File
// storage type default is set to 'file'
if (typeOfStorage.trim().toLowerCase.equals("memory"))
this.storageType = StorageType.Memory
} catch {
case e: NoSuchElementException =>
}
Expand Down Expand Up @@ -356,19 +356,16 @@ class NatsConfig(isSource: Boolean) {
builder = builder.maxReconnects(-1)
}

if (
System.getenv("NATS_NKEY") != null && System.getenv("NATS_NKEY") != ""
) {
if (System.getenv("NATS_NKEY") != null && System.getenv("NATS_NKEY") != "") {
val handler: AuthHandler = new SampleAuthHandler(
System.getenv("NATS_NKEY")
)
builder.authHandler(handler)
} else if (
System.getenv("NATS_CREDS") != null && System.getenv("NATS_CREDS") != ""
) {
} else if (System.getenv("NATS_CREDS") != null && System.getenv("NATS_CREDS") != "") {
builder.authHandler(Nats.credentials(System.getenv("NATS_CREDS")));
} else if (System.getenv("NATS_TLS_KEY_STORE") != null && System.getenv("NATS_TLS_KEY_STORE") != "" && System.getenv("NATS_TLS_TRUST_STORE") != null && System.getenv("NATS_TLS_TRUST_STORE") != "") {
}

if (System.getenv("NATS_TLS_KEY_STORE") != null && System.getenv("NATS_TLS_KEY_STORE") != "" && System.getenv("NATS_TLS_TRUST_STORE") != null && System.getenv("NATS_TLS_TRUST_STORE") != "") {
val tlsAlgo = if (System.getenv("NATS_TLS_ALGO") != null && System.getenv("NATS_TLS_ALGO") != "") {
System.getenv("NATS_TLS_ALGO")
} else "SunX509"
Expand Down
Binary file not shown.
2 changes: 1 addition & 1 deletion partitioned/build.sbt
@@ -1,6 +1,6 @@

name := "nats-spark-connector"
version := "1.1.0"
version := "1.1.1"
scalaVersion := "2.12.14"

val sparkVersion = "3.3.0"
Expand Down
13 changes: 5 additions & 8 deletions partitioned/src/main/scala/natsconnector/NatsConfig.scala
Expand Up @@ -211,18 +211,16 @@ class NatsConfig(isSource:Boolean) {
builder = builder.maxReconnects(-1)
}

if (
System.getenv("NATS_NKEY") != null && System.getenv("NATS_NKEY") != ""
) {
if (System.getenv("NATS_NKEY") != null && System.getenv("NATS_NKEY") != "") {
val handler: AuthHandler = new SampleAuthHandler(
System.getenv("NATS_NKEY")
)
builder.authHandler(handler)
} else if (
System.getenv("NATS_CREDS") != null && System.getenv("NATS_CREDS") != ""
) {
} else if (System.getenv("NATS_CREDS") != null && System.getenv("NATS_CREDS") != "") {
builder.authHandler(Nats.credentials(System.getenv("NATS_CREDS")));
} else if (System.getenv("NATS_TLS_KEY_STORE") != null && System.getenv("NATS_TLS_KEY_STORE") != "" && System.getenv("NATS_TLS_TRUST_STORE") != null && System.getenv("NATS_TLS_TRUST_STORE") != "") {
}

if (System.getenv("NATS_TLS_KEY_STORE") != null && System.getenv("NATS_TLS_KEY_STORE") != "" && System.getenv("NATS_TLS_TRUST_STORE") != null && System.getenv("NATS_TLS_TRUST_STORE") != "") {

val tlsAlgo = if (System.getenv("NATS_TLS_ALGO") != null && System.getenv("NATS_TLS_ALGO") != "") {
System.getenv("NATS_TLS_ALGO")
Expand All @@ -240,7 +238,6 @@ class NatsConfig(isSource:Boolean) {
System.getenv("NATS_TLS_TRUST_STORE_PASSWORD").toCharArray
} else "".toCharArray


val ctx = javax.net.ssl.SSLContext.getInstance(Options.DEFAULT_SSL_PROTOCOL)

val keyStore = KeyStore.getInstance(instanceType)
Expand Down
Binary file not shown.

0 comments on commit 6884e30

Please sign in to comment.