Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] #4039: Enhance ClientCli Wait Mechanism for Transaction Completion in pytests and Remove Fixed Sleeps #4340

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/iroha2-dev-pr.yml
Expand Up @@ -8,6 +8,7 @@ on:
- '**.json'
- '**.toml'
- '**.lock'
- '**.py'
- '.github/workflows/iroha2-dev-pr.yml'

concurrency:
Expand Down
14 changes: 7 additions & 7 deletions client_cli/pytests/common/helpers.py
Expand Up @@ -29,15 +29,15 @@ def extract_hash(stdout):
return match.group(1) if match else None


def get_peers_config_files(path_to_configs):
def get_peers_ports_list(port_min=8080, port_max=8083):
"""
Returns a list of config file paths from the given directory.
Returns a list of peer ports within the specified range.

port_min (int): The minimum port number in the range. Default is 8080.
port_max (int): The maximum port number in the range. Default is 8083.
"""
config_files = []
for entry in os.listdir(path_to_configs):
if entry.endswith(".json") and "config_to_peer" in entry:
config_files.append(os.path.join(path_to_configs, entry))
return config_files

return [port for port in range(port_min, port_max + 1)]


def read_isi_from_json(file_path):
Expand Down
3 changes: 0 additions & 3 deletions client_cli/pytests/common/settings.py
Expand Up @@ -11,11 +11,8 @@

BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

ROOT_DIR = os.environ.get("CLIENT_CLI_DIR", BASE_DIR)

PATH_CONFIG_CLIENT_CLI = os.environ["CLIENT_CLI_CONFIG"]
CLIENT_CLI_PATH = os.environ["CLIENT_CLI_BINARY"]
PEERS_CONFIGS_PATH = os.path.join(ROOT_DIR, "peers_configs")

PORT_MIN = int(os.getenv("TORII_API_PORT_MIN", "8080"))
PORT_MAX = int(os.getenv("TORII_API_PORT_MAX", "8083"))
6 changes: 3 additions & 3 deletions client_cli/pytests/src/client_cli/__init__.py
Expand Up @@ -3,9 +3,9 @@
"""

from common.settings import PATH_CONFIG_CLIENT_CLI, PORT_MAX, PORT_MIN
from src.client_cli.client_cli import ClientCli
from src.client_cli.configuration import Config
from src.client_cli.iroha import Iroha
from .client_cli import ClientCli
from .configuration import Config
from .iroha import Iroha

config = Config(PORT_MIN, PORT_MAX)
config.load(PATH_CONFIG_CLIENT_CLI)
Expand Down
93 changes: 88 additions & 5 deletions client_cli/pytests/src/client_cli/client_cli.py
Expand Up @@ -3,16 +3,19 @@
commands for interacting with Iroha blockchain using the Iroha command-line client.
"""

import json
import shlex
import subprocess
import threading
from json import JSONDecoder
from pathlib import Path
from time import monotonic, sleep
from typing import Callable
from typing import Callable, Dict, Any

import allure # type: ignore

from common.helpers import extract_hash, read_isi_from_json, write_isi_to_json
from common.settings import BASE_DIR, CLIENT_CLI_PATH, PATH_CONFIG_CLIENT_CLI, ROOT_DIR
from common.settings import BASE_DIR, CLIENT_CLI_PATH, PATH_CONFIG_CLIENT_CLI
from src.client_cli.configuration import Config


Expand All @@ -35,6 +38,9 @@ def __init__(self, config: Config):
self.stderr = None
self.transaction_hash = None
self._timeout = 20
self.event_data: Dict[str, Any] = {}
self.event_data_lock = threading.Lock()
self.should_continue_listening = True

def __enter__(self):
"""
Expand All @@ -53,6 +59,37 @@ def __exit__(self, exc_type, exc_val, exc_tb):
"""
self.reset()

def start_listening_to_events(self, peers_ports):
"""
Initializes listening to events on all peers.
"""
self.transaction_status = {}
self.threads = []
for port in peers_ports:
self.config.update_torii_url(port)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part looks like every thread will see only latest call to update_torii_url.

thread = threading.Thread(target=self.listen_to_events, args=(port,))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listen_to_events expects config_path but you are passing port here, looks suspicious

self.threads.append(thread)
thread.start()

