Skip to content

Commit

Permalink
Periodically validate Fishtest's internal data structures.
Browse files Browse the repository at this point in the history
In this way we guarantee that the schemas in schemas.py are
kept up-to-date and hence preserve their documentary value.

Also: switch to string keys for the worker_runs dict. This is
what we do elsewhere and it makes the schema more readable:

worker_runs_schema = {
    uuid: {
        run_id: True,
        "last_run": run_id,
    }
}

Also: retire the separate variable "purge_count" and include
it in the "active_run" dict.

Requires upgrade of vtjson for the set datatype used in
"unfinished_runs_schema".

Co-Authored-By: vdbergh <[email protected]>
  • Loading branch information
Viren6 and vdbergh committed Jun 27, 2024
1 parent a33848b commit b3ab87a
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 37 deletions.
106 changes: 76 additions & 30 deletions server/montytest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@
from montytest.actiondb import ActionDb
from montytest.schemas import (
RUN_VERSION,
active_runs_schema,
cache_schema,
compute_committed_games,
compute_cores,
compute_results,
compute_total_games,
compute_workers,
connections_counter_schema,
nn_schema,
pgns_schema,
runs_schema,
unfinished_runs_schema,
worker_runs_schema,
wtt_map_schema,
)
from montytest.stats.stat_util import SPRT_elo
from montytest.userdb import UserDb
Expand Down Expand Up @@ -86,6 +91,10 @@ def __init__(self, db_name="montytest_new", port=-1, is_primary_instance=True):
self.actiondb.system_event(message=f"start montytest@{self.port}")

self.__is_primary_instance = is_primary_instance

# Create a lock for each active run
self.run_lock = threading.Lock()
self.active_runs = {}

self.request_task_lock = threading.Lock()
self.scheduler = None
Expand All @@ -100,6 +109,57 @@ def schedule_tasks(self):
# short initial delay to make testing more pleasant
self.scheduler.create_task(180.0, self.validate_random_run, initial_delay=60.0)
self.scheduler.create_task(180.0, self.clean_wtt_map, initial_delay=60.0)
self.scheduler.create_task(
900.0, self.validate_data_structures, initial_delay=60.0
)

def validate_data_structures(self):
    """Validate the server's in-memory data structures against the
    schemas declared in schemas.py.

    Running this periodically guarantees that those schemas stay
    up-to-date and thus keep their documentary value.  A validation
    failure is printed and logged to the action db, never raised.
    """
    print(
        "Validate_data_structures: validating Fishtest's internal data structures...",
        flush=True,
    )
    # (schema, data, name, subs) for every structure we check; order
    # matters only for which failure gets reported first.
    checks = [
        (cache_schema, self.run_cache, "run_cache", {"runs_schema": dict}),
        (wtt_map_schema, self.wtt_map, "wtt_map", {"runs_schema": dict}),
        (
            connections_counter_schema,
            self.connections_counter,
            "connections_counter",
            None,
        ),
        (unfinished_runs_schema, self.unfinished_runs, "unfinished_runs", None),
        (active_runs_schema, self.active_runs, "active_runs", None),
        (worker_runs_schema, self.worker_runs, "worker_runs", None),
    ]
    try:
        for schema, data, name, subs in checks:
            if subs is None:
                validate(schema, data, name=name)
            else:
                # Substitute the (already validated) runs_schema away so
                # we do not re-validate every cached run document here.
                validate(schema, data, name=name, subs=subs)
    except ValidationError as e:
        message = f"Validation of internal data structures failed: {str(e)}"
        print(message, flush=True)
        self.actiondb.log_message(
            username="fishtest.system",
            message=message,
        )

def update_itp(self):
with self.unfinished_runs_lock:
Expand Down Expand Up @@ -339,18 +399,6 @@ def update_aggregated_data(self):

self.update_itp()

# This will be moved to a more suitable place once we have documented more
# internal montytest data structures.
try:
validate(
cache_schema,
self.run_cache,
name="run_cache",
subs={"runs_schema": dict},
)
except ValidationError as e:
print(f"Validation of run_cache failed: {str(e)}")

