Skip to content

Commit

Permalink
segmented batch profiles vs transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Rogers committed Apr 26, 2024
1 parent f86f8dd commit 55907da
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 33 deletions.
2 changes: 1 addition & 1 deletion python/whylogs/api/writer/whylabs.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def write(

# Writable is some flavor of profile -- DatasetProfile, [Segmented]DatasetProfileView, or ResultSet

if self._transaction_id is not None:
if (self._transaction_id is not None) or kwargs.get("transaction"):
if self._reference_profile_name is not None:
return False, "Cannot send reference profiles in a transaction"
return self._get_writer(WhyLabsTransactionWriter).write(
Expand Down
32 changes: 1 addition & 31 deletions python/whylogs/api/writer/whylabs_batch_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,36 +69,6 @@ def __init__(
):
super().__init__(org_id, api_key, dataset_id, api_client, ssl_ca_cert, _timeout_seconds, whylabs_client)

def _write_segmented_result_set(self, file: SegmentedResultSet, **kwargs: Any) -> Tuple[bool, str]:
views = file.get_writables()
if not views:
logger.warning("Attempt to write a result set with no writables, nothing written!")
return True, ""

whylabs_tags = file.get_whylabs_tags()
utc_now = datetime.datetime.now(datetime.timezone.utc)
and_status: bool = True
messages: List[str] = list()
logger.debug(f"About to write {len(views)} files:")
transaction_id = self._whylabs_client.get_transaction_id() # type: ignore
self._whylabs_client._transaction_id = transaction_id # type: ignore
self._transaction_id = transaction_id
for view, tags in zip(views, whylabs_tags):
dataset_timestamp_epoch = self._get_dataset_epoch(view, utc_now)
profile_id, upload_url = self._whylabs_client.get_upload_url_transaction(dataset_timestamp_epoch, tags) # type: ignore
bool_status, message = self._upload_view(
view, profile_id, upload_url, dataset_timestamp_epoch, tags, **kwargs
)
messages.append(message)
and_status = and_status and bool_status

if and_status:
self._whylabs_client.commit_transaction(transaction_id) # type: ignore

logger.debug(f"Completed writing {len(views)} files!")
return and_status, "; ".join(messages) if and_status else "Failed to upload all segments"

"""
def _write_segmented_result_set(self, file: SegmentedResultSet, **kwargs: Any) -> Tuple[bool, str]:
views = file.get_writables()
if not views:
Expand All @@ -119,7 +89,6 @@ def _write_segmented_result_set(self, file: SegmentedResultSet, **kwargs: Any) -

logger.debug(f"Completed writing {len(views)} files!")
return and_status, ("; ".join(messages) if and_status else "Failed to upload all segments")
"""

# TODO: Sadly, we can't use Writer::_create_zip() because we have to fiddle with the
# views before serializing them. We could add a Writable fiddling call-back argument
Expand Down Expand Up @@ -168,6 +137,7 @@ def write(
self._whylabs_client = self._whylabs_client.option(**kwargs) # type: ignore

if isinstance(file, SegmentedResultSet):
# TODO: force zip if the number of segemtns is large
if kwargs.get("zip"):
return self._write_segmented_result_set_zip(file, **kwargs)
else:
Expand Down
2 changes: 1 addition & 1 deletion python/whylogs/api/writer/whylabs_reference_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def write(
self._whylabs_client = self._whylabs_client.option(**kwargs) # type: ignore

if isinstance(file, SegmentedResultSet):
if kwargs.get("zip"):
if kwargs.get("zip"): # TODO: force zip if the number of segemtns is large
return self._write_segmented_result_set_zip(file, **kwargs)
else:
return self._write_segmented_result_set(file, **kwargs)
Expand Down

0 comments on commit 55907da

Please sign in to comment.