New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[refactor] #4039: Enhance ClientCli Wait Mechanism for Transaction Completion in pytests and Remove Fixed Sleeps #4340
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -3,16 +3,19 @@ | |||||||||||
commands for interacting with Iroha blockchain using the Iroha command-line client. | ||||||||||||
""" | ||||||||||||
|
||||||||||||
import json | ||||||||||||
import shlex | ||||||||||||
import subprocess | ||||||||||||
import threading | ||||||||||||
from json import JSONDecoder | ||||||||||||
from pathlib import Path | ||||||||||||
from time import monotonic, sleep | ||||||||||||
from typing import Callable | ||||||||||||
from typing import Callable, Dict, Any | ||||||||||||
|
||||||||||||
import allure # type: ignore | ||||||||||||
|
||||||||||||
from common.helpers import extract_hash, read_isi_from_json, write_isi_to_json | ||||||||||||
from common.settings import BASE_DIR, CLIENT_CLI_PATH, PATH_CONFIG_CLIENT_CLI, ROOT_DIR | ||||||||||||
from common.settings import BASE_DIR, CLIENT_CLI_PATH, PATH_CONFIG_CLIENT_CLI | ||||||||||||
from src.client_cli.configuration import Config | ||||||||||||
|
||||||||||||
|
||||||||||||
|
@@ -35,6 +38,9 @@ def __init__(self, config: Config): | |||||||||||
self.stderr = None | ||||||||||||
self.transaction_hash = None | ||||||||||||
self._timeout = 20 | ||||||||||||
self.event_data: Dict[str, Any] = {} | ||||||||||||
self.event_data_lock = threading.Lock() | ||||||||||||
self.should_continue_listening = True | ||||||||||||
|
||||||||||||
def __enter__(self): | ||||||||||||
""" | ||||||||||||
|
@@ -53,6 +59,37 @@ def __exit__(self, exc_type, exc_val, exc_tb): | |||||||||||
""" | ||||||||||||
self.reset() | ||||||||||||
|
||||||||||||
def start_listening_to_events(self, peers_ports): | ||||||||||||
""" | ||||||||||||
Initializes listening to events on all peers. | ||||||||||||
""" | ||||||||||||
self.transaction_status = {} | ||||||||||||
self.threads = [] | ||||||||||||
for port in peers_ports: | ||||||||||||
self.config.update_torii_url(port) | ||||||||||||
thread = threading.Thread(target=self.listen_to_events, args=(port,)) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||||||||
self.threads.append(thread) | ||||||||||||
thread.start() | ||||||||||||
|
||||||||||||
def listen_to_events(self, config_path): | ||||||||||||
""" | ||||||||||||
Listens to the events using the specified configuration file and stores them. | ||||||||||||
""" | ||||||||||||
command = [self.BASE_PATH] + ["--config=" + config_path, "events", "pipeline"] | ||||||||||||
with subprocess.Popen(command, stdout=subprocess.PIPE, text=True) as process: | ||||||||||||
while self.should_continue_listening: | ||||||||||||
output = process.stdout.readline() | ||||||||||||
if not output: | ||||||||||||
break | ||||||||||||
with self.event_data_lock: | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think you need a lock here, python |
||||||||||||
if config_path in self.event_data: | ||||||||||||
self.event_data[config_path] += output | ||||||||||||
else: | ||||||||||||
self.event_data[config_path] = output | ||||||||||||
Comment on lines
+85
to
+88
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of default dict you can use
Suggested change
|
||||||||||||
|
||||||||||||
def stop_listening(self): | ||||||||||||
self.should_continue_listening = False | ||||||||||||
|
||||||||||||
def wait_for(self, condition: Callable[[], bool], timeout=None): | ||||||||||||
""" | ||||||||||||
Wait for a certain condition to be met, specified by the expected and actual values. | ||||||||||||
|
@@ -278,7 +315,7 @@ def register_trigger(self, account): | |||||||||||
trigger_data = read_isi_from_json(str(json_template_path)) | ||||||||||||
trigger_data[0]["Register"]["Trigger"]["action"]["authority"] = str(account) | ||||||||||||
|
||||||||||||
json_temp_file_path = Path(ROOT_DIR) / "isi_register_trigger.json" | ||||||||||||
json_temp_file_path = Path(CLIENT_CLI_PATH) / "isi_register_trigger.json" | ||||||||||||
write_isi_to_json(trigger_data, str(json_temp_file_path)) | ||||||||||||
|
||||||||||||
self._execute_pipe( | ||||||||||||
|
@@ -306,7 +343,7 @@ def unregister_asset(self, asset_id): | |||||||||||
asset_data = read_isi_from_json(str(json_template_path)) | ||||||||||||
asset_data[0]["Unregister"]["Asset"]["object_id"] = str(asset_id) | ||||||||||||
|
||||||||||||
json_temp_file_path = Path(ROOT_DIR) / "isi_unregister_asset.json" | ||||||||||||
json_temp_file_path = Path(CLIENT_CLI_PATH) / "isi_unregister_asset.json" | ||||||||||||
write_isi_to_json(asset_data, str(json_temp_file_path)) | ||||||||||||
|
||||||||||||
self._execute_pipe( | ||||||||||||
|
@@ -338,7 +375,10 @@ def execute(self, command=None): | |||||||||||
if command is None: | ||||||||||||
command = self.command | ||||||||||||
else: | ||||||||||||
command = [self.BASE_PATH] + self.BASE_FLAGS + shlex.split(command) | ||||||||||||
if isinstance(command, str): | ||||||||||||
command = [self.BASE_PATH] + self.BASE_FLAGS + shlex.split(command) | ||||||||||||
elif isinstance(command, list): | ||||||||||||
command = [self.BASE_PATH] + self.BASE_FLAGS + command | ||||||||||||
|
||||||||||||
if "|" in command: | ||||||||||||
pipe_index = command.index("|") | ||||||||||||
|
@@ -373,6 +413,7 @@ def _execute_single(self, command): | |||||||||||
""" | ||||||||||||
Executes a single command. | ||||||||||||
""" | ||||||||||||
print(" ".join(command) + "\n") | ||||||||||||
with subprocess.Popen( | ||||||||||||
command, | ||||||||||||
stdout=subprocess.PIPE, | ||||||||||||
|
@@ -384,6 +425,48 @@ def _execute_single(self, command): | |||||||||||
self.transaction_hash = extract_hash(self.stdout) | ||||||||||||
self._attach_allure_reports() | ||||||||||||
|
||||||||||||
def wait_for_transaction_commit(self, transaction_hash, timeout=1): | ||||||||||||
""" | ||||||||||||
Waits for the transaction with the given hash to be committed in all configs. | ||||||||||||
""" | ||||||||||||
|
||||||||||||
def is_transaction_committed(): | ||||||||||||
return self.is_transaction_committed(transaction_hash) | ||||||||||||
|
||||||||||||
try: | ||||||||||||
self.wait_for(is_transaction_committed, timeout) | ||||||||||||
return True | ||||||||||||
except TimeoutError: | ||||||||||||
return False | ||||||||||||
|
||||||||||||
def is_transaction_committed(self, transaction_hash): | ||||||||||||
""" | ||||||||||||
Checks if the transaction with the given hash is committed in all configs. | ||||||||||||
""" | ||||||||||||
with self.event_data_lock: | ||||||||||||
for config_path, data in self.event_data.items(): | ||||||||||||
if not self._check_commit_in_output(transaction_hash, data): | ||||||||||||
return False | ||||||||||||
return True | ||||||||||||
|
||||||||||||
def _check_commit_in_output(self, transaction_hash, output): | ||||||||||||
""" | ||||||||||||
Parses the output to check if the transaction with the given hash is committed. | ||||||||||||
""" | ||||||||||||
decoder = JSONDecoder() | ||||||||||||
idx = 0 | ||||||||||||
try: | ||||||||||||
while idx < len(output): | ||||||||||||
obj, idx_next = decoder.raw_decode(output[idx:]) | ||||||||||||
idx += idx_next | ||||||||||||
match obj: | ||||||||||||
case {"Pipeline": {"entity_kind": "Transaction", "status": "Committed", | ||||||||||||
"hash": hash}} if hash == transaction_hash: | ||||||||||||
return True | ||||||||||||
except json.JSONDecodeError: | ||||||||||||
return False | ||||||||||||
return False | ||||||||||||
|
||||||||||||
def _attach_allure_reports(self): | ||||||||||||
""" | ||||||||||||
Attaches stdout and stderr to Allure reports. | ||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part looks like every thread will see only latest call to
update_torii_url
.