Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add operation-like helpers for queries #560

Open
chegoryu opened this issue Apr 29, 2024 · 0 comments
Open

Add operation-like helpers for queries #560

chegoryu opened this issue Apr 29, 2024 · 0 comments
Assignees
Labels
query tracker YQL/query tracker related

Comments

@chegoryu
Copy link
Contributor

Right now there are no convenience helpers (syntactic sugar) for working with queries.
Even waiting for a query to finish has to be implemented manually.

I would like official helpers similar to the existing `Operation` class:

class Operation(object):
"""Holds information about started operation."""
def __init__(self, id,
type=None, finalization_actions=None,
abort_exceptions=(KeyboardInterrupt, TimeoutError), client=None):

I have my own partial implementation:

Code
# Some internal imports are skipped.

from yt.common import YtError
from yt.wrapper import YtClient
from yt.wrapper.exceptions_catcher import ExceptionCatcher
from yt.wrapper.operation_commands import TimeWatcher
from yt.wrapper.response_stream import ResponseStream
from yt.yson.yson_types import YsonMap

# https://github.com/ydb-platform/ydb/blob/9bcbbd617cfcf26e791b5f16cac239f3fa1bc632/ydb/library/yql/core/yql_execution.h#L11-L15
_YQL_OPERATION_PROGRESS_STATES = [
    "Started",
    "InProgress",
    "Finished",
    "Failed",
    "Aborted",
]
_YQL_OPERATION_PROGRESS_STATES_SNAKE_CASE = [
    "started",
    "in_progress",
    "finished",
    "failed",
    "aborted",
]

def _format_yql_progress(state_counters: dict[str, int]) -> str:
    result = []
    # Same as operation progress: https://github.com/ytsaurus/ytsaurus/blob/a3233e6e3ff64374a4f6cc7febc9302834beb4fa/yt/python/yt/wrapper/operation_commands.py#L390-L392
    for camel_case_state, snake_case_state in zip(_YQL_OPERATION_PROGRESS_STATES, _YQL_OPERATION_PROGRESS_STATES_SNAKE_CASE):
        value = state_counters.get(camel_case_state, 0)
        result.append(f"{snake_case_state}={value:<5}")

    return " ".join(result)

def _pretty_print_time(seconds: int | float) -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(seconds))


class YtQueryFailedError(YtError):
    """Error describing a query that ended in a non-successful state.

    The query id, engine, state and url are attached as error attributes; the
    query's own error (if any) becomes the only inner error.
    """

    def __init__(self, id: str, engine: TQueryEngine, state: str, error: YtError | None, url: str):
        inner_errors = [] if error is None else [error]
        super().__init__(
            "Query {0} {1}".format(id, state),
            attributes={"id": id, "engine": engine, "state": state, "url": url},
            inner_errors=inner_errors,
        )


class YtQueryState:
    """Query tracker state name with convenience predicates.

    Two states compare equal, and hash equally, when their names are equal,
    so instances can be stored in sets or used as dict keys.
    """

    # Final states; is_finished() is true for exactly these names.
    _FINISHED_STATES = frozenset(("aborted", "completed", "failed"))
    # Final states that mean the query did not succeed.
    _UNSUCCESSFUL_STATES = frozenset(("aborted", "failed"))
    # States a query is in before it starts running.
    _STARTING_STATES = frozenset(("draft", "pending"))

    def __init__(self, name: str):
        self.name = name

    def is_finished(self) -> bool:
        return self.name in self._FINISHED_STATES

    def is_unsuccessfully_finished(self) -> bool:
        return self.name in self._UNSUCCESSFUL_STATES

    def is_running(self) -> bool:
        return self.name == "running"

    def is_starting(self) -> bool:
        return self.name in self._STARTING_STATES

    def __eq__(self, other: object) -> bool:
        # Let Python fall back to its default handling for foreign types
        # (a plain string still compares unequal).
        if not isinstance(other, YtQueryState):
            return NotImplemented
        return self.name == other.name

    # Defining __eq__ alone sets __hash__ to None; keep hashing consistent with equality.
    # (__ne__ is derived automatically from __eq__ in Python 3.)
    def __hash__(self) -> int:
        return hash(self.name)

    def __repr__(self) -> str:
        return self.name

    def __str__(self) -> str:
        return self.name


