From 1d2ae21710c336b8a33dc8490ea1716dbbe84ed9 Mon Sep 17 00:00:00 2001 From: Mike Wasson <3992422+MikeWasson@users.noreply.github.com> Date: Thu, 31 Dec 2020 01:27:02 +0000 Subject: [PATCH] Added retry with exponential backoff --- .../bigquerystorage/WriteCommittedStream.java | 45 ++++++++++++++++--- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index 1fc6b0f45b..5b374dbc03 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -16,6 +16,7 @@ package com.example.bigquerystorage; +import com.google.api.client.util.*; import com.google.api.core.ApiFuture; import com.google.cloud.bigquery.*; import com.google.cloud.bigquery.storage.v1beta2.*; @@ -27,10 +28,20 @@ public class WriteCommittedStream { - public static Status.Code getStatusCode(StatusRuntimeException e) { + static Status.Code getStatusCode(StatusRuntimeException e) { return e.getStatus().getCode(); } + // Returns true if the operation should be retried. + static Boolean isRetryable(ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof StatusRuntimeException) { + Status status = ((StatusRuntimeException)cause).getStatus(); + return (status == Status.ABORTED); + } + return false; + } + public static void runWriteCommittedStream() { // TODO(developer): Replace these variables before running the sample. String projectId = "MY_PROJECT_ID"; @@ -85,7 +96,7 @@ public static void writeCommittedStream(String projectId, String datasetName, St System.out.println("Appended records successfully."); } catch (Exception e) { - System.out.println("Failed to append records. \n" + e.toString()); + System.out.println("Failed to append records.\n" + e.toString()); } } @@ -100,17 +111,41 @@ public static void writeToDefaultStream(String projectId, String datasetName, St try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(parent.toString(), schema).createDefaultStream().build()) { + ExponentialBackOff backoff = new ExponentialBackOff(); + for (int i = 0; i < 10; i++) { JSONObject record = new JSONObject(); record.put("col1", String.format("record %03d", i)); JSONArray jsonArr = new JSONArray(); jsonArr.put(record); - ApiFuture future = writer.append(jsonArr); - AppendRowsResponse response = future.get(); + backoff.reset(); + Boolean retry = true; + while (retry) { + try { + + ApiFuture future = writer.append(jsonArr); + AppendRowsResponse response = future.get(); + retry = false; + + } catch (ExecutionException ex) { + // If the error is retryable, retry the operation with exponential backoff. + // Don't retry past the maximum retry interval. + long backOffMillis = backoff.nextBackOffMillis(); + if (isRetryable(ex) && backOffMillis != BackOff.STOP) { + Thread.sleep(backOffMillis); + } + else { + throw ex; + } + } + } } + + System.out.println("Appended records successfully."); + } catch (Exception e) { - System.out.println(e); + System.out.println("Failed to append records.\n" + e.toString()); } } }