Done test_multiple_databases_distributed_deadlock_detection

ivyazmitinov committed Jun 28, 2024
1 parent f25b4b2 commit 1c219fe

Showing 3 changed files with 36 additions and 12 deletions.
11 changes: 11 additions & 0 deletions src/test/regress/citus_tests/common.py
@@ -1026,6 +1026,15 @@ def create_database(self, name):
        self.databases.add(name)
        self.sql(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(name)))

    def drop_database(self, name):
        self.sql("DROP EXTENSION IF EXISTS citus CASCADE", dbname=name)
        self.sql(
            sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format(
                sql.Identifier(name)
            )
        )
        self.databases.remove(name)

    def create_schema(self, name):
        self.schemas.add(name)
        self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name)))
@@ -1055,11 +1064,13 @@ def cleanup_users(self):

    def cleanup_databases(self):
        for database in self.databases:
            self.sql("DROP EXTENSION IF EXISTS citus CASCADE", dbname=database)
            self.sql(
                sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format(
                    sql.Identifier(database)
                )
            )
        self.databases.clear()

    def cleanup_schemas(self):
        for schema in self.schemas:
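The new drop_database helper and the updated cleanup_databases above share one teardown pattern: drop the citus extension from inside the target database first, so its maintenance daemon exits, then force-drop the database from the outside. A standalone sketch of that pattern with psycopg 3 follows; the function name, connection-string handling, and the separate admin connection are illustrative assumptions rather than part of the test framework's sql() helper.

import psycopg
from psycopg import sql


def force_drop_database(conninfo: str, name: str) -> None:
    # Drop the Citus extension from inside the target database first,
    # so its maintenance daemon exits before the database goes away.
    with psycopg.connect(conninfo, dbname=name, autocommit=True) as conn:
        conn.execute("DROP EXTENSION IF EXISTS citus CASCADE")
    # DROP DATABASE cannot run inside a transaction block and must be issued
    # from another database, hence a second autocommit connection.
    # WITH (FORCE) (PostgreSQL 13+) terminates any sessions still connected.
    with psycopg.connect(conninfo, dbname="postgres", autocommit=True) as conn:
        conn.execute(
            sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format(
                sql.Identifier(name)
            )
        )

Dropping the extension first is presumably also why test_set_maindb below now calls the helper before waiting for the maintenance-daemon count to drop.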
@@ -62,7 +62,7 @@ def test_set_maindb(cluster_factory):

    wait_until_maintenance_deamons_start(2, cluster)

    cluster.coordinator.sql("DROP DATABASE mymaindb;")
    cluster.coordinator.drop_database("mymaindb")

    wait_until_maintenance_deamons_start(1, cluster)

@@ -3,18 +3,20 @@
import pytest
from psycopg.errors import DeadlockDetected

# For every database, 2 queries are expected to be deadlocked,
# so ~80 connections will be held by deadlocks. Another 5 are expected to be
# used by the maintenance daemons, leaving ~15 available.
DATABASES_NUMBER = 40
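# (Sanity check of the budget above, assuming the default max_connections of 100:
#  DATABASES_NUMBER * 2 = 80 deadlocked backends, plus the 5 connections allowed
#  by citus.max_maintenance_shared_pool_size, leaves 100 - 85 = 15 free.)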


async def test_multiple_databases_distributed_deadlock_detection(cluster):

    # Disable maintenance on all nodes
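    # (A value of '-1' disables 2PC recovery and distributed deadlock detection;
    # both are switched back on further down via reset_configuration() once every
    # database has a deadlock in place. The shared pool setting caps the number
    # of connections used by the maintenance daemons at 5.)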
    for node in cluster.nodes:
        node.configure(
            "citus.recover_2pc_interval = '-1'",
            "citus.distributed_deadlock_detection_factor = '-1'",
            "citus.max_maintenance_shared_pool_size = 5",
            # "log_min_messages = 'debug4'",
            # "citus.main_db='postgres'"
        )
        node.restart()

@@ -42,9 +44,7 @@ async def test_multiple_databases_distributed_deadlock_detection(cluster):
"""
)

print("Setup is done")

async def test_deadlock(db_name, run_on_coordinator):
async def create_deadlock(db_name, run_on_coordinator):
"""Function to prepare a deadlock query in a given database"""
# Init connections and store for later commits
if run_on_coordinator:
@@ -89,11 +89,13 @@ async def run_deadlocked_queries():

        await asyncio.wait_for(run_deadlocked_queries(), 300)

        await first_connection.commit()
        await second_connection.commit()

    async def enable_maintenance_when_deadlocks_ready():
"""Function to enable maintenance daemons, when all the expected deadlock queries are ready"""
        # Let deadlocks commence
        await asyncio.sleep(2)
        # cluster.debug()
        # Check that queries are deadlocked
        databases_with_deadlock = set()
        while len(databases_with_deadlock) < DATABASES_NUMBER:
@@ -111,11 +113,7 @@ async def enable_maintenance_when_deadlocks_ready():
                )
                queries_deadlocked = await cursor.fetchone()
                if queries_deadlocked[0]:
                    print(f"Queries are deadlocked on {db_name}")
                    databases_with_deadlock.add(db_name)

print("Queries on all databases are deadlocked, enabling maintenance")

# Enable maintenance back
for node in cluster.nodes:
node.reset_configuration(
@@ -124,13 +122,28 @@
            )
            node.reload()

    # Distribute deadlocked queries among all nodes in the cluster
    tasks = list()
    for idx, db_name in enumerate(db_names):
        run_on_coordinator = True if idx % 3 == 0 else False
        tasks.append(
            test_deadlock(db_name=db_name, run_on_coordinator=run_on_coordinator)
            create_deadlock(db_name=db_name, run_on_coordinator=run_on_coordinator)
        )

    tasks.append(enable_maintenance_when_deadlocks_ready())

    # Await the results
    await asyncio.gather(*tasks)

# Check for "too many clients" on all nodes
for node in cluster.nodes:
with node.cur() as cursor:
cursor.execute(
"""
SELECT count(*) AS too_many_clients_errors_count
FROM regexp_split_to_table(pg_read_file(%s), E'\n') AS t(log_line)
WHERE log_line LIKE '%%sorry, too many clients already%%';""",
(node.log_path.as_posix(),),
)
too_many_clients_errors_count = cursor.fetchone()[0]
assert too_many_clients_errors_count == 0
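Most of create_deadlock's body is elided in the hunks above. For reference, a deadlock of this shape can be reproduced with two psycopg 3 async connections that update two rows in opposite order; the sketch below is a plain single-node illustration with a made-up table, column names, and DSN, not the Citus distributed version the test runs on every database.

import asyncio

import psycopg
from psycopg.errors import DeadlockDetected

DSN = "dbname=postgres"  # hypothetical connection string for a local test server


async def deadlock_two_sessions():
    # One-off setup: a tiny table with two rows to fight over.
    async with await psycopg.AsyncConnection.connect(DSN, autocommit=True) as setup:
        await setup.execute("DROP TABLE IF EXISTS deadlock_demo")
        await setup.execute("CREATE TABLE deadlock_demo (id int PRIMARY KEY, val int)")
        await setup.execute("INSERT INTO deadlock_demo VALUES (1, 0), (2, 0)")

    conn1 = await psycopg.AsyncConnection.connect(DSN)
    conn2 = await psycopg.AsyncConnection.connect(DSN)
    try:
        # Each session locks "its own" row first...
        await conn1.execute("UPDATE deadlock_demo SET val = 1 WHERE id = 1")
        await conn2.execute("UPDATE deadlock_demo SET val = 2 WHERE id = 2")
        # ...then goes for the row the other session holds, closing the cycle.
        results = await asyncio.gather(
            conn1.execute("UPDATE deadlock_demo SET val = 1 WHERE id = 2"),
            conn2.execute("UPDATE deadlock_demo SET val = 2 WHERE id = 1"),
            return_exceptions=True,
        )
        # The deadlock detector cancels exactly one of the two sessions.
        assert any(isinstance(r, DeadlockDetected) for r in results)
    finally:
        await conn1.rollback()
        await conn2.rollback()
        await conn1.close()
        await conn2.close()


if __name__ == "__main__":
    asyncio.run(deadlock_two_sessions())

In the test itself the conflicting updates target distributed tables, so the wait cycle spans nodes and it is the distributed deadlock detector, re-enabled by enable_maintenance_when_deadlocks_ready, that eventually cancels one side with DeadlockDetected.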
