Skip to content

Commit

Permalink
Refactor status code logic to be more readable (#1132)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen committed Jun 24, 2022
1 parent 0919b61 commit d258dfb
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 130 deletions.
2 changes: 2 additions & 0 deletions contrib/opencensus-ext-azure/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- Shutdown Statsbeat when hitting error/exception threshold
([#1127](https://github.com/census-instrumentation/opencensus-python/pull/1127))
- Fix statsbeat failure counting by refactoring the status code logic in transport
([#1132](https://github.com/census-instrumentation/opencensus-python/pull/1132))

## 1.1.4
Released 2022-04-20
Expand Down
214 changes: 113 additions & 101 deletions contrib/opencensus-ext-azure/opencensus/ext/azure/common/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,119 +152,37 @@ def _transmit(self, envelopes):
return status

text = 'N/A'
data = None
status_code = 0
try:
text = response.text
status_code = response.status_code
except Exception as ex:
if not self._is_stats_exporter():
logger.warning('Error while reading response body %s.', ex)
else:
try:
data = json.loads(text)
except Exception:
pass
if self._check_stats_collection():
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
return TransportStatusCode.DROP

if self._is_stats_exporter() and \
not state.get_statsbeat_shutdown() and \
not state.get_statsbeat_initial_success():
# If statsbeat exporter, record initialization as success if
# appropriate status code is returned
if _reached_ingestion_status_code(response.status_code):
if _reached_ingestion_status_code(status_code):
state.set_statsbeat_initial_success(True)
elif _statsbeat_failure_reached_threshold():
# If ingestion threshold during statsbeat initialization is
# reached, return back code to shut it down
return TransportStatusCode.STATSBEAT_SHUTDOWN

if response.status_code == 200:
if status_code == 200: # Success
self._consecutive_redirects = 0
if self._check_stats_collection():
with _requests_lock:
_requests_map['success'] = _requests_map.get('success', 0) + 1 # noqa: E501
return TransportStatusCode.SUCCESS
# Status code not 200, 439 or 402 counts as failures
if self._check_stats_collection():
if response.status_code != 439 and response.status_code != 402:
with _requests_lock:
_requests_map['failure'] = _requests_map.get('failure', 0) + 1 # noqa: E501
if response.status_code == 206: # Partial Content
if data:
try:
resend_envelopes = []
for error in data['errors']:
if error['statusCode'] in (
429, # Too Many Requests
500, # Internal Server Error
503, # Service Unavailable
):
resend_envelopes.append(envelopes[error['index']])
else:
logger.error(
'Data drop %s: %s %s.',
error['statusCode'],
error['message'],
envelopes[error['index']],
)
if resend_envelopes:
if self._check_stats_collection():
with _requests_lock:
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
self.storage.put(resend_envelopes)
except Exception as ex:
if not self._is_stats_exporter():
logger.error(
'Error while processing %s: %s %s.',
response.status_code,
text,
ex,
)
return TransportStatusCode.DROP
# cannot parse response body, fallback to retry
if response.status_code in (
206, # Partial Content
429, # Too Many Requests
500, # Internal Server Error
503, # Service Unavailable
):
if not self._is_stats_exporter():
logger.warning(
'Transient server side error %s: %s.',
response.status_code,
text,
)
# server side error (retryable)
if self._check_stats_collection():
with _requests_lock:
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
return TransportStatusCode.RETRY
# Authentication error
if response.status_code == 401:
if not self._is_stats_exporter():
logger.warning(
'Authentication error %s: %s.',
response.status_code,
text,
)
if self._check_stats_collection():
with _requests_lock:
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
return TransportStatusCode.RETRY
# Forbidden error
# Can occur when v2 endpoint is used while AI resource is configured
# with disableLocalAuth
if response.status_code == 403:
if not self._is_stats_exporter():
logger.warning(
'Forbidden error %s: %s.',
response.status_code,
text,
)
if self._check_stats_collection():
with _requests_lock:
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
return TransportStatusCode.RETRY
# Redirect
if response.status_code in (307, 308):
elif _status_code_is_redirect(status_code): # Redirect
# for statsbeat, these are not tracked as success nor failures
self._consecutive_redirects += 1
if self._consecutive_redirects < _MAX_CONSECUTIVE_REDIRECTS:
if response.headers:
Expand All @@ -290,20 +208,114 @@ def _transmit(self, envelopes):
if self._check_stats_collection():
with _requests_lock:
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
# Other, server side error (non-retryable)
if not self._is_stats_exporter():
logger.error(
'Non-retryable server side error %s: %s.',
response.status_code,
text,
)
if self._check_stats_collection():
if response.status_code == 402 or response.status_code == 439:
return TransportStatusCode.DROP
elif _status_code_is_throttle(status_code): # Throttle
if self._check_stats_collection():
# 402: Monthly Quota Exceeded (new SDK)
# 439: Monthly Quota Exceeded (old SDK) <- Currently OC SDK
with _requests_lock:
_requests_map['throttle'] = _requests_map.get('throttle', 0) + 1 # noqa: E501
return TransportStatusCode.DROP
return TransportStatusCode.DROP
elif _status_code_is_retryable(status_code): # Retry
if not self._is_stats_exporter():
if status_code == 401: # Authentication error
logger.warning(
'Authentication error %s: %s.',
status_code,
text,
)
elif status_code == 403:
# Forbidden error
# Can occur when v2 endpoint is used while AI resource is configured # noqa: E501
# with disableLocalAuth
logger.warning(
'Forbidden error %s: %s.',
status_code,
text,
)
else:
logger.warning(
'Transient server side error %s: %s. Retrying.',
status_code,
text,
)
if self._check_stats_collection():
with _requests_lock:
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
return TransportStatusCode.RETRY
elif status_code == 206: # Partial Content
data = None
try:
data = json.loads(text)
except Exception as ex:
if not self._is_stats_exporter():
logger.warning('Error while reading response body %s for partial content.', ex) # noqa: E501
if self._check_stats_collection():
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
return TransportStatusCode.DROP
if data:
try:
resend_envelopes = []
for error in data['errors']:
if _status_code_is_retryable(error['statusCode']):
resend_envelopes.append(envelopes[error['index']])
if self._check_stats_collection():
with _requests_lock:
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
else:
logger.error(
'Data drop %s: %s %s.',
error['statusCode'],
error['message'],
envelopes[error['index']],
)
if self.storage and resend_envelopes:
self.storage.put(resend_envelopes)
except Exception as ex:
if not self._is_stats_exporter():
logger.error(
'Error while processing %s: %s %s.',
status_code,
text,
ex,
)
if self._check_stats_collection():
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
return TransportStatusCode.DROP
# cannot parse response body, fallback to retry
else:
# 400 and 404 will be tracked as failure count
# 400 - Invalid - The server cannot or will not process the request due to the invalid telemetry (invalid data, iKey) # noqa: E501
# 404 - Ingestion is allowed only from stamp specific endpoint - must update connection string # noqa: E501
if self._check_stats_collection():
with _requests_lock:
_requests_map['failure'] = _requests_map.get('failure', 0) + 1 # noqa: E501
# Other, server side error (non-retryable)
if not self._is_stats_exporter():
logger.error(
'Non-retryable server side error %s: %s.',
status_code,
text,
)
return TransportStatusCode.DROP


def _status_code_is_redirect(status_code):
return status_code in (307, 308)


def _status_code_is_throttle(status_code):
return status_code in (402, 439)


def _status_code_is_retryable(status_code):
return status_code in (
401, # Unauthorized
403, # Forbidden
429, # Too many requests
500, # Internal server error
503, # Service unavailable
)


def _reached_ingestion_status_code(status_code):
Expand Down

0 comments on commit d258dfb

Please sign in to comment.