Skip to content

Commit

Permalink
- Increase min number of retries on errors to 3
Browse files Browse the repository at this point in the history
- Retry on ssl.SSLError
- Back off the retry wait time on SSL errors
  • Loading branch information
nvoxland committed Apr 27, 2024
1 parent 008c897 commit 6cfc9ad
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
28 changes: 27 additions & 1 deletion deeplake/core/storage/s3.py
Expand Up @@ -5,6 +5,7 @@
import boto3
import botocore # type: ignore
import posixpath
import ssl
from typing import Dict, Optional, Tuple, Type
from datetime import datetime, timezone
from botocore.session import ComponentLocator
Expand Down Expand Up @@ -41,6 +42,7 @@
EndpointConnectionError,
IncompleteReadError,
SSLError,
ssl.SSLError,
)

try:
Expand Down Expand Up @@ -190,8 +192,10 @@ def __setitem__(self, path, content):
self._set(path, content)
except CONNECTION_ERRORS as err:
tries = self.num_tries
retry_wait = 0
for i in range(1, tries + 1):
always_warn(f"Encountered connection error, retry {i} out of {tries}")
retry_wait = self._retry_wait_and_extend(retry_wait, err)
try:
self._set(path, content)
always_warn(
Expand Down Expand Up @@ -280,8 +284,11 @@ def get_bytes(
return self._get_bytes(path, start_byte, end_byte)
except CONNECTION_ERRORS as err:
tries = self.num_tries
retry_wait = 0
for i in range(1, tries + 1):
always_warn(f"Encountered connection error, retry {i} out of {tries}")
retry_wait = self._retry_wait_and_extend(retry_wait, err)

try:
ret = self._get_bytes(path, start_byte, end_byte)
always_warn(
Expand Down Expand Up @@ -322,8 +329,11 @@ def __delitem__(self, path):
self._del(path)
except CONNECTION_ERRORS as err:
tries = self.num_tries
retry_wait = 0
for i in range(1, tries + 1):
always_warn(f"Encountered connection error, retry {i} out of {tries}")
retry_wait = self._retry_wait_and_extend(retry_wait, err)

try:
self._del(path)
always_warn(
Expand All @@ -338,7 +348,7 @@ def __delitem__(self, path):

@property
def num_tries(self):
    """Return how many times a failed storage operation is retried.

    The count is the number of started 300-second windows since
    ``self.start_time``, clamped to the inclusive range 3..5.

    Returns:
        int: The retry count, never below 3 and never above 5.
    """
    elapsed_windows = ceil((time.time() - self.start_time) / 300)
    # Floor at 3 so a freshly created provider still retries several times;
    # cap at 5 so a long-lived provider does not retry without bound.
    return max(3, min(elapsed_windows, 5))

def _keys_iterator(self):
self._check_update_creds()
Expand Down Expand Up @@ -645,8 +655,11 @@ def get_object_from_full_url(self, url: str):
return self._get(path, bucket)
except CONNECTION_ERRORS as err:
tries = self.num_tries
retry_wait = 0
for i in range(1, tries + 1):
always_warn(f"Encountered connection error, retry {i} out of {tries}")
retry_wait = self._retry_wait_and_extend(retry_wait, err)

try:
ret = self._get(path, bucket)
always_warn(
Expand Down Expand Up @@ -685,8 +698,11 @@ def set_items(self, items: dict):
self._set_items(items)
except CONNECTION_ERRORS as err:
tries = self.num_tries
retry_wait = 0
for i in range(1, tries + 1):
always_warn(f"Encountered connection error, retry {i} out of {tries}")
retry_wait = self._retry_wait_and_extend(retry_wait, err)

try:
self._set_items(items)
always_warn(
Expand All @@ -713,3 +729,13 @@ def get_items(self, keys):
yield key, future.result()
else:
yield key, exception

def _retry_wait_and_extend(self, retry_wait: int, err: Exception):
    """Sleep for the current backoff delay and return the next delay.

    Backoff applies only when ``err`` is an SSL error, meaning an instance
    of ``ssl.SSLError`` or of the ``SSLError`` class already in scope in
    this module. For any other error nothing is slept and ``0`` is returned.

    Args:
        retry_wait: Seconds to sleep before the upcoming retry.
        err: The error that triggered the retry loop.

    Returns:
        The delay to pass in on the next call: ``1`` after a zero wait,
        otherwise ``retry_wait`` doubled.
    """
    is_ssl_failure = isinstance(err, ssl.SSLError) or isinstance(err, SSLError)
    if not is_ssl_failure:
        return 0

    time.sleep(retry_wait)

    # Zero cannot be doubled, so the first backoff step is seeded with 1.
    return retry_wait * 2 if retry_wait else 1
24 changes: 24 additions & 0 deletions deeplake/core/storage/tests/test_storage_provider.py
@@ -1,4 +1,9 @@
import json
import ssl
import time
from unittest.mock import patch

from deeplake.core import S3Provider
from deeplake.tests.path_fixtures import gcs_creds
from deeplake.tests.common import is_opt_true
from deeplake.tests.storage_fixtures import (
Expand Down Expand Up @@ -229,3 +234,22 @@ def test_azure_empty_blob(azure_storage):
azure_storage.get_object_from_full_url(f"{azure_storage.root}/empty_blob")
== b""
)


@pytest.mark.slow
def test_s3_backoff():
    """Check that SSL failures in ``set_items`` are retried with backoff.

    ``_set_items`` is patched to always raise ``ssl.SSLError``. The test
    expects four calls in total and an overall duration between three and
    five seconds.
    """
    attempts = 0
    provider = S3Provider("s3://mock")

    def fake_set_items(items: dict):
        # Count every invocation, then fail the way a broken TLS link would.
        nonlocal attempts
        attempts += 1

        raise ssl.SSLError

    started_at = time.time()
    with patch(
        "deeplake.core.storage.s3.S3Provider._set_items", wraps=fake_set_items
    ), pytest.raises(Exception):
        provider.set_items({"test": "test"})

    assert attempts == 4
    assert 3 < time.time() - started_at < 5

0 comments on commit 6cfc9ad

Please sign in to comment.