New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: retry pdml transaction on EOS internal error #360
Changes from 1 commit
8a1d690
d3683b0
88f4bb5
b145058
b8da6c1
3be1346
5edd3eb
1ff9092
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -20,6 +20,7 @@ | |||
|
||||
import com.google.api.gax.grpc.GrpcStatusCode; | ||||
import com.google.api.gax.rpc.DeadlineExceededException; | ||||
import com.google.api.gax.rpc.InternalException; | ||||
import com.google.api.gax.rpc.ServerStream; | ||||
import com.google.api.gax.rpc.UnavailableException; | ||||
import com.google.cloud.spanner.SessionImpl.SessionTransaction; | ||||
|
@@ -55,23 +56,6 @@ class PartitionedDMLTransaction implements SessionTransaction { | |||
this.rpc = rpc; | ||||
} | ||||
|
||||
private ByteString initTransaction() { | ||||
final BeginTransactionRequest request = | ||||
BeginTransactionRequest.newBuilder() | ||||
.setSession(session.getName()) | ||||
.setOptions( | ||||
TransactionOptions.newBuilder() | ||||
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())) | ||||
.build(); | ||||
Transaction txn = rpc.beginTransaction(request, session.getOptions()); | ||||
if (txn.getId().isEmpty()) { | ||||
throw SpannerExceptionFactory.newSpannerException( | ||||
ErrorCode.INTERNAL, | ||||
"Failed to init transaction, missing transaction id\n" + session.getName()); | ||||
} | ||||
return txn.getId(); | ||||
} | ||||
|
||||
/** | ||||
* Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the | ||||
* transaction was aborted. The update method uses the ExecuteStreamingSql RPC to execute the | ||||
|
@@ -127,20 +111,24 @@ long executeStreamingPartitionedUpdate(final Statement statement, final Duration | |||
} | ||||
} | ||||
break; | ||||
} catch (UnavailableException e) { | ||||
} catch (UnavailableException | InternalException e) { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this catch block we retry on EOS exception. |
||||
// Retry the stream in the same transaction if the stream breaks with | ||||
// UnavailableException and we have a resume token. Otherwise, we just retry the | ||||
// entire transaction. | ||||
if (!ByteString.EMPTY.equals(resumeToken)) { | ||||
log.log( | ||||
Level.FINER, | ||||
"Retrying PartitionedDml stream using resume token '" | ||||
+ resumeToken.toStringUtf8() | ||||
+ "' because of broken stream", | ||||
e); | ||||
if (shouldResumeOrRestartTransaction(e)) { | ||||
if (!ByteString.EMPTY.equals(resumeToken)) { | ||||
log.log( | ||||
Level.FINER, | ||||
"Retrying PartitionedDml stream using resume token '" | ||||
+ resumeToken.toStringUtf8() | ||||
+ "' because of broken stream", | ||||
e); | ||||
} else { | ||||
throw new com.google.api.gax.rpc.AbortedException( | ||||
e, GrpcStatusCode.of(Code.ABORTED), true); | ||||
} | ||||
} else { | ||||
throw new com.google.api.gax.rpc.AbortedException( | ||||
e, GrpcStatusCode.of(Code.ABORTED), true); | ||||
throw e; | ||||
} | ||||
} | ||||
} | ||||
|
@@ -174,4 +162,27 @@ public void invalidate() { | |||
// No-op method needed to implement SessionTransaction interface. | ||||
@Override | ||||
public void setSpan(Span span) {} | ||||
|
||||
private ByteString initTransaction() { | ||||
final BeginTransactionRequest request = | ||||
BeginTransactionRequest.newBuilder() | ||||
.setSession(session.getName()) | ||||
.setOptions( | ||||
TransactionOptions.newBuilder() | ||||
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())) | ||||
.build(); | ||||
Transaction txn = rpc.beginTransaction(request, session.getOptions()); | ||||
if (txn.getId().isEmpty()) { | ||||
throw SpannerExceptionFactory.newSpannerException( | ||||
ErrorCode.INTERNAL, | ||||
"Failed to init transaction, missing transaction id\n" + session.getName()); | ||||
} | ||||
return txn.getId(); | ||||
} | ||||
|
||||
private boolean shouldResumeOrRestartTransaction(Exception e) { | ||||
return e instanceof UnavailableException | ||||
|| (e instanceof InternalException | ||||
&& e.getMessage().contains("Received unexpected EOS on DATA frame from server")); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is hacky, but unfortunately we do not get a specific exception for this error, so we have to proxy through the exception message. This follows the approach taken in the bigquery client: https://github.com/googleapis/java-bigquerystorage/pull/263/files. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same error is also retried for streaming queries, and this check could probably use the existing check in Line 233 in b264100
|
||||
} | ||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method was just moved down (after the public methods).