Skip to content

Commit

Permalink
MAINT Factor GraphBuilder class out of the build_from_graph function
Browse files Browse the repository at this point in the history
This rewrites all of the closed-over state in build_from_graph into a separate
GraphBuilder class. Should make it easier to understand the code here.
  • Loading branch information
hoodmane committed Jan 29, 2024
1 parent b8287e4 commit f9f7fa7
Showing 1 changed file with 142 additions and 90 deletions.
232 changes: 142 additions & 90 deletions pyodide-build/pyodide_build/buildall.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
import subprocess
import sys
from collections import defaultdict
from collections.abc import Iterable
from collections.abc import Iterable, Iterator
from contextlib import contextmanager
from datetime import datetime
from functools import total_ordering
from graphlib import TopologicalSorter
from pathlib import Path
from queue import PriorityQueue, Queue
from queue import Empty, PriorityQueue, Queue
from threading import Lock, Thread
from time import perf_counter, sleep
from typing import Any

from packaging.utils import canonicalize_name
from pyodide_lock import PyodideLockSpec
from pyodide_lock.spec import PackageSpec as PackageLockSpec
from pyodide_lock.utils import update_package_sha256
Expand All @@ -28,6 +28,8 @@
from rich.spinner import Spinner
from rich.table import Table

from packaging.utils import canonicalize_name

from . import build_env, recipe
from .buildpkg import needs_rebuild
from .common import (
Expand Down Expand Up @@ -497,6 +499,139 @@ def generate_needs_build_set(
return needs_build


@dataclasses.dataclass(eq=False, repr=False)
class GraphBuilder:
pkg_map: dict[str, BasePackage]
build_args: BuildArgs
build_dir: Path
needs_build: set[str]

build_queue: PriorityQueue[tuple[int, BasePackage]] = dataclasses.field(
default_factory=PriorityQueue
)
built_queue: Queue[BasePackage | BaseException] = dataclasses.field(
default_factory=Queue
)
lock: Lock = dataclasses.field(default_factory=Lock)
building_rust_pkg: bool = False
queue_idx: int = 1
progress_formatter: ReplProgressFormatter = None # type: ignore[assignment]
messages: list[str] = dataclasses.field(default_factory=list)

def __post_init__(self):
self.progress_formatter = ReplProgressFormatter(len(self.needs_build))
for pkg_name in self.needs_build:
pkg = self.pkg_map[pkg_name]
if len(pkg.unbuilt_host_dependencies) == 0:
self.build_queue.put((job_priority(pkg), pkg))

@contextmanager
def queue_index(self, pkg: BasePackage) -> Iterator[int | None]:
is_rust_pkg = pkg.meta.is_rust_package()
with self.lock:
queue_idx = self.queue_idx
if is_rust_pkg and self.building_rust_pkg:
# Don't build multiple rust packages at the same time.
# See: https://github.com/pyodide/pyodide/issues/3565
# Note that if there are only rust packages left in the queue,
# this will keep pushing and popping packages until the current rust package
# is built. This is not ideal but presumably the overhead is negligible.
self.build_queue.put((job_priority(pkg), pkg))
yield None
return
if is_rust_pkg:
self.building_rust_pkg = True
self.queue_idx += 1
try:
yield queue_idx
finally:
if is_rust_pkg:
self.building_rust_pkg = False

@contextmanager
def pkg_status_display(self, n: int, pkg: BasePackage) -> Iterator[None]:
idx = pkg._queue_idx
assert idx
pkg_status = self.progress_formatter.add_package(
name=pkg.name,
idx=idx,
thread=n,
total_packages=len(self.needs_build),
)
t0 = perf_counter()
success = True
try:
yield
except BaseException:
success = False
raise
finally:
pkg_status.finish(success, perf_counter() - t0)
self.progress_formatter.remove_package(pkg_status)

def build_one(self, n: int, pkg: BasePackage) -> BaseException | None:
try:
with self.pkg_status_display(n, pkg):
pkg.build(self.build_args, self.build_dir)
except Exception as e:
# What about BaseException?
return e
else:
return None

def get_pkg(self) -> BasePackage:
while True:
try:
return self.build_queue.get(timeout=0.5)[1]
except Empty:
pass

def builder(self, n: int) -> None:
while True:
pkg = self.get_pkg()
print(n, "starting", pkg.name)
with self.queue_index(pkg) as idx:
if idx is None:
# Rust package and we're already building one.
# Release the GIL so new packages get queued
sleep(0.01)
continue
pkg._queue_idx = idx
res = self.build_one(n, pkg)
print(n, "finished", pkg.name, not bool(res))
if res:
# Build failed
self.built_queue.put(res)
return
self.built_queue.put(pkg)
# Release the GIL so new packages get queued
sleep(0.01)

def run(self, n_jobs: int, already_built: set[str]) -> None:
for n in range(0, n_jobs):
Thread(target=self.builder, args=(n + 1,), daemon=True).start()

num_built = len(already_built)
with Live(self.progress_formatter, console=console_stdout):
while num_built < len(self.pkg_map):
match self.built_queue.get():
case BaseException() as err:
break
case BasePackage() as pkg:
pass

num_built += 1

self.progress_formatter.update_progress_bar()

for _dependent in pkg.host_dependents:
dependent = self.pkg_map[_dependent]
dependent.unbuilt_host_dependencies.remove(pkg.name)
if len(dependent.unbuilt_host_dependencies) == 0:
self.build_queue.put((job_priority(dependent), dependent))
raise err


def build_from_graph(
pkg_map: dict[str, BasePackage],
build_args: BuildArgs,
Expand All @@ -523,7 +658,6 @@ def build_from_graph(

# Insert packages into build_queue. We *must* do this after counting
# dependents, because the ordering ought not to change after insertion.
build_queue: PriorityQueue[tuple[int, BasePackage]] = PriorityQueue()

if force_rebuild:
# If "force_rebuild" is set, just rebuild everything
Expand All @@ -548,96 +682,14 @@ def build_from_graph(
logger.success("All packages already built. Quitting.")
return

sorted_needs_build = sorted(needs_build)
logger.info(
"Building the following packages: "
f"[bold]{format_name_list(sorted(needs_build))}[/bold]"
f"[bold]{format_name_list(sorted_needs_build)}[/bold]"
)

for pkg_name in needs_build:
pkg = pkg_map[pkg_name]
if len(pkg.unbuilt_host_dependencies) == 0:
build_queue.put((job_priority(pkg), pkg))

built_queue: Queue[BasePackage | BaseException] = Queue()
thread_lock = Lock()
queue_idx = 1
building_rust_pkg = False
progress_formatter = ReplProgressFormatter(len(needs_build))

def builder(n: int) -> None:
nonlocal queue_idx, building_rust_pkg
while True:
_, pkg = build_queue.get()

with thread_lock:
if pkg.meta.is_rust_package():
# Don't build multiple rust packages at the same time.
# See: https://github.com/pyodide/pyodide/issues/3565
# Note that if there are only rust packages left in the queue,
# this will keep pushing and popping packages until the current rust package
# is built. This is not ideal but presumably the overhead is negligible.
if building_rust_pkg:
build_queue.put((job_priority(pkg), pkg))

# Release the GIL so new packages get queued
sleep(0.1)
continue

building_rust_pkg = True

pkg._queue_idx = queue_idx
queue_idx += 1

pkg_status = progress_formatter.add_package(
name=pkg.name,
idx=pkg._queue_idx,
thread=n,
total_packages=len(needs_build),
)
t0 = perf_counter()

success = True
try:
pkg.build(build_args, build_dir)
except Exception as e:
built_queue.put(e)
success = False
return
finally:
pkg_status.finish(success, perf_counter() - t0)
progress_formatter.remove_package(pkg_status)

built_queue.put(pkg)

with thread_lock:
if pkg.meta.is_rust_package():
building_rust_pkg = False

# Release the GIL so new packages get queued
sleep(0.01)

for n in range(0, n_jobs):
Thread(target=builder, args=(n + 1,), daemon=True).start()

num_built = len(already_built)
build_state = GraphBuilder(pkg_map, build_args, build_dir, set(needs_build))
try:
with Live(progress_formatter, console=console_stdout):
while num_built < len(pkg_map):
match built_queue.get():
case BaseException() as err:
raise err
case BasePackage() as pkg:
pass

num_built += 1

progress_formatter.update_progress_bar()

for _dependent in pkg.host_dependents:
dependent = pkg_map[_dependent]
dependent.unbuilt_host_dependencies.remove(pkg.name)
if len(dependent.unbuilt_host_dependencies) == 0:
build_queue.put((job_priority(dependent), dependent))
build_state.run(n_jobs, already_built)
except BuildError as err:
logger.error(err.msg)
sys.exit(err.returncode)
Expand Down

0 comments on commit f9f7fa7

Please sign in to comment.