Skip to content

Commit

Permalink
test: make cluster nodes test more detailed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunsingerus committed Nov 28, 2024
1 parent f97edd2 commit a98ea97
Showing 1 changed file with 40 additions and 21 deletions.
61 changes: 40 additions & 21 deletions tests/e2e/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1250,14 +1250,14 @@ def get_shards_from_remote_servers(chi, cluster, shell=None):
return chi_shards


def wait_for_cluster(chi, cluster, num_shards, num_replicas=0, pwd="", force_wait = False):
def wait_for_cluster(chi, cluster, num_shards, num_replicas=0, pwd="", force_wait=False):
with Given(f"Cluster {cluster} is properly configured"):
if current().context.operator_version >= "0.24" and force_wait == False:
if current().context.operator_version >= "0.24" and force_wait is False:
print(f"operator {current().context.operator_version} does not require extra wait, skipping check")
else:
with By(f"remote_servers have {num_shards} shards"):
assert num_shards == get_shards_from_remote_servers(chi, cluster)
with By(f"ClickHouse recognizes {num_shards} shards in the cluster"):
with By(f"ClickHouse recognizes {num_shards} shards in the cluster {cluster}"):
for shard in range(num_shards):
shards = ""
for i in range(1, 10):
Expand All @@ -1275,22 +1275,41 @@ def wait_for_cluster(chi, cluster, num_shards, num_replicas=0, pwd="", force_wai
assert shards == str(num_shards)

if num_replicas > 0:
with By(f"ClickHouse recognizes {num_replicas} replicas in the cluster"):
for replica in range(num_replicas):
replicas = ""
for i in range(1, 10):
replicas = clickhouse.query(
chi,
f"select uniq(replica_num) from system.clusters where cluster ='{cluster}'",
host=f"chi-{chi}-{cluster}-0-{replica}",
pwd=pwd,
with_error=True,
with By(f"ClickHouse recognizes {num_replicas} replicas shard in the cluster {cluster}"):
for shard in range(num_shards):
for replica in range(num_replicas):
replicas = ""
for i in range(1, 10):
replicas = clickhouse.query(
chi,
f"select uniq(replica_num) from system.clusters where cluster ='{cluster}'",
host=f"chi-{chi}-{cluster}-{shard}-{replica}",
pwd=pwd,
with_error=True,
)
if replicas == str(num_replicas):
break
with Then("Not ready. Wait for " + str(i * 5) + " seconds"):
time.sleep(i * 5)
assert replicas == str(num_replicas)
if replicas == str(num_replicas):
break
with Then(f"Not ready. {replicas}/{num_replicas} replicas Wait for " + str(i * 5) + " seconds"):
time.sleep(i * 5)
assert replicas == str(num_replicas)
num_hosts = num_shards * num_replicas
with By(f"ClickHouse recognizes {num_hosts} hosts in the cluster {cluster}"):
for shard in range(num_shards):
for replica in range(num_replicas):
hosts = ""
for i in range(1, 10):
hosts = clickhouse.query(
chi,
f"select count() from system.clusters where cluster ='{cluster}'",
host=f"chi-{chi}-{cluster}-{shard}-{replica}",
pwd=pwd,
with_error=True,
)
if hosts == str(num_hosts):
break
with Then(f"Not ready. {hosts}/{num_hosts} hosts Wait for " + str(i * 5) + " seconds"):
time.sleep(i * 5)
assert hosts == str(num_hosts)


@TestScenario
Expand Down Expand Up @@ -1383,12 +1402,14 @@ def test_014_0(self):
"CREATE TABLE test_replicated_014.test_replicated_014 (a Int8) Engine = ReplicatedMergeTree ORDER BY tuple()",
]

wait_for_cluster(chi_name, cluster, n_shards, 2)
wait_for_cluster(chi_name, cluster, n_shards, 1)

with Then("Create schema objects"):
for q in create_ddls:
clickhouse.query(chi_name, q, host=f"chi-{chi_name}-{cluster}-0-0")

# Give some time for replication to catch up
time.sleep(10)
with Given("Replicated tables are created on a first replica and data is inserted"):
for table in replicated_tables:
if table != "test_atomic_014.test_mv2_014":
Expand Down Expand Up @@ -1478,7 +1499,6 @@ def check_schema_propagation(replicas):
)
assert out == "1"


with And("Replicated table should have the data"):
for replica in replicas:
for shard in shards:
Expand All @@ -1491,7 +1511,6 @@ def check_schema_propagation(replicas):
)
assert out == f"{shard}"


# replicas = [1]
replicas = [1, 2]
with When(f"Add {len(replicas)} more replicas"):
Expand Down

0 comments on commit a98ea97

Please sign in to comment.