def listen_to_events(self, config_path):
"""
Listens to the events using the specified configuration file and stores them.
"""
command = [self.BASE_PATH] + ["--config=" + config_path, "events", "pipeline"]
with subprocess.Popen(command, stdout=subprocess.PIPE, text=True) as process:
while self.should_continue_listening:
output = process.stdout.readline()
if not output:
break
with self.event_data_lock:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need a lock here, python dict is thread-safe due to GIL.

if config_path in self.event_data:
self.event_data[config_path] += output
else:
self.event_data[config_path] = output
Comment on lines +85 to +88
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of default dict you can use deafultdict(str) here, this way code could be simplified:

Suggested change
if config_path in self.event_data:
self.event_data[config_path] += output
else:
self.event_data[config_path] = output
self.event_data[config_path] += output


def stop_listening(self):
self.should_continue_listening = False

def wait_for(self, condition: Callable[[], bool], timeout=None):
"""
Wait for a certain condition to be met, specified by the expected and actual values.
Expand Down Expand Up @@ -278,7 +315,7 @@ def register_trigger(self, account):
trigger_data = read_isi_from_json(str(json_template_path))
trigger_data[0]["Register"]["Trigger"]["action"]["authority"] = str(account)

json_temp_file_path = Path(ROOT_DIR) / "isi_register_trigger.json"
json_temp_file_path = Path(CLIENT_CLI_PATH) / "isi_register_trigger.json"
write_isi_to_json(trigger_data, str(json_temp_file_path))

self._execute_pipe(
Expand Down Expand Up @@ -306,7 +343,7 @@ def unregister_asset(self, asset_id):
asset_data = read_isi_from_json(str(json_template_path))
asset_data[0]["Unregister"]["Asset"]["object_id"] = str(asset_id)

json_temp_file_path = Path(ROOT_DIR) / "isi_unregister_asset.json"
json_temp_file_path = Path(CLIENT_CLI_PATH) / "isi_unregister_asset.json"
write_isi_to_json(asset_data, str(json_temp_file_path))

self._execute_pipe(
Expand Down Expand Up @@ -338,7 +375,10 @@ def execute(self, command=None):
if command is None:
command = self.command
else:
command = [self.BASE_PATH] + self.BASE_FLAGS + shlex.split(command)
if isinstance(command, str):
command = [self.BASE_PATH] + self.BASE_FLAGS + shlex.split(command)
elif isinstance(command, list):
command = [self.BASE_PATH] + self.BASE_FLAGS + command

if "|" in command:
pipe_index = command.index("|")
Expand Down Expand Up @@ -373,6 +413,7 @@ def _execute_single(self, command):
"""
Executes a single command.
"""
print(" ".join(command) + "\n")
with subprocess.Popen(
command,
stdout=subprocess.PIPE,
Expand All @@ -384,6 +425,48 @@ def _execute_single(self, command):
self.transaction_hash = extract_hash(self.stdout)
self._attach_allure_reports()

def wait_for_transaction_commit(self, transaction_hash, timeout=1):
"""
Waits for the transaction with the given hash to be committed in all configs.
"""

def is_transaction_committed():
return self.is_transaction_committed(transaction_hash)

try:
self.wait_for(is_transaction_committed, timeout)
return True
except TimeoutError:
return False

def is_transaction_committed(self, transaction_hash):
"""
Checks if the transaction with the given hash is committed in all configs.
"""
with self.event_data_lock:
for config_path, data in self.event_data.items():
if not self._check_commit_in_output(transaction_hash, data):
return False
return True

def _check_commit_in_output(self, transaction_hash, output):
"""
Parses the output to check if the transaction with the given hash is committed.
"""
decoder = JSONDecoder()
idx = 0
try:
while idx < len(output):
obj, idx_next = decoder.raw_decode(output[idx:])
idx += idx_next
match obj:
case {"Pipeline": {"entity_kind": "Transaction", "status": "Committed",
"hash": hash}} if hash == transaction_hash:
return True
except json.JSONDecodeError:
return False
return False

def _attach_allure_reports(self):
"""
Attaches stdout and stderr to Allure reports.
Expand Down
71 changes: 35 additions & 36 deletions client_cli/pytests/src/client_cli/configuration.py
Expand Up @@ -4,17 +4,17 @@

import tomlkit
import glob
import json
import os
import random
from urllib.parse import urlparse


