
After application restart, the total number of processed messages at the end is not the same as the number of input messages #11

Open
vivekgh24 opened this issue Nov 27, 2023 · 1 comment
Labels
defect Suspected defect such as a bug or regression


vivekgh24 commented Nov 27, 2023

What version were you using?

nats-server: v2.0.0
Jetstream Version: 2.10.4

What environment was the server running in?

Local - Eclipse IDE running on my system

Is this defect reproducible?

  1. Run the Java code below to pull messages from NATS JetStream, process them, and save them:
private static void sparkNatsTester() {
    SparkSession spark = SparkSession.builder()
            .appName("spark-with-nats")
            .master("local")
            .config("spark.jars",
                    "libs/nats-spark-connector-balanced_2.12-1.1.4.jar," + "libs/jnats-2.17.1.jar")
            .config("spark.sql.streaming.checkpointLocation", "tmp/checkpoint")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .getOrCreate();
    System.out.println("sparkSession : " + spark);
    Dataset<Row> df = spark.readStream()
            .format("nats")
            .option("nats.host", "localhost")
            .option("nats.port", 4222)
            .option("nats.stream.name", "newstream")
            .option("nats.stream.subjects", "newsub")
            .option("nats.durable.name", "cons1")
            .option("nats.msg.ack.wait.secs", 120)
            .load();
    StreamingQuery query;
    try {
        query = df.withColumn("date_only",
                        from_unixtime(unix_timestamp(col("dateTime"), "MM/dd/yyyy - HH:mm:ss Z"), "MM/dd/yyyy"))
                .writeStream()
                .outputMode("append")
                .partitionBy("date_only")
                .format("delta")
                .option("path", "tmp/outputdelta")
                .start();
        query.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
  2. From the NATS CLI, run the command below to push 10k messages for testing:
    nats pub newsub --count=10000 "test #{{Count}}"

  3. While the Spark application is processing the messages, stop it.

  4. After some time, restart the Spark application. (The stream and durable-consumer state at each step can be inspected with the jnats sketch below.)
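
For completeness, here is a small sketch of how the stream and durable-consumer state can be inspected at any point during these steps, using the jnats management API. This assumes the localhost defaults, the stream "newstream", and the durable "cons1" configured above; the helper method is hypothetical and not part of the repro code:

// Needed imports (from the jnats jar already on the classpath):
// io.nats.client.Connection, io.nats.client.Nats, io.nats.client.JetStreamManagement,
// io.nats.client.api.StreamInfo, io.nats.client.api.ConsumerInfo
private static void printStreamAndConsumerState() throws Exception {
    try (Connection nc = Nats.connect("nats://localhost:4222")) {
        JetStreamManagement jsm = nc.jetStreamManagement();
        StreamInfo si = jsm.getStreamInfo("newstream");
        ConsumerInfo ci = jsm.getConsumerInfo("newstream", "cons1");
        // Total messages stored in the stream (should be 10000 after the publish step).
        System.out.println("messages in stream        : " + si.getStreamState().getMsgCount());
        // Highest stream sequence delivered to the consumer, and the highest acknowledged one.
        System.out.println("last delivered stream seq : " + ci.getDelivered().getStreamSequence());
        System.out.println("ack floor stream seq      : " + ci.getAckFloor().getStreamSequence());
        System.out.println("ack pending               : " + ci.getNumAckPending());
    }
}

If the ack floor is behind the last delivered sequence when the application is stopped, JetStream will redeliver those messages after the restart.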

Given the capability you are leveraging, describe your expectation?

Since 10k messages were pushed to NATS JetStream as input, after the Spark application has processed all of them (including the stop and restart in between), the number of processed messages in the output folder "tmp/outputdelta" should be exactly 10k. That is, the number of output/processed messages should equal the number of input messages.

Given the expectation, what is the defect you are observing?

The number of output/processed messages in the output folder is always greater than the number of input messages. In the scenario above, the output contained 10,100 messages whereas only 10,000 messages were pushed to NATS JetStream as input: 100 messages were duplicated!
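
For reference, one way to check the row count in the output folder is to read the Delta output back in batch mode (just a sketch of how such a count can be obtained, not necessarily how the number above was produced):

// Read the streaming output back as a batch Delta table and count the rows.
Dataset<Row> written = spark.read().format("delta").load("tmp/outputdelta");
System.out.println("rows in tmp/outputdelta: " + written.count());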

Observations:
As I can see in the tmp/outputdelta/_delta_log folder, the last log file generated before stopping the application contains the following:

{"commitInfo":{"timestamp":1701081648739,"operation":"STREAMING UPDATE","operationParameters":{"outputMode":"Append","queryId":"193fec24-78ea-4ac0-87af-7b6232e748ff","epochId":"20"},"readVersion":19,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numRemovedFiles":"0","numOutputRows":"100","numOutputBytes":"1753","numAddedFiles":"1"},"engineInfo":"Apache-Spark/3.3.3 Delta-Lake/2.3.0","txnId":"36a435e7-f775-4f80-a637-48a40c23dc87"}}
{"txn":{"appId":"193fec24-78ea-4ac0-87af-7b6232e748ff","version":20,"lastUpdated":1701081648739}}
{"add":{"path":"date_only=11%252F27%252F2023/part-00000-5c9955af-06c7-4da6-991a-2412f033cd38.c000.snappy.parquet","partitionValues":{"date_only":"11/27/2023"},"size":1753,"modificationTime":1701081648739,"dataChange":true,"stats":"{\**"numRecords\":100**,\"minValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"**test #3001**\"},\"maxValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"**test #3100**\"},\"nullCount\":{\"subject\":0,\"dateTime\":0,\"content\":0}}"}}

And the first log file generated right after restarting the Spark application contains the following:

{"commitInfo":{"timestamp":1701081772472,"operation":"STREAMING UPDATE","operationParameters":{"outputMode":"Append","queryId":"f8d338e3-9eb2-49a2-96df-2a4c0e368285","epochId":"10"},"readVersion":30,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numRemovedFiles":"0","numOutputRows":"1000","numOutputBytes":"6250","numAddedFiles":"1"},"engineInfo":"Apache-Spark/3.3.3 Delta-Lake/2.3.0","txnId":"d1a5e794-6827-4c3d-bfa0-e1fd9e3474c9"}}
{"txn":{"appId":"f8d338e3-9eb2-49a2-96df-2a4c0e368285","version":10,"lastUpdated":1701081772472}}
{"add":{"path":"date_only=11%252F27%252F2023/part-00000-8e45d89b-6426-43f5-8144-ee826874dcfb.c000.snappy.parquet","partitionValues":{"date_only":"11/27/2023"},"size":6250,"modificationTime":1701081772455,"dataChange":true,"stats":"{\"**numRecords\":1000**,\"minValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"**test #3001**\"},\"maxValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"**test #4000**\"},\"nullCount\":{\"subject\":0,\"dateTime\":0,\"content\":0}}"}}

As you can see, before the application was stopped it had processed messages up to test #3100, but after the Spark application was restarted it started again from test #3001 instead of test #3101. That is where the 100 duplicated messages in the output folder come from.
I am also using a durable consumer, which redelivers messages that were not acknowledged after consumption. So it looks like the last micro-batch processed just before the shutdown (messages 3001 to 3100) was never acknowledged, and those messages were redelivered and ended up duplicated in the output.
How can this issue be fixed?
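
One possible mitigation I can think of (only a sketch, not a confirmed fix for the connector): since unacknowledged messages get redelivered, the Delta sink could be made idempotent by deduplicating on a unique message key inside foreachBatch. The sketch below assumes the content column uniquely identifies a message (true for this test data, but not in general) and that the Delta table at tmp/outputdelta already exists:

// Needed imports: io.delta.tables.DeltaTable and org.apache.spark.api.java.function.VoidFunction2.
// Deduplicate each micro-batch and MERGE it into the existing table, so rows that are
// redelivered after a restart are skipped instead of being appended a second time.
VoidFunction2<Dataset<Row>, Long> upsertBatch = (batchDf, batchId) ->
        DeltaTable.forPath(batchDf.sparkSession(), "tmp/outputdelta")
                .as("t")
                .merge(batchDf.dropDuplicates(new String[] {"content"}).as("s"),
                       "t.content = s.content")
                .whenNotMatched()
                .insertAll()
                .execute();

query = df.withColumn("date_only",
                from_unixtime(unix_timestamp(col("dateTime"), "MM/dd/yyyy - HH:mm:ss Z"), "MM/dd/yyyy"))
        .writeStream()
        .foreachBatch(upsertBatch)
        .start();

A merge keyed on a real unique identifier (for example the JetStream sequence number, if the connector exposed it as a column) would be more robust than keying on the payload.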

vivekgh24 added the defect (Suspected defect such as a bug or regression) label Nov 27, 2023
vivekgh24 (Author) commented:

@jnmoyne @gcolliso @sergiosennder @scottf @bruth Can you please help us on this.
