feat(worker): bench setting the hash #2265

Merged · 1 commit · Mar 9, 2025
2 changes: 1 addition & 1 deletion server/fishtest/api.py
@@ -35,7 +35,7 @@
according to the route/URL mapping defined in `__init__.py`.
"""

WORKER_VERSION = 267
WORKER_VERSION = 268


@exception_view_config(HTTPException)
178 changes: 89 additions & 89 deletions worker/games.py
@@ -18,7 +18,7 @@
import threading
import time
from base64 import b64decode
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from pathlib import Path
from queue import Empty, Queue
@@ -358,8 +358,8 @@ def fetch_validated_net(remote, testing_dir, net, global_cache):


def is_valid_net(content, net):
hash = hashlib.sha256(content).hexdigest()
return hash[:12] == net[3:15]
net_hash = hashlib.sha256(content).hexdigest()
return net_hash[:12] == net[3:15]


def validate_net(testing_dir, net):
@@ -392,57 +392,30 @@ def establish_validated_net(remote, testing_dir, net, global_cache):
time.sleep(waitTime)


def compute_bench_statistics(results, threads, depth, *, bench_nps=False):
bench_nodes_values = [bn for _, bn in results]
bench_time_values = [bt for bt, _ in results]
bench_nps_values = [1000 * bn / bt / threads for bt, bn in results]

mean_nodes = statistics.mean(bench_nodes_values)
mean_time = statistics.mean(bench_time_values)
mean_nps = statistics.mean(bench_nps_values)
min_nps = min(bench_nps_values)
max_nps = max(bench_nps_values)
median_nps = statistics.median(bench_nps_values)
std_nps = statistics.stdev(bench_nps_values) if len(bench_nps_values) > 1 else 0

print(
f"Bench for {'nps' if bench_nps else 'nodes'}\n"
f"{'Concurrency':<15}: {len(results):15.2f}\n"
f"{'Threads':<15}: {threads:15.2f}\n"
f"{'Depth':<15}: {depth:15.2f}\n"
f"{'Mean nodes':<15}: {mean_nodes:15.2f}\n"
f"{'Mean time (ms)':<15}: {mean_time:15.2f}\n"
f"{'Mean nps':<15}: {mean_nps:15.2f}\n"
f"{'Median nps':<15}: {median_nps:15.2f}\n"
f"{'Min nps':<15}: {min_nps:15.2f}\n"
f"{'Max nps':<15}: {max_nps:15.2f}\n"
f"{'Std nps':<15}: {std_nps:15.2f}\n"
f"{'Std (%)':<15}: {100 * std_nps / mean_nps:15.2f}"
)

return mean_nps if bench_nps else None


def run_single_bench(engine, threads, depth):
def run_single_bench(engine, hash_size, threads, depth):
bench_time, bench_nodes = None, None

try:
p = subprocess.Popen(
[engine, "bench", "16", str(threads), str(depth), "default", "depth"],
with subprocess.Popen(
[
engine,
"bench",
str(hash_size),
str(threads),
str(depth),
"default",
"depth",
],
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL,
universal_newlines=True,
bufsize=1,
close_fds=not IS_WINDOWS,
)

for line in iter(p.stderr.readline, ""):
if "Total time (ms)" in line:
bench_time = float(line.split(": ")[1].strip())
if "Nodes searched" in line:
bench_nodes = float(line.split(": ")[1].strip())

p.wait()
) as p:
for line in iter(p.stderr.readline, ""):
if "Total time (ms)" in line:
bench_time = float(line.split(": ")[1].strip())
if "Nodes searched" in line:
bench_nodes = float(line.split(": ")[1].strip())
except (OSError, subprocess.SubprocessError) as e:
raise e

@@ -453,25 +426,76 @@ def run_single_bench(engine, threads, depth):
return bench_time, bench_nodes


def run_parallel_benches(engine, concurrency, threads, depth):
with ProcessPoolExecutor(max_workers=concurrency) as executor:
try:
def run_parallel_benches(engine, concurrency, threads, hash_size, depth):
try:
with ThreadPoolExecutor(max_workers=concurrency) as executor:
results = list(
executor.map(
run_single_bench,
[engine] * concurrency,
[hash_size] * concurrency,
[threads] * concurrency,
[depth] * concurrency,
)
)
except Exception as e:
message = f"Failed to run engine bench: {str(e)}"
raise WorkerException(message, e=e)
except Exception as e:
message = f"Failed to run engine bench: {str(e)}"
raise WorkerException(message, e=e)

return results


def verify_signature(engine, signature, games_concurrency, threads):
def get_bench_nps(engine, games_concurrency, threads, hash_size):
_depth, depth = 11, 13
print("Warmup for bench...")
results = run_parallel_benches(
engine, games_concurrency, threads, hash_size, _depth
)
print(f"...done in {results[0][0]:.2f}ms")
print("Running bench...")
results = run_parallel_benches(engine, games_concurrency, threads, hash_size, depth)

bench_nodes_values = [bn for _, bn in results]
bench_time_values = [bt for bt, _ in results]
bench_nps_values = [1000 * bn / bt / threads for bt, bn in results]

mean_nodes = statistics.mean(bench_nodes_values)
mean_time = statistics.mean(bench_time_values)
mean_nps = statistics.mean(bench_nps_values)
min_nps = min(bench_nps_values)
max_nps = max(bench_nps_values)
stdev_nps = statistics.stdev(bench_nps_values) if len(bench_nps_values) > 1 else 0

print(
f"Statistics for {engine.name}:\n"
f"{'Concurrency':<15}: {games_concurrency:15.2f}\n"
f"{'Threads':<15}: {threads:15.2f}\n"
f"{'Hash':<15}: {hash_size:15.2f}\n"
f"{'Depth':<15}: {depth:15.2f}\n"
f"{'Mean nodes':<15}: {mean_nodes:15.2f}\n"
f"{'Mean time (ms)':<15}: {mean_time:15.2f}\n"
f"{'Mean nps':<15}: {mean_nps:15.2f}\n"
f"{'Min nps':<15}: {min_nps:15.2f}\n"
f"{'Max nps':<15}: {max_nps:15.2f}\n"
f"{'Stdev (%)':<15}: {100 * stdev_nps / mean_nps:15.2f}"
)
return mean_nps


def verify_signature(engine, signature):
hash_size, threads, depth = 16, 1, 13
print("Computing engine signature...")
bench_time, bench_nodes = run_single_bench(engine, hash_size, threads, depth)
print(f"...done in {bench_time:.2f}ms")
if int(bench_nodes) != int(signature):
message = (
f"Wrong bench in {engine.name}, "
f"user expected: {signature} but worker got: {int(bench_nodes)}"
)
raise RunException(message)


def get_cpu_features(engine):
cpu_features = "?"
with subprocess.Popen(
[engine, "compiler"],
@@ -489,28 +513,7 @@ def verify_signature(engine, signature, games_concurrency, threads):
f"Compiler info exited with non-zero code {format_returncode(p.returncode)}"
)
raise WorkerException(message)

# Validate the signature with threads = 1 and depth = 13
# Preload also the CPU before the bench for NPS
results_for_nodes = run_parallel_benches(engine, games_concurrency * threads, 1, 13)
bench_nodes = results_for_nodes[0][1]

if int(bench_nodes) != int(signature):
message = (
f"Wrong bench in {engine.name}, "
f"user expected: {signature} but worker got: {int(bench_nodes)}"
)
raise RunException(message)

# Get the NPS with the required number of threads and depth
results_for_nps = run_parallel_benches(engine, games_concurrency, threads, 13)

# Compute the statistics
print(f"Statistics for {engine.name}:")
compute_bench_statistics(results_for_nodes, 1, 13)
bench_nps = compute_bench_statistics(results_for_nps, threads, 13, bench_nps=True)

return bench_nps, cpu_features
return cpu_features


def download_from_github_raw(
@@ -1326,6 +1329,10 @@ def run_games(
repo_url = run["args"].get("tests_repo")
worker_concurrency = int(worker_info["concurrency"])
games_concurrency = worker_concurrency // threads
match = re.search(r"\bHash=(\d+)\b", new_options)
new_hash = int(match.group(1)) if match else 16
match = re.search(r"\bHash=(\d+)\b", base_options)
base_hash = int(match.group(1)) if match else 16

opening_offset = task.get("start", task_id * task["num_games"])
if "start" in task:
@@ -1471,15 +1478,11 @@ def create_environment():
pass

# Verify that the signatures are correct.
print("Benchmarking worker and verifying signature...", flush=True)
run_errors = []
try:
base_nps, cpu_features = verify_signature(
base_engine,
run["args"]["base_signature"],
games_concurrency,
threads,
)
cpu_features = get_cpu_features(base_engine)
verify_signature(base_engine, run["args"]["base_signature"])
base_nps = get_bench_nps(base_engine, games_concurrency, threads, base_hash)
except RunException as e:
run_errors.append(str(e))
except WorkerException as e:
Expand All @@ -1490,12 +1493,9 @@ def create_environment():
and new_engine == base_engine
):
try:
verify_signature(
new_engine,
run["args"]["new_signature"],
games_concurrency,
threads,
)
_ = get_cpu_features(new_engine)
verify_signature(new_engine, run["args"]["new_signature"])
_ = get_bench_nps(new_engine, games_concurrency, threads, new_hash)
except RunException as e:
run_errors.append(str(e))
except WorkerException as e:
2 changes: 1 addition & 1 deletion worker/sri.txt
@@ -1 +1 @@
{"__version": 267, "updater.py": "PLWj7cnPP4rG8g5R5bJV0Za4kZpuRxbpcpebyFByXeFDUtfat/gfBn/0kJBMOY/m", "worker.py": "5V4dAIjxEK6jIIpmpgYJhCvwAlLd4Toh8fH1qKjDEXCXpwPXUcR46B4+Vk7xm4cA", "games.py": "/01XsULj1l/WwW6LF86h/jMyPKPt8kBID9IvwPctIIe5oCx8SG3AxNGfclqn9mPG"}
{"__version": 268, "updater.py": "PLWj7cnPP4rG8g5R5bJV0Za4kZpuRxbpcpebyFByXeFDUtfat/gfBn/0kJBMOY/m", "worker.py": "EIVTZsH4tTkjuaE4XCFD9AfGaq4zNUnBW0STxp5qhQMxA/bGa8/ZAu35pOW8y8Ai", "games.py": "aAEzDfhGu4C1E+Vj+ENPM0ISxp6ldAJzhtSvZPIpqHUotHYvKOLiVd+cXRucqKhX"}
2 changes: 1 addition & 1 deletion worker/worker.py
@@ -71,7 +71,7 @@

FASTCHESS_SHA = "eda8b0040d030306dfe1aec63f5437b7d234eb30"

WORKER_VERSION = 267
WORKER_VERSION = 268
FILE_LIST = ["updater.py", "worker.py", "games.py"]
HTTP_TIMEOUT = 30.0
INITIAL_RETRY_TIME = 15.0