New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add shutdown() and shutdownNow() #673
Changes from 8 commits
7598071
11ceaf0
2e032f2
fc77332
f2c76a1
f416d06
ae5bd94
734bd7e
3f85e64
060a6ca
bdd64df
903f794
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -337,15 +337,18 @@ private void closeStream(final Throwable throwable) { | |
new Runnable() { | ||
@Override | ||
public void run() { | ||
listener.onEvent( | ||
null, | ||
throwable instanceof FirestoreException | ||
? (FirestoreException) throwable | ||
: FirestoreException.forApiException( | ||
new ApiException( | ||
throwable, | ||
GrpcStatusCode.of(getStatus(throwable).getCode()), | ||
false))); | ||
if (throwable instanceof FirestoreException) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These changes are necessary becuse of grpc interrupting the channel while this could still be listenting? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, and the exceptions thrown then do no have status codes. This is all based on local debugging. |
||
listener.onEvent(null, (FirestoreException) throwable); | ||
} else { | ||
Status status = getStatus(throwable); | ||
FirestoreException firestoreException = | ||
FirestoreException.forApiException( | ||
new ApiException( | ||
throwable, | ||
GrpcStatusCode.of(status != null ? status.getCode() : Code.UNKNOWN), | ||
false)); | ||
listener.onEvent(null, firestoreException); | ||
} | ||
} | ||
}); | ||
} | ||
|
@@ -383,31 +386,36 @@ private void initStream() { | |
new Runnable() { | ||
@Override | ||
public void run() { | ||
if (!isActive.get()) { | ||
return; | ||
} | ||
|
||
synchronized (Watch.this) { | ||
try { | ||
if (!isActive.get()) { | ||
return; | ||
} | ||
|
||
Preconditions.checkState(stream == null); | ||
synchronized (Watch.this) { | ||
if (!isActive.get()) { | ||
return; | ||
} | ||
|
||
current = false; | ||
nextAttempt = backoff.createNextAttempt(nextAttempt); | ||
Preconditions.checkState(stream == null); | ||
|
||
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN); | ||
stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable()); | ||
current = false; | ||
nextAttempt = backoff.createNextAttempt(nextAttempt); | ||
|
||
ListenRequest.Builder request = ListenRequest.newBuilder(); | ||
request.setDatabase(firestore.getDatabaseName()); | ||
request.setAddTarget(target); | ||
if (resumeToken != null) { | ||
request.getAddTargetBuilder().setResumeToken(resumeToken); | ||
} | ||
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN); | ||
stream = | ||
firestore.streamRequest(Watch.this, firestore.getClient().listenCallable()); | ||
|
||
ListenRequest.Builder request = ListenRequest.newBuilder(); | ||
request.setDatabase(firestore.getDatabaseName()); | ||
request.setAddTarget(target); | ||
if (resumeToken != null) { | ||
request.getAddTargetBuilder().setResumeToken(resumeToken); | ||
} | ||
|
||
stream.onNext(request.build()); | ||
stream.onNext(request.build()); | ||
} | ||
} catch (Throwable throwable) { | ||
onError(throwable); | ||
} | ||
} | ||
}, | ||
|
@@ -549,6 +557,10 @@ private List<DocumentChange> computeSnapshot(Timestamp readTime) { | |
private static boolean isPermanentError(Throwable throwable) { | ||
Status status = getStatus(throwable); | ||
|
||
if (status == null) { | ||
return true; | ||
} | ||
|
||
switch (status.getCode()) { | ||
case CANCELLED: | ||
case UNKNOWN: | ||
|
@@ -563,20 +575,20 @@ private static boolean isPermanentError(Throwable throwable) { | |
} | ||
} | ||
|
||
/** Extracts the GRPC status code if available. Returns UNKNOWN for non-GRPC exceptions. */ | ||
/** Extracts the GRPC status code if available. Returns `null` for non-GRPC exceptions. */ | ||
@Nullable | ||
private static Status getStatus(Throwable throwable) { | ||
Status status = Status.UNKNOWN; | ||
|
||
if (throwable instanceof StatusRuntimeException) { | ||
status = ((StatusRuntimeException) throwable).getStatus(); | ||
return ((StatusRuntimeException) throwable).getStatus(); | ||
} else if (throwable instanceof StatusException) { | ||
status = ((StatusException) throwable).getStatus(); | ||
return ((StatusException) throwable).getStatus(); | ||
} | ||
return status; | ||
return null; | ||
} | ||
|
||
/** Determines whether we need to initiate a longer backoff due to system overload. */ | ||
private static boolean isResourceExhaustedError(Throwable throwable) { | ||
return getStatus(throwable).getCode().equals(Code.RESOURCE_EXHAUSTED); | ||
Status status = getStatus(throwable); | ||
return status != null && status.getCode().equals(Code.RESOURCE_EXHAUSTED); | ||
}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm surprised this doesn't still call
firestoreClient.close()
. It looks like inGrpcFirestoreRpc
you factored things to keep the existing behavior forclose()
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have an opinion here - I do think it makes sense to keep the existing semantics, especially since we are adding a couple of new methods. If I call
close()
here the newly added shutdown tests time out though. Let me know if you want me to go back to calling close(), in which case I will remove the tests.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think keeping the existing semantic is probably beneficial.
For updating the test we may be able to do something like:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated. Thanks for the reviews.