def new_run(
self,
base_tag,
Expand Down Expand Up @@ -1013,7 +1061,7 @@ def priority(run): # lower is better
# Always consider the higher priority runs first
-run["args"]["priority"],
# Try to avoid repeatedly working on the same test
run["_id"] == last_run_id,
str(run["_id"]) == last_run_id,
# Make sure all runs at this priority level get _some_ cores
run["cores"] > 0,
# Try to match run["args"]["itp"].
Expand All @@ -1033,6 +1081,8 @@ def priority(run): # lower is better
run_found = False

for run in unfinished_runs:
run_id = str(run["_id"])

if run["finished"]:
continue

Expand Down Expand Up @@ -1075,7 +1125,7 @@ def priority(run): # lower is better
if near_github_api_limit:
have_binary = (
unique_key in self.worker_runs
and run["_id"] in self.worker_runs[unique_key]
and run_id in self.worker_runs[unique_key]
)
if not have_binary:
continue
Expand Down Expand Up @@ -1117,7 +1167,7 @@ def priority(run): # lower is better
return {"task_waiting": False}

# Now we create a new task for this run.
run_id = run["_id"]
run_id = str(run["_id"])
with self.active_run_lock(run_id):
# It may happen that the run we have selected is now finished or
# has enough games.
Expand Down Expand Up @@ -1185,32 +1235,28 @@ def priority(run): # lower is better
# Cache some data. Currently we record the id's
# the worker has seen, as well as the last id that was seen.
# Note that "worker_runs" is empty after a server restart.

if unique_key not in self.worker_runs:
self.worker_runs[unique_key] = {}

if run["_id"] not in self.worker_runs[unique_key]:
self.worker_runs[unique_key][run["_id"]] = True

self.worker_runs[unique_key]["last_run"] = run["_id"]
self.worker_runs[unique_key][run_id] = True
self.worker_runs[unique_key]["last_run"] = run_id

return {"run": run, "task_id": task_id}

# Create a lock for each active run
run_lock = threading.Lock()
active_runs = {}
purge_count = 0

def active_run_lock(self, id):
id = str(id)
with self.run_lock:
self.purge_count = self.purge_count + 1
if self.purge_count > 100000:
if "purge_count" not in self.active_runs:
self.active_runs["purge_count"] = 0
self.active_runs["purge_count"] += 1
if self.active_runs["purge_count"] > 100000:
old = time.time() - 10000
self.active_runs = dict(
(k, v) for k, v in self.active_runs.items() if v["time"] >= old
(k, v)
for k, v in self.active_runs.items()
if (k != "purge_count" and v["time"] >= old)
)
self.purge_count = 0
self.active_runs["purge_count"] = 0
if id in self.active_runs:
active_lock = self.active_runs[id]["lock"]
self.active_runs[id]["time"] = time.time()
Expand Down
46 changes: 39 additions & 7 deletions server/montytest/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import copy
import math
import threading
from datetime import datetime, timezone

from bson.binary import Binary
Expand Down Expand Up @@ -67,6 +68,8 @@
unumber = intersect(number, ge(0))
sunumber = intersect(number, gt(0))

# A task id is a non-negative integer; attach the name "task_id" so
# validation error messages refer to it by name.
task_id = set_name(uint, "task_id")


def size_is_length(x):
    """Return True iff x["size"] equals the length of x["pgn_zip"]."""
    return len(x["pgn_zip"]) == x["size"]
Expand Down Expand Up @@ -196,7 +199,7 @@ def action_is(x):
"worker": long_worker_name,
"run_id": run_id,
"run": run_name,
"task_id": uint,
"task_id": task_id,
"message": action_message,
},
),
Expand All @@ -210,7 +213,7 @@ def action_is(x):
"worker": long_worker_name,
"run_id": run_id,
"run": run_name,
"task_id": uint,
"task_id": task_id,
"message": action_message,
},
),
Expand All @@ -224,7 +227,7 @@ def action_is(x):
"worker": long_worker_name,
"run_id": run_id,
"run": run_name,
"task_id": uint,
"task_id": task_id,
},
),
(
Expand Down Expand Up @@ -294,7 +297,7 @@ def action_is(x):
"run": run_name,
"message": action_message,
"worker?": long_worker_name,
"task_id?": uint,
"task_id?": task_id,
},
ifthen(at_least_one_of("worker", "task_id"), keys("worker", "task_id")),
),
Expand Down Expand Up @@ -441,7 +444,7 @@ def valid_spsa_results(R):
{
"password": str,
"run_id?": run_id,
"task_id?": uint,
"task_id?": task_id,
"pgn?": str,
"message?": str,
"worker_info": worker_info_schema_api,
Expand Down Expand Up @@ -737,7 +740,7 @@ def total_games_must_match(run):
"residual": number,
"residual_color": str,
"bad": True,
"task_id": uint,
"task_id": task_id,
"stats": results_schema,
"worker_info": worker_info_schema_runs,
},
Expand All @@ -762,11 +765,40 @@ def total_games_must_match(run):
valid_aggregated_data,
)

runs_schema = set_label(runs_schema, "runs_schema")

cache_schema = {
run_id: {
"run": set_label(runs_schema, "runs_schema"),
"run": runs_schema,
"is_changed": bool, # Indicates if the run has changed since last_sync_time.
"last_sync_time": ufloat, # Last sync time (reading from or writing to db). If never synced then creation time.
"last_access_time": ufloat, # Last time the cache entry was touched (via buffer() or get_run()).
},
}

wtt_map_schema = {
short_worker_name: (runs_schema, task_id),
}

connections_counter_schema = {
ip_address: suint,
}

unfinished_runs_schema = {
run_id,
}

active_runs_schema = {
"purge_count?": suint,
run_id: {
"time": ufloat,
"lock": threading.RLock,
},
}

worker_runs_schema = {
uuid: {
run_id: True,
"last_run": run_id,
}
}

0 comments on commit b3ab87a

Please sign in to comment.