Fix flaky tests 2 (stateless, integration) #61869

Merged: 6 commits, Mar 28, 2024
3 changes: 3 additions & 0 deletions .gitignore
@@ -164,6 +164,9 @@ tests/queries/0_stateless/*.generated-expect
tests/queries/0_stateless/*.expect.history
tests/integration/**/_gen

# pytest --pdb history
.pdb_history

# rust
/rust/**/target*
# It is autogenerated from *.in
28 changes: 28 additions & 0 deletions tests/integration/conftest.py
@@ -13,6 +13,34 @@
logging.raiseExceptions = False


@pytest.fixture(scope="session", autouse=True)
def pdb_history(request):
"""
Fixture that loads and saves the pdb history to a file, so it is preserved between runs
"""
if request.config.getoption("--pdb"):
import readline # pylint:disable=import-outside-toplevel
import pdb # pylint:disable=import-outside-toplevel

def save_history():
readline.write_history_file(".pdb_history")

def load_history():
try:
readline.read_history_file(".pdb_history")
except FileNotFoundError:
pass

load_history()
pdb.Pdb.use_rawinput = True

yield

save_history()
else:
yield


@pytest.fixture(autouse=True, scope="session")
def tune_local_port_range():
# Lots of services use non-privileged ports:
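For reference, a minimal standalone sketch of the readline round trip that the new pdb_history fixture relies on; the .pdb_history file name matches the fixture, while the sample command added to the history is purely illustrative:

    import readline

    HISTORY_FILE = ".pdb_history"

    def load_history():
        try:
            readline.read_history_file(HISTORY_FILE)
        except FileNotFoundError:
            pass  # first run: no history has been written yet

    def save_history():
        readline.write_history_file(HISTORY_FILE)

    load_history()
    readline.add_history("p cluster.instances")  # e.g. a command typed in a pdb session
    save_history()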
4 changes: 2 additions & 2 deletions tests/integration/helpers/cluster.py
@@ -862,12 +862,12 @@ def cleanup(self):

def get_docker_handle(self, docker_id):
exception = None
for i in range(5):
for i in range(20):
try:
return self.docker_client.containers.get(docker_id)
except Exception as ex:
print("Got exception getting docker handle", str(ex))
time.sleep(i * 2)
time.sleep(0.5)
exception = ex
raise exception

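The retry loop above trades a few exponentially growing sleeps for many short fixed ones, so a container that becomes visible within the first second is picked up almost immediately instead of after a multi-second back-off. A generic sketch of that pattern, with illustrative names, might look like:

    import time

    def retry(fn, attempts=20, delay=0.5):
        """Call fn until it succeeds, sleeping a fixed delay between failed attempts."""
        last_exc = None
        for _ in range(attempts):
            try:
                return fn()
            except Exception as exc:
                last_exc = exc
                time.sleep(delay)
        raise last_exc  # surface the last error, mirroring get_docker_handle

    # e.g. retry(lambda: docker_client.containers.get(docker_id))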
70 changes: 65 additions & 5 deletions tests/integration/test_backup_restore_on_cluster/test.py
@@ -1,4 +1,3 @@
from time import sleep
import pytest
import re
import os.path
@@ -164,8 +163,15 @@ def test_replicated_database():
node2.query("INSERT INTO mydb.tbl VALUES (2, 'count')")
node1.query("INSERT INTO mydb.tbl VALUES (3, 'your')")
node2.query("INSERT INTO mydb.tbl VALUES (4, 'chickens')")
node1.query("OPTIMIZE TABLE mydb.tbl ON CLUSTER 'cluster' FINAL")

node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' mydb.tbl")

# check data in sync
expect = TSV([[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]])
assert node1.query("SELECT * FROM mydb.tbl ORDER BY x") == expect
assert node2.query("SELECT * FROM mydb.tbl ORDER BY x") == expect

# Make backup.
backup_name = new_backup_name()
node1.query(
@@ -179,14 +185,63 @@ def test_replicated_database():
node1.query(f"RESTORE DATABASE mydb ON CLUSTER 'cluster' FROM {backup_name}")
node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' mydb.tbl")

assert node1.query("SELECT * FROM mydb.tbl ORDER BY x") == TSV(
[[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]]
assert node1.query("SELECT * FROM mydb.tbl ORDER BY x") == expect
assert node2.query("SELECT * FROM mydb.tbl ORDER BY x") == expect


def test_replicated_database_compare_parts():
"""
Stop merges and fetches, then write data to both nodes and check that after
backup and restore the parts are taken from a single node (the second one);
that replica is selected via the settings replica_num=2 and replica_num_in_backup=2.
"""
node1.query(
"CREATE DATABASE mydb ON CLUSTER 'cluster' ENGINE=Replicated('/clickhouse/path/','{shard}','{replica}')"
)

assert node2.query("SELECT * FROM mydb.tbl ORDER BY x") == TSV(
[[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]]
node1.query(
"CREATE TABLE mydb.tbl(x UInt8, y String) ENGINE=ReplicatedMergeTree ORDER BY x"
)

node2.query("SYSTEM SYNC DATABASE REPLICA mydb")

node1.query("SYSTEM STOP MERGES mydb.tbl")
node2.query("SYSTEM STOP MERGES mydb.tbl")

node1.query("SYSTEM STOP FETCHES mydb.tbl")
node2.query("SYSTEM STOP FETCHES mydb.tbl")

node1.query("INSERT INTO mydb.tbl VALUES (1, 'a')")
node1.query("INSERT INTO mydb.tbl VALUES (2, 'b')")

node2.query("INSERT INTO mydb.tbl VALUES (3, 'x')")
node2.query("INSERT INTO mydb.tbl VALUES (4, 'y')")

p2 = node2.query("SELECT * FROM mydb.tbl ORDER BY x")

# Make backup.
backup_name = new_backup_name()
node1.query(
f"BACKUP DATABASE mydb ON CLUSTER 'cluster' TO {backup_name} SETTINGS replica_num=2"
)

# Drop the database on the whole cluster.
node1.query("DROP DATABASE mydb ON CLUSTER 'cluster' SYNC")

# Restore on the cluster, taking the data of replica 2 from the backup.
node1.query(
f"RESTORE DATABASE mydb ON CLUSTER 'cluster' FROM {backup_name} SETTINGS replica_num_in_backup=2"
)
node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' mydb.tbl")

# compare parts
p1_ = node1.query("SELECT _part, * FROM mydb.tbl ORDER BY x")
p2_ = node2.query("SELECT _part, * FROM mydb.tbl ORDER BY x")
assert p1_ == p2_

# compare data
assert p2 == node2.query("SELECT * FROM mydb.tbl ORDER BY x")


def test_different_tables_on_nodes():
node1.query(
@@ -427,7 +482,12 @@ def test_replicated_database_async():
node1.query("INSERT INTO mydb.tbl VALUES (22)")
node2.query("INSERT INTO mydb.tbl2 VALUES ('a')")
node2.query("INSERT INTO mydb.tbl2 VALUES ('bb')")

node1.query("OPTIMIZE TABLE mydb.tbl ON CLUSTER 'cluster' FINAL")
node1.query("OPTIMIZE TABLE mydb.tbl2 ON CLUSTER 'cluster' FINAL")

node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' mydb.tbl")
node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' mydb.tbl2")

backup_name = new_backup_name()
[id, status] = node1.query(
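The recurring stabilisation step added throughout this file is to force merges to finish and replicas to catch up before the backup is taken, so both nodes hold identical data and the post-restore assertions are deterministic. A compact sketch of that step (node and table names follow the tests above; the helper itself is hypothetical) could be:

    def settle_replicas(node, table="mydb.tbl"):
        # Merge all parts and wait for every replica to replay the replication log,
        # so the data is identical on all nodes before BACKUP runs.
        node.query(f"OPTIMIZE TABLE {table} ON CLUSTER 'cluster' FINAL")
        node.query(f"SYSTEM SYNC REPLICA ON CLUSTER 'cluster' {table}")

    # e.g. settle_replicas(node1) right before the BACKUP DATABASE ... query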
19 changes: 19 additions & 0 deletions tests/integration/test_dictionaries_update_and_reload/test.py
@@ -37,6 +37,16 @@ def get_status(dictionary_name):
).rstrip("\n")


def get_status_retry(dictionary_name, expect, retry_count=10, sleep_time=0.5):
for _ in range(retry_count):
res = get_status(dictionary_name)
if res == expect:
return res
time.sleep(sleep_time)

raise Exception(f'Expected result "{expect}" did not occur')


def get_last_exception(dictionary_name):
return (
instance.query(
@@ -250,6 +260,15 @@ def test_reload_after_fail_by_timer(started_cluster):
assert expected_error in instance.query_and_get_error(
"SELECT dictGetInt32('no_file_2', 'a', toUInt64(9))"
)

# on sanitizer builds the status can transiently be 'FAILED_AND_RELOADING', which is not quite right,
# so retry on these builds until it settles to 'FAILED'
if (
instance.is_built_with_sanitizer()
and get_status("no_file_2") == "FAILED_AND_RELOADING"
):
get_status_retry("no_file_2", expect="FAILED")

assert get_status("no_file_2") == "FAILED"

# Creating the file source makes the dictionary able to load.
@@ -1,4 +1,4 @@
<clickhouse>
<max_server_memory_usage>2000000000</max_server_memory_usage>
<max_server_memory_usage>1500000000</max_server_memory_usage>
<allow_use_jemalloc_memory>false</allow_use_jemalloc_memory>
</clickhouse>
10 changes: 4 additions & 6 deletions tests/integration/test_global_overcommit_tracker/test.py
@@ -22,7 +22,7 @@ def start_cluster():
cluster.shutdown()


GLOBAL_TEST_QUERY_A = "SELECT groupArray(number) FROM numbers(2500000) SETTINGS memory_overcommit_ratio_denominator_for_user=1"
GLOBAL_TEST_QUERY_A = "SELECT groupArray(number) FROM numbers(5000000) SETTINGS memory_overcommit_ratio_denominator_for_user=1"
GLOBAL_TEST_QUERY_B = "SELECT groupArray(number) FROM numbers(2500000) SETTINGS memory_overcommit_ratio_denominator_for_user=80000000"


@@ -42,11 +42,9 @@ def test_global_overcommit():

responses_A = list()
responses_B = list()
for i in range(100):
if i % 2 == 0:
responses_A.append(node.get_query_request(GLOBAL_TEST_QUERY_A, user="A"))
else:
responses_B.append(node.get_query_request(GLOBAL_TEST_QUERY_B, user="B"))
for i in range(50):
responses_A.append(node.get_query_request(GLOBAL_TEST_QUERY_A, user="A"))
responses_B.append(node.get_query_request(GLOBAL_TEST_QUERY_B, user="B"))

overcommited_killed = False
for response in responses_A:
14 changes: 10 additions & 4 deletions tests/integration/test_merges_memory_limit/test.py
@@ -17,18 +17,24 @@ def start_cluster():


def test_memory_limit_success():
if node.is_built_with_thread_sanitizer():
pytest.skip(
"tsan build is skipped because it slowly merges the parts, "
"rather than failing over the memory limit"
)

node.query(
"CREATE TABLE test_merge_oom ENGINE=AggregatingMergeTree ORDER BY id EMPTY AS SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(20000)"
"CREATE TABLE test_merge_oom ENGINE=AggregatingMergeTree ORDER BY id EMPTY AS SELECT number%1024 AS id, arrayReduce('groupArrayState', arrayMap(x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(20000)"
)
node.query("SYSTEM STOP MERGES test_merge_oom")
node.query(
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce('groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
)
node.query(
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce('groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
)
node.query(
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce('groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
)

_, error = node.query_and_get_answer_with_error(
18 changes: 10 additions & 8 deletions tests/integration/test_runtime_configurable_cache_size/test.py
@@ -1,7 +1,6 @@
import os
import pytest
import shutil
import time
import pytest
from helpers.cluster import ClickHouseCluster

# Tests that sizes of in-memory caches (mark / uncompressed / index mark / index uncompressed / mmapped file / query cache) can be changed
@@ -101,9 +100,10 @@ def test_query_cache_size_is_runtime_configurable(start_cluster):
node.query("SELECT 2 SETTINGS use_query_cache = 1, query_cache_ttl = 1")
node.query("SELECT 3 SETTINGS use_query_cache = 1, query_cache_ttl = 1")

res = node.query_with_retry(
time.sleep(2)
Member:

This is unreliable, it will remain flaky.

Member Author (fm4v):

I actually break this test here in the last assert; the sleep is required, as I understand it:

        res = node.query_with_retry(
            "SELECT value FROM system.asynchronous_metrics WHERE metric = 'QueryCacheEntries'",
            check_callback=lambda result: result == "1\n",
        )
>       assert res == "1\n"
E       AssertionError: assert '2\n' == '1\n'
E         - 1
E         + 2

Member:

I wrote this test originally and found this open PR by luck.

There were three recorded test failures in the past 6 months or so, which is super sporadic given how many CI tests run per day.

In all cases, we ran three queries with query cache TTL = 1 sec, then waited 2 sec, and found that the query cache contained only 1 entry whereas 2 entries are expected. I honestly cannot think of a scenario where this can happen except one: the asynchronous metrics are updated at such an unlucky moment that they report only 1 entry. Instead of adding retries (l. 105 — please revert all retries from this file, they only hide problems), please add SYSTEM RELOAD ASYNCHRONOUS METRICS before l. 106, see #53710. When I am back from vacation, I can add some documentation for SYSTEM RELOAD ASYNCHRONOUS METRICS.

Member Author (fm4v, Mar 26, 2024):

Retries are removed and SYSTEM RELOAD ASYNCHRONOUS METRICS is run after each wait. Let's pray it will be fixed.

node.query("SYSTEM RELOAD ASYNCHRONOUS METRICS")
res = node.query(
"SELECT value FROM system.asynchronous_metrics WHERE metric = 'QueryCacheEntries'",
check_callback=lambda result: result == "2\n",
)
assert res == "2\n"

@@ -116,9 +116,10 @@ def test_query_cache_size_is_runtime_configurable(start_cluster):
node.query("SYSTEM RELOAD CONFIG")

# check that eviction worked as expected
res = node.query_with_retry(
time.sleep(2)
node.query("SYSTEM RELOAD ASYNCHRONOUS METRICS")
res = node.query(
"SELECT value FROM system.asynchronous_metrics WHERE metric = 'QueryCacheEntries'",
check_callback=lambda result: result == "2\n",
)
assert (
res == "2\n"
@@ -132,9 +133,10 @@ def test_query_cache_size_is_runtime_configurable(start_cluster):
node.query("SELECT 4 SETTINGS use_query_cache = 1, query_cache_ttl = 1")
node.query("SELECT 5 SETTINGS use_query_cache = 1, query_cache_ttl = 1")

res = node.query_with_retry(
time.sleep(2)
node.query("SYSTEM RELOAD ASYNCHRONOUS METRICS")
res = node.query(
"SELECT value FROM system.asynchronous_metrics WHERE metric = 'QueryCacheEntries'",
check_callback=lambda result: result == "1\n",
)
assert res == "1\n"

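Pulled out of the diff, the pattern the test now uses at every checkpoint is: wait for the 1-second query cache TTL to expire, force the asynchronous metrics to refresh, then read the entry count once, with no retries. A hedged sketch (the helper name is invented; node is the test's ClickHouse instance):

    import time

    def query_cache_entries(node):
        time.sleep(2)  # let entries created with query_cache_ttl = 1 expire
        node.query("SYSTEM RELOAD ASYNCHRONOUS METRICS")
        return node.query(
            "SELECT value FROM system.asynchronous_metrics WHERE metric = 'QueryCacheEntries'"
        ).strip()

    # e.g. assert query_cache_entries(node) == "2"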
4 changes: 2 additions & 2 deletions tests/queries/0_stateless/01287_max_execution_speed.sql
@@ -2,13 +2,13 @@

SET min_execution_speed = 100000000000, timeout_before_checking_execution_speed = 0;
SELECT count() FROM system.numbers; -- { serverError 160 }
SELECT 'Ok (1)';
SET min_execution_speed = 0;
SELECT 'Ok (1)';

SET min_execution_speed_bytes = 800000000000, timeout_before_checking_execution_speed = 0;
SELECT count() FROM system.numbers; -- { serverError 160 }
SELECT 'Ok (2)';
SET min_execution_speed_bytes = 0;
SELECT 'Ok (2)';

SET max_execution_speed = 1000000;
SET max_block_size = 100;
@@ -21,6 +21,10 @@ function test_completion_word()
# - here and below you should escape variables of the expect.
# - you should not use "expect <<..." since in this case timeout/eof will
# not work (I guess due to attached stdin)

# TODO: obtain sanitizer and debug/release build info so the test can adapt dynamically;
# e.g. the 120-second timeout below is larger than a release build needs,
# but is appropriate for sanitizer builds
cat > "$SCRIPT_PATH" << EOF
# NOTE: log will be appended
exp_internal -f $CLICKHOUSE_TMP/$(basename "${BASH_SOURCE[0]}").debuglog 0
@@ -30,7 +34,7 @@ exp_internal -f $CLICKHOUSE_TMP/$(basename "${BASH_SOURCE[0]}").debuglog 0
set stdout_channel [open "/dev/stdout" w]

log_user 0
set timeout 60
set timeout 120
match_max 100000
expect_after {
# Do not ignore eof from expect
@@ -1,6 +1,5 @@
create table test (number UInt64) engine=MergeTree order by number;
insert into test select * from numbers(100000000);
insert into test select * from numbers(50000000);
select ignore(number) from test where RAND() > 4292390314 limit 10;
select count() > 0 from test where RAND() > 4292390314;
drop table test;

@@ -5,8 +5,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh

db=${CLICKHOUSE_DATABASE}
user1="user1_03006_$db_$RANDOM"
user2="user2_03006_$db_$RANDOM"
user1="user1_03006_${db}_$RANDOM"
user2="user2_03006_${db}_$RANDOM"

${CLICKHOUSE_CLIENT} --multiquery <<EOF
DROP DATABASE IF EXISTS $db;