class YtQuery:
    def __init__(
        self,
        id: str,
        client: YtClient,
        engine: TQueryEngine,
        stage: str | None = None,
        abort_exceptions: Iterable[Type[BaseException]] | None = None,
        finalization_actions: Iterable[Callable[[YtQueryState], None]] | None = None,
    ):
        self.id = id
        self.stage = stage
        self.client = client
        self.engine: TQueryEngine = engine
        self.client_config = get_config_from_yt_client(self.client)
        self.url = self.client_config.get_query_url(self.id)
        self.abort_exceptions: tuple[Type[BaseException], ...] = (
            tuple(abort_exceptions)
            if abort_exceptions is not None
            else (
                KeyboardInterrupt,
                TimeoutError,
            )
        )
        self.finalization_actions = finalization_actions or []

    @classmethod
    def from_query_id(
        cls,
        query_id: str,
        client: YtClient,
        stage: str | None = None,
    ) -> Self:
        return cls(
            id=query_id,
            client=client,
            engine=typing_cast(TQueryEngine, client.get_query(query_id, attributes=["engine"], stage=stage)["engine"]),
            stage=stage,
        )

    def get_info(self, attributes: list[str] | None = None, format: TFormat | None = None) -> YsonMap:
        return self.client.get_query(self.id, attributes=attributes, stage=self.stage, format=format)

    def get_state(self) -> YtQueryState:
        return YtQueryState(self.get_info(attributes=["state"])["state"])

    def get_result(
        self,
        result_index: int | None = None,
        format: TFormat | None = None,
    ) -> YsonMap:
        return self.client.get_query_result(self.id, result_index=result_index, stage=self.stage, format=format)

    def read_result(
        self,
        result_index: int | None = None,
        format: TFormat | None = None,
        raw: bool | None = None,
    ) -> ResponseStream:
        return self.client.read_query_result(
            self.id, result_index=result_index, stage=self.stage, format=format, raw=raw
        )

    def get_state_monitor(
        self, time_watcher: TimeWatcher, action: Callable[[], None] = lambda: None
    ) -> Iterable[YtQueryState]:
        last_state = None
        while True:
            action()

            state = self.get_state()
            yield state
            if state.is_finished():
                break

            if state != last_state:
                time_watcher.reset()
            last_state = state

            time_watcher.wait()

    def abort(self, message: str | None = None):
        state = self.get_state()
        if state.is_finished():
            return
        if state.name in ("aborting",):
            return

        # A race is possible here, but we can't do anything about it: https://github.com/ytsaurus/ytsaurus/issues/548
        return self.client.abort_query(self.id, message=message, stage=self.stage)

    def wait(self, check_result: bool = True, print_progress: bool = True, timeout_ms: int | None = None):
        start_time = time.time()
        deadline = start_time + timeout_ms / 1000.0 if timeout_ms is not None else None

        timeout = None if timeout_ms is None else timeout_ms / 1000.0
        query_poll_period = self.client_config.query_tracker_poll_period_ms / 1000.0
        if timeout is not None and query_poll_period > timeout:
            query_poll_period = timeout
        time_watcher = TimeWatcher(
            min_interval=query_poll_period / 10.0, max_interval=query_poll_period, slowdown_coef=0.2
        )

        def _print_info(state: YtQueryState):
            match self.engine:
                case "yql":
                    state_counters = defaultdict(int)
                    info = self.get_info(attributes=["progress"])
                    yql_progress = info.get("progress", {}).get("yql_progress", {})
                    for node in yql_progress.values():
                        state_counters[node["state"]] += 1

                    logger.info(f"query {self.id}: {_format_yql_progress(state_counters)}")
                case _:
                    logger.info(f"query {self.id}: {state}")

        print_info = _print_info if print_progress else lambda _: None

        def abort():
            state = None
            for state in self.get_state_monitor(TimeWatcher(1.0, 1.0, 0.0), self.abort):
                print_info(state)

            assert state is not None
            for finalize_function in self.finalization_actions:
                finalize_function(state)

        abort_on_sigint = self.client_config.query_tracker_abort_on_sigint

        with ExceptionCatcher(self.abort_exceptions, abort, enable=abort_on_sigint):
            state = None
            for state in self.get_state_monitor(time_watcher):
                print_info(state)
                if deadline is not None:
                    current_time = time.time()
                    if current_time > deadline:
                        raise TimeoutError(
                            f"query {self.id} timed out, start_time: {_pretty_print_time(start_time)}, current_time: {_pretty_print_time(current_time)}, timeout_ms: {timeout_ms}ms, deadline: {_pretty_print_time(deadline)}"
                        )

            assert state is not None
            for finalize_function in self.finalization_actions:
                finalize_function(state)

        if state.is_unsuccessfully_finished():
            if check_result:
                query_info = self.get_info(attributes=["error"])
                yt_error = None
                if query_info["error"]["code"] != 0:
                    yt_error = YtError.from_dict(query_info["error"])

                raise YtQueryFailedError(
                    id=self.id,
                    engine=self.engine,
                    state=str(state),
                    error=yt_error,
                    url=self.url,
                )
            else:
                logger.warning(f"query {self.id} finished unsuccessfully, but result is not checked, state: {state}")
        else:
            logger.info(f"query {self.id} completed, final state: {state}")


