[Exception] Add initial DeltaCatError support #290
base: main
⚠️ Performance Alert ⚠️
Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

| Benchmark suite | Current: 785962d | Previous: d77a208 | Ratio |
|---|---|---|---|
| deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[9-incremental-decimal-pk-multi-dup_V1] | 1.13810172851053 iter/sec (stddev: 0) | 2.294583661883228 iter/sec (stddev: 0) | 2.02 |
| deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[10-incremental-decimal-pk-partitionless_V2] | 1.074566958595815 iter/sec (stddev: 0) | 2.253822151056081 iter/sec (stddev: 0) | 2.10 |
| deltacat/tests/compute/test_compact_partition_incremental.py::test_compact_partition_incremental[12-incremental-decimal-single-hash-bucket_V2] | 1.311349877966825 iter/sec (stddev: 0) | 2.6295975732771018 iter/sec (stddev: 0) | 2.01 |
This comment was automatically generated by workflow using github-action-benchmark.
Looks good so far. I didn't see code that increases resources on subsequent retries if a Ray worker dies or an out-of-memory error occurs.
@parametrized
def handle_exception(func, task_id):
Can we get task_id inside this function?
Any reason for not resolving this?
Didn't get your original comment. Are you asking whether we are able to pass task_id?
Instead of passing task_id to this function, can we fetch it inside this?
This handle_exception is used as a function annotation on all Ray remote tasks. Not sure how to fetch it inside?
Just call get_current_ray_task_id within this function :)
Oh I see what you mean, will update in next revision.
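For readers following this thread, here is a minimal sketch of what the suggestion could look like: the decorator takes no task_id argument and looks it up itself when an error occurs. Everything here is an assumption made for illustration. In particular, the body of get_current_ray_task_id is a stand-in that returns a fixed string (the real helper would presumably query Ray's runtime context), and the shape of RetryableError is hypothetical, not the PR's actual class.

```python
import functools


class RetryableError(Exception):
    """Stand-in for a DeltaCAT retryable error (hypothetical shape)."""

    def __init__(self, msg, task_id=None):
        super().__init__(msg)
        self.task_id = task_id


def get_current_ray_task_id():
    # Stand-in only: a real helper would ask Ray for the running task's id.
    return "task-0001"


def handle_exception(func):
    """Decorator that converts transient errors, fetching task_id itself."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ConnectionError as e:
            # The task id is fetched here instead of being passed in.
            task_id = get_current_ray_task_id()
            raise RetryableError(
                f"Transient failure in task {task_id}", task_id=task_id
            ) from e

    return wrapper


@handle_exception
def flaky_download():
    raise ConnectionError("connection reset")
```

With this shape, call sites only need the bare @handle_exception, and the original error stays reachable through __cause__ because of the from e.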
Any update on this PR?
Can you look at this performance drop as well?
9c7462b to 5478ad9
The performance alert is gone after the commit that addressed the comments, so the regression does not reproduce consistently.
Can you reply to the comments you don't plan to address?
deltacat/aws/s3u.py
Outdated
    ) from e
    raise DownloadTableError(
        msg=f"Failed table download from: {s3_url}", s3_url=s3_url
    ) from e
except BaseException as e:
This is not resolved. Are you planning on resolving it?
        task_id=task_id,
    ) from e
else:
    raise DependencyRayError()
Just re-raise Ray e? You are losing all the stack trace and exception messages by raising a custom error at this point. Moreover, you are bypassing Ray's default retries on system errors.
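To make the concern concrete, here is a small sketch contrasting the two options. FakeRaySystemError is a hypothetical stand-in for whatever Ray raises, and DependencyRayError here is a bare placeholder for the class in the diff. The point shown is only that a bare raise lets the original exception object (type and message) propagate, whereas raising a new error with no arguments hands the caller a different type and an empty message.

```python
class DependencyRayError(Exception):
    """Placeholder for the custom error raised in the diff."""


class FakeRaySystemError(Exception):
    """Hypothetical stand-in for a Ray system error."""


def failing_task():
    raise FakeRaySystemError("worker died")


def wrap_in_custom_error():
    try:
        failing_task()
    except Exception:
        # The caller now sees a different type and no message.
        raise DependencyRayError()


def re_raise_original():
    try:
        failing_task()
    except Exception:
        # Bare raise: the same exception object keeps propagating.
        raise


def caught(fn):
    """Return the (type name, message) that a caller of fn observes."""
    try:
        fn()
    except Exception as e:
        return type(e).__name__, str(e)
```

Any retry logic that matches on the exception type would see FakeRaySystemError only in the second variant.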
6bbaf98 to 3abf95a
Co-authored-by: Raghavendra M Dani <draghave@amazon.com> Signed-off-by: pf <19919899+pfaraone@users.noreply.github.com>
ee5aa40 to 785962d
785962d to b51b9ea
Took a fresh look. Please address a few final comments.
ConnectionError,
TimeoutError,
DaftTransientError,
tenacity.RetryError,
I believe this should have DeltaCAT RetryableError only, and probably tenacity.RetryError (unless you re-raise the wrapped error), as all other errors will be caught by the exception handler and converted to RetryableError before Ray catches them.
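A rough sketch of the reasoning in this comment: if a handler converts raw transient errors before they leave the task, the retry allowlist only needs the converted type. The run_with_retries loop below is a toy stand-in for Ray's retry mechanism, not Ray's API, and to_retryable and make_flaky are hypothetical helpers written only for this illustration.

```python
class RetryableError(Exception):
    """Stand-in for DeltaCAT's RetryableError."""


def to_retryable(func):
    """Toy handler: converts raw transient errors to RetryableError."""

    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (ConnectionError, TimeoutError) as e:
            raise RetryableError(str(e)) from e

    return wrapper


# Only the converted type needs to be listed as retryable.
RETRY_ON = (RetryableError,)


def run_with_retries(task, max_retries=3):
    """Toy scheduler: retries task when it raises a type in RETRY_ON."""
    for attempt in range(max_retries + 1):
        try:
            return task()
        except RETRY_ON:
            if attempt == max_retries:
                raise


def make_flaky(failures):
    """Build a task that times out `failures` times, then returns its call count."""
    state = {"calls": 0}

    @to_retryable
    def task():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise TimeoutError("timed out")
        return state["calls"]

    return task
```

Here TimeoutError never appears in RETRY_ON, yet the task is still retried, because the wrapper has already converted it.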
    return task_id, node_ip


def _handle_ray_error(e: Exception):
- def _handle_ray_error(e: Exception):
+ def _handle_ray_error(e: RayError):
def mock_raise_exception(self, exception_to_raise):
    raise exception_to_raise

def test_exception_handler(self):
We need at least one test for each top-level branch, preferably the ones where we extract the wrapped exception and re-raise the error.
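As an illustration of the kind of coverage being asked for, here is a sketch using unittest with one case per top-level branch, including the unwrap path. The categorize function and all three exception classes are hypothetical stand-ins invented for this example (WrappedRetryError plays the role of a wrapper such as tenacity.RetryError); they are not the handler in this PR.

```python
import unittest


class RetryableError(Exception):
    """Stand-in for DeltaCAT's retryable error category."""


class NonRetryableError(Exception):
    """Stand-in for DeltaCAT's non-retryable error category."""


class WrappedRetryError(Exception):
    """Hypothetical wrapper that carries the underlying error."""

    def __init__(self, wrapped):
        super().__init__(repr(wrapped))
        self.wrapped = wrapped


def categorize(e):
    """Toy handler with three top-level branches (illustrative only)."""
    if isinstance(e, WrappedRetryError):
        # Branch 1: extract the wrapped error and categorize that instead.
        return categorize(e.wrapped)
    if isinstance(e, (ConnectionError, TimeoutError)):
        # Branch 2: transient errors become retryable.
        return RetryableError(str(e))
    # Branch 3: anything else is treated as non-retryable.
    return NonRetryableError(str(e))


class TestCategorize(unittest.TestCase):
    def test_each_top_level_branch(self):
        cases = [
            (WrappedRetryError(TimeoutError("t")), RetryableError),
            (ConnectionError("c"), RetryableError),
            (ValueError("v"), NonRetryableError),
        ]
        for raised, expected in cases:
            with self.subTest(raised=type(raised).__name__):
                self.assertIsInstance(categorize(raised), expected)
```

Using subTest keeps the cases in one table while still reporting each failing branch separately.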
    msg=f"Daft error occurred while executing task:{task_id} on node ip:{node_ip}.",
    task_id=task_id,
    node_ip=node_ip,
) from e
Are we ignoring the error if it doesn't fall into any of the above?
) from e


def _handle_daft_error(e: Exception):
Same here regarding typing.
class DeltaCatErrorNames(str, Enum):

    # Dependency Error
    GENERAL_DEPENDENCY_ERROR = "GeneralDependencyError"
A description would be helpful regarding what each error means.
This PR adds initial DeltaCat exception support.
With this support:
DeltaCat exceptions can be easily identified by error code going forward.
Retryable DeltaCat errors can be identified at the task level and the job level.
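The two points above could be sketched roughly as follows. Only DeltaCatErrorNames and its GENERAL_DEPENDENCY_ERROR member come from the diff in this conversation. The base class layout, the is_retryable flag, the EXAMPLE_TRANSIENT_ERROR member, and the describe helper are assumptions made up for illustration and may not match the PR.

```python
from enum import Enum


class DeltaCatErrorNames(str, Enum):
    GENERAL_DEPENDENCY_ERROR = "GeneralDependencyError"
    # Hypothetical member, added only so the sketch has a retryable case.
    EXAMPLE_TRANSIENT_ERROR = "ExampleTransientError"


class DeltaCatError(Exception):
    """Assumed base class: each subclass carries a name and a retry flag."""

    error_name = None
    is_retryable = False


class GeneralDependencyError(DeltaCatError):
    error_name = DeltaCatErrorNames.GENERAL_DEPENDENCY_ERROR


class ExampleTransientError(DeltaCatError):
    error_name = DeltaCatErrorNames.EXAMPLE_TRANSIENT_ERROR
    is_retryable = True


def describe(e):
    """Return (error code, retryable?) for any exception."""
    if isinstance(e, DeltaCatError) and e.error_name is not None:
        return e.error_name.value, e.is_retryable
    # Errors outside the hierarchy have no code and are not retried.
    return "Unknown", False
```

A task-level or job-level caller can then branch on the returned pair without inspecting exception messages.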