class Config:
"""
Configuration class to handle Iroha network configuration. The class provides methods for loading
the configuration from a file, accessing the configuration values, and randomising Torii URL
to access different peers.
Configuration class to handle Iroha network configuration.
The class provides methods for loading the configuration from a file,
accessing the configuration values,
and randomising Torii URL to access different peers.

:param port_min: The minimum port number for the TORII_API_URL.
:type port_min: int
Expand All @@ -35,48 +35,30 @@ def load(self, path_config_client_cli):

:param path_config_client_cli: The path to the configuration file.
:type path_config_client_cli: str
:raises IOError: If the file does not exist.
:raises IOError: If the file does not exist or is not a file.
:raises ValueError: If the configuration file is invalid.
"""
if not os.path.exists(path_config_client_cli):
raise IOError(f"No config file found at {path_config_client_cli}")

if not os.path.isfile(path_config_client_cli):
raise IOError(f"The path is not a file: {path_config_client_cli}")

with open(path_config_client_cli, "r", encoding="utf-8") as config_file:
self._config = tomlkit.load(config_file)
self.file = path_config_client_cli

def generate_by_peers(self, peers_configs_dir):
"""
Generate configuration files for each port in the range from port_min to port_max.
"""
if self._config is None:
raise ValueError(
"No configuration loaded. Use load() method to load the configuration."
)

if self.port_min >= self.port_max:
raise ValueError("port_min must be less than port_max.")

os.makedirs(peers_configs_dir, exist_ok=True)
try:
with open(path_config_client_cli, "r", encoding="utf-8") as config_file:
self._config = tomlkit.load(config_file)
except Exception as e:
raise ValueError(f"Error reading configuration file: {e}")

for port in range(self.port_min, self.port_max + 1):
config_copy = self._config.copy()
config_copy["TORII_API_URL"] = f"http://localhost:{port}"
file_name = f"config_to_peer_{port}.json"
file_path = os.path.join(peers_configs_dir, file_name)
with open(file_path, "w", encoding="utf-8") as config_file:
json.dump(config_copy, config_file, indent=4)
self.file = path_config_client_cli

def select_random_peer_config(self):
def select_random_peer_config(self, peers_configs_dir):
"""
Select and load a random configuration file generated by the generate_by_peers method.
This updates the current configuration to the one chosen.

:return: None
"""
peers_configs = glob.glob("path/to/peers/configs/*.json")
peers_configs = glob.glob(peers_configs_dir + "/*.toml")
if not peers_configs:
raise ValueError(
"Peer configuration files not found. First generate them using generate_by_peers."
Expand All @@ -88,17 +70,34 @@ def select_random_peer_config(self):

def randomise_torii_url(self):
"""
Update Torii URL.
Randomise the Torii URL port.
Note that in order for update to take effect,
`self.env` should be used when executing the client cli.

:return: None
"""
parsed_url = urlparse(self._config["torii_url"])
random_port = random.randint(self.port_min, self.port_max)
self._envs["TORII_URL"] = parsed_url._replace(
netloc=f"{parsed_url.hostname}:{random_port}"
self.update_torii_url(random_port)

def update_torii_url(self, port):
"""
Update the Torii URL in the current configuration.

:param port: Port to use in the Torii URL.
:type port: int
:raises ValueError: If the port is outside the allowed range.
"""
if port < self.port_min or port > self.port_max:
raise ValueError("Port is out of allowed range.")

if self._config is None:
raise ValueError("No configuration loaded. Use load() method first.")

parsed_url = urlparse(self._config["torii_url"])
updated_url = parsed_url._replace(
netloc=f"{parsed_url.hostname}:{port}"
).geturl()
self._envs["TORII_URL"] = updated_url

@property
def torii_url(self):
Expand Down
5 changes: 3 additions & 2 deletions client_cli/pytests/test/conftest.py
Expand Up @@ -14,13 +14,14 @@
generate_random_string_with_reserved_char,
generate_random_string_with_whitespace,
generate_random_string_without_reserved_chars,
get_peers_ports_list,
key_with_invalid_character_in_key,
name_with_uppercase_letter,
not_existing_name,
random,
string,
)
from common.settings import PEERS_CONFIGS_PATH
from common.settings import PORT_MIN, PORT_MAX
from models import Account, Asset, AssetDefinition, Domain
from src.client_cli import client_cli, config

Expand All @@ -31,7 +32,7 @@ def before_all():
"""Initial setup for all test sessions.
This fixture generates configurations based on peers and is automatically
used for every test session."""
config.generate_by_peers(PEERS_CONFIGS_PATH)
client_cli.start_listening_to_events(get_peers_ports_list(PORT_MIN, PORT_MAX))
yield


Expand Down