@func_with_param_spec_from(YtClient.start_query)
def start_query(
    client: YtClient,
    engine: TQueryEngine,
    query: str,
    *args: Any,
    stage: str | None = None,
    **kwargs: Any,
) -> YtQuery:
    """Start a query through ``client.start_query`` and wrap it in a ``YtQuery``.

    Extra positional and keyword arguments are forwarded to ``YtClient.start_query``
    unchanged. The query's url is logged once the query has been started.
    """
    config = get_config_from_yt_client(client)
    started_id = client.start_query(engine, query, *args, stage=stage, **kwargs)
    query_url = config.get_query_url(started_id)
    logger.info("{0} query started: {1}".format(engine.upper(), query_url))
    return YtQuery(id=started_id, engine=engine, stage=stage, client=client)


def run_query(
    client: YtClient,
    engine: TQueryEngine,
    query: str,
    settings: dict[str, Any] | None = None,
    files: list[TQueryFile] | None = None,
    stage: str | None = None,
    annotations: dict[str, Any] | None = None,
    access_control_object: str | None = None,
    sync: bool = True,
) -> YtQuery:
    """
    Start a query and, if ``sync`` is set, wait for it to finish.

    :param client: client used to start (and wait for) the query
    :type client: YtClient
    :param engine: one of "ql", "yql".
    :type engine: str
    :param query: text of a query
    :type query: str
    :param settings: a dictionary of settings
    :type settings: dict or None
    :param files: a YSON list of files, each of which is represented by a map with keys "name", "content", "type". Field "type" is one of "raw_inline_data", "url"
    :type files: list or None
    :param stage: query tracker stage, defaults to "production"
    :type stage: str
    :param annotations: a dictionary of annotations
    :type annotations: dict or None
    :param access_control_object: access control object name
    :type access_control_object: str or None
    :param sync: wait for query to finish (using ``YtQuery.wait`` defaults, so a failed or
        aborted query raises ``YtQueryFailedError``)
    :type sync: bool
    :return: handle of the started query
    :rtype: YtQuery
    """
    yt_query = start_query(
        client,
        engine,
        query,
        settings=settings,
        files=files,
        stage=stage,
        annotations=annotations,
        access_control_object=access_control_object,
    )
    if sync:
        yt_query.wait()

    return yt_query
Log examples
2024-04-29T19:53:19.655 INFO[query] YQL query started: https://my.super.cluster/my_cluster/queries/fdfd51c4-774ad1c3-1919dde1-61e86113
2024-04-29T19:57:15.675 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=0     finished=0     failed=0     aborted=0    
2024-04-29T19:57:16.238 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=0     finished=0     failed=0     aborted=0    
2024-04-29T19:57:16.790 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=0     finished=0     failed=0     aborted=0    
2024-04-29T19:57:17.341 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=0     finished=0     failed=0     aborted=0    
2024-04-29T19:57:17.891 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=0     finished=0     failed=0     aborted=0    
2024-04-29T19:57:18.443 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=2     failed=0     aborted=0    
2024-04-29T19:57:18.996 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=2     failed=0     aborted=0    
2024-04-29T19:57:19.549 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=2     failed=0     aborted=0    
2024-04-29T19:57:20.102 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=2     failed=0     aborted=0    
2024-04-29T19:57:20.756 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=2     failed=0     aborted=0    
2024-04-29T19:57:21.528 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=2     failed=0     aborted=0    
2024-04-29T19:57:22.444 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=2     failed=0     aborted=0    
2024-04-29T19:57:23.531 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=2     failed=0     aborted=0    
2024-04-29T19:57:24.826 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=2     failed=0     aborted=0    
2024-04-29T19:57:26.369 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=2     failed=0     aborted=0    
2024-04-29T19:57:28.211 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=2     failed=0     aborted=0    
2024-04-29T19:57:30.411 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=3     failed=0     aborted=0    
2024-04-29T19:57:33.041 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=3     failed=0     aborted=0    
2024-04-29T19:57:36.188 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=3     failed=0     aborted=0    
2024-04-29T19:57:39.958 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=3     failed=0     aborted=0    
2024-04-29T19:57:44.468 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=3     failed=0     aborted=0    
2024-04-29T19:57:49.519 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=4     failed=0     aborted=0    
2024-04-29T19:57:54.571 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=4     failed=0     aborted=0    
2024-04-29T19:57:59.621 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=7     failed=0     aborted=0    
2024-04-29T19:58:04.672 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=1     finished=7     failed=0     aborted=0    
2024-04-29T19:58:09.724 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7: started=0     in_progress=0     finished=14    failed=0     aborted=0    
2024-04-29T19:58:09.724 INFO[query] query 18e150e1-6882d4d5-80b70d56-d52042a7 completed, final state: completed
2024-04-29T20:08:16.506 INFO[query] YQL query started: https://my.super.cluster/my_cluster/queries/cd6cdc93-6b1497-f4288744-d9873cf1
2024-04-29T20:08:16.619 INFO[query] query cd6cdc93-6b1497-f4288744-d9873cf1: started=0     in_progress=0     finished=0     failed=0     aborted=0    
2024-04-29T20:08:17.231 INFO[query] query cd6cdc93-6b1497-f4288744-d9873cf1: started=0     in_progress=0     finished=0     failed=0     aborted=0    
2024-04-29T20:08:17.840 INFO[query] query cd6cdc93-6b1497-f4288744-d9873cf1: started=0     in_progress=0     finished=0     failed=0     aborted=0    
2024-04-29T20:08:18.448 INFO[query] query cd6cdc93-6b1497-f4288744-d9873cf1: started=0     in_progress=0     finished=0     failed=0     aborted=0    
2024-04-29T20:08:19.059 INFO[query] query cd6cdc93-6b1497-f4288744-d9873cf1: started=0     in_progress=0     finished=0     failed=0     aborted=0    
Traceback (most recent call last):
# ...
  File "base/py/yt/query.py", line 258, in wait
    raise YtQueryFailedError(
twix.base.py.yt.query.YtQueryFailedError: Query cd6cdc93-6b1497-f4288744-d9873cf1 failed
    Query cd6cdc93-6b1497-f4288744-d9873cf1 failed
        YQL plugin call failed
            There are some issues
                Parse Sql
                    Unexpected token 'kw' : cannot match to any predicted input...
***** Details:
Query cd6cdc93-6b1497-f4288744-d9873cf1 failed    
    origin          eba930bd-f815-4432-8ee7-7b4d3720af8b on 2024-04-29T20:08:19.117577Z    
    id              cd6cdc93-6b1497-f4288744-d9873cf1    
    engine          yql    
    state           failed    
    url             https://my.super.cluster/my_cluster/queries/cd6cdc93-6b1497-f4288744-d9873cf1
Query cd6cdc93-6b1497-f4288744-d9873cf1 failed    
    origin          eu-north1-c-4ct4-30b.hw.my.super.cluster on 2024-04-29T20:08:18.747946Z (pid 1, tid 126d4914eb502001, fid fffee13649df9f03)    
    thread          Control    
    trace_id        93f21203-1701aa6d-52014f8e-92a63e18    
    span_id         966787753879437561    
    query_id        cd6cdc93-6b1497-f4288744-d9873cf1
YQL plugin call failed    
# ...

However, it is incomplete: it provides only a simple query wrapper and a `wait` method for it.

@Kontakter Kontakter added the query tracker YQL/query tracker related label Apr 30, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
query tracker YQL/query tracker related
Projects
None yet
Development

No branches or pull requests

3 participants