Navigation Menu

Skip to content

Commit

Permalink
fix: fix a NullPtr when user closes a writer without connection being…
Browse files Browse the repository at this point in the history
… ever established (#1454)
  • Loading branch information
yirutang committed Dec 29, 2021
1 parent cb8b0ad commit b774f5d
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
Expand Up @@ -27,6 +27,11 @@

/** Exceptions for Storage Client Libraries. */
public final class Exceptions {
public static class WriterClosedException extends Exception {
public WriterClosedException(String streamName) {
super("Writer closed on: " + streamName);
}
}
/** Main Storage Exception. Might contain map of streams to errors for that stream. */
public static class StorageException extends RuntimeException {

Expand Down
Expand Up @@ -502,11 +502,13 @@ private AppendRowsRequest prepareRequestBasedOnPosition(
}

private void cleanupInflightRequests() {
Throwable finalStatus;
Throwable finalStatus = new Exceptions.WriterClosedException(streamName);
Deque<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();
this.lock.lock();
try {
finalStatus = this.connectionFinalStatus;
if (this.connectionFinalStatus != null) {
finalStatus = this.connectionFinalStatus;
}
while (!this.inflightRequestQueue.isEmpty()) {
localQueue.addLast(pollInflightRequestQueue());
}
Expand Down
Expand Up @@ -638,4 +638,12 @@ public void testRetryAfterAllRecordsInflight() throws Exception {
assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue());
}
}

@Test
public void testWriterClosedStream() throws Exception {
try (StreamWriter writer = getTestStreamWriter()) {
// Writer is closed without any traffic.
TimeUnit.SECONDS.sleep(1);
}
}
}

0 comments on commit b774f5d

Please sign in to comment.