Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement log flushing #808

Merged
merged 7 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions conda-store-server/conda_store_server/action/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@

def action(f: typing.Callable):
@functools.wraps(f)
def wrapper(*args, **kwargs):
action_context = ActionContext()
def wrapper(*args, stdout=None, stderr=None, **kwargs):
action_context = ActionContext(stdout=stdout, stderr=stderr)
with contextlib.ExitStack() as stack:
# redirect stdout -> action_context.stdout
stack.enter_context(contextlib.redirect_stdout(action_context.stdout))
Expand All @@ -35,18 +35,48 @@ def wrapper(*args, **kwargs):


class ActionContext:
def __init__(self):
def __init__(self, stdout=None, stderr=None):
if stdout is not None and stderr is None:
stderr = stdout

self.id = str(uuid.uuid4())
self.stdout = io.StringIO()
self.stderr = io.StringIO()
self.stdout = stdout if stdout is not None else io.StringIO()
self.stderr = stderr if stderr is not None else io.StringIO()
self.log = logging.getLogger(f"conda_store_server.action.{self.id}")
self.log.propagate = False
self.log.addHandler(logging.StreamHandler(stream=self.stdout))
self.log.setLevel(logging.INFO)
self.result = None
self.artifacts = {}

def run_command(self, command, redirect_stderr=True, **kwargs):
"""Runs command and immediately writes to logs"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bulk of this function has to do with redirecting output to context buffers from the subprocess, so it might make sense to mention something about that in the docstring.

I know this isn't done elsewhere in the code, so maybe it would be useful to discuss in tomorrow's meeting about docstrings and type annotations. But IMO it would make future future maintenance easier if there was something said about each parameter, e.g. why is there an option to redirect stderr but not stdout? Part of the reason I mention it is that LSPs will pull out docstrings and type annotations to give richer hints if we use those, and doing so will unlock static type checking in the future.

self.log.info(f"Running command: {' '.join(command)}")

# Unlike subprocess.run, Popen doesn't support the check argument, so
# ignore it. The code below always checks the return code
kwargs.pop("check", None)

# https://stackoverflow.com/questions/4417546/constantly-print-subprocess-output-while-process-is-running
with subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT if redirect_stderr else subprocess.PIPE,
bufsize=1,
universal_newlines=True,
**kwargs,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.python.org/3/library/subprocess.html

bufsize [...] 1 means line buffered (only usable if text=True or universal_newlines=True)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be useful to put this in the code (in the docstring, maybe?)

) as p:
for line in p.stdout:
self.stdout.write(line)
if not redirect_stderr:
for line in p.stderr:
self.stderr.write(line)

if p.returncode != 0:
raise subprocess.CalledProcessError(p.returncode, p.args)

def run(self, *args, redirect_stderr=True, **kwargs):
"""Runs command waiting for it to succeed before writing to logs"""
result = subprocess.run(
*args,
**kwargs,
Expand All @@ -57,4 +87,5 @@ def run(self, *args, redirect_stderr=True, **kwargs):
self.stdout.write(result.stdout)
if not redirect_stderr:
self.stderr.write(result.stderr)

return result
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def write_file(filename, s):
"constructor",
"--help",
]
logged_command(context, command, timeout=10)
logged_command(context, command)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logged_command(context, command)
context.run_command(command)

Based on your comment under the logged_command function.

except FileNotFoundError:
warnings.warn(
"Installer generation requires constructor: https://github.com/conda/constructor"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ def action_install_lockfile(
str(lockfile_filename),
]

context.run(command, check=True)
context.run_command(command, check=True)
12 changes: 3 additions & 9 deletions conda-store-server/conda_store_server/action/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
import subprocess


def logged_command(context, command, **kwargs):
    """Deprecated compatibility shim around ``context.run_command``.

    This is here only for backward compatibility; new code should call
    ``ActionContext.run_command`` directly, which streams output to the
    context buffers as the command runs.

    Parameters
    ----------
    context : ActionContext
        Context whose ``run_command`` method executes the command.
    command : list of str
        Command and arguments to execute.
    **kwargs
        Forwarded unchanged to ``context.run_command``.
    """
    import warnings

    warnings.warn(
        "logged_command is deprecated; use ActionContext.run_command instead",
        DeprecationWarning,
        stacklevel=2,
    )
    context.run_command(command, **kwargs)
171 changes: 104 additions & 67 deletions conda-store-server/conda_store_server/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,56 @@

import yaml

from filelock import FileLock
from sqlalchemy.orm import Session

from conda_store_server import action, api, conda_utils, orm, schema, utils
from conda_store_server.utils import BuildPathError


def append_to_logs(db: Session, conda_store, build, logs: typing.Union[str, bytes]):
try:
current_logs = conda_store.storage.get(build.log_key)
except Exception:
current_logs = b""
class LoggedStream:
    """File-like adapter that forwards writes to build-log storage.

    Supports the ``write()``/``flush()`` protocol, so it can be used both
    as a plain output stream (e.g. ``ActionContext.stdout``) and as the
    stream of a ``logging.StreamHandler``, capturing output however it is
    produced without changing existing logging code.
    """

    def __init__(self, db, conda_store, build, prefix=None):
        # db: database session used by append_to_logs
        # conda_store: app object providing the storage backend
        # build: the build whose log receives the output
        # prefix: optional string prepended to every logged line
        self.db = db
        self.conda_store = conda_store
        self.build = build
        self.prefix = prefix

    def write(self, b, /):
        # Forward each non-empty line to storage as its own log entry.
        for ln in b.split("\n"):
            if not ln:
                continue  # skip empty lines
            entry = ln if self.prefix is None else self.prefix + ln
            append_to_logs(self.db, self.conda_store, self.build, entry + "\n")

    def flush(self):
        # Nothing is buffered here; present because the file protocol
        # (and logging.StreamHandler) expects a flush() method.
        pass


def append_to_logs(db: Session, conda_store, build, logs: typing.Union[str, bytes]):
    """Append ``logs`` to the build's log artifact in storage.

    Parameters
    ----------
    db : Session
        Database session passed through to the storage backend.
    conda_store
        Application object providing the ``storage`` backend.
    build
        Build whose log (``build.log_key``) is appended to.
    logs : str or bytes
        Text to append; ``str`` is encoded as UTF-8.
    """
    # Appending is a read-modify-write of the whole log (e.g. local
    # storage reads from and writes to a file), so serialize concurrent
    # writers with a file lock to avoid losing lines.
    lock_path = f"{build.build_path(conda_store)}.log.lock"
    with FileLock(lock_path):
        try:
            current_logs = conda_store.storage.get(build.log_key)
        except Exception:
            # No log stored yet (or it is unreadable): start empty.
            current_logs = b""

        data = logs.encode("utf-8") if isinstance(logs, str) else logs

        conda_store.storage.set(
            db,
            build.id,
            build.log_key,
            current_logs + data,
            content_type="text/plain",
            artifact_type=schema.BuildArtifactType.LOGS,
        )


def set_build_started(db: Session, build: orm.Build):
Expand Down Expand Up @@ -179,6 +206,12 @@ def build_conda_environment(db: Session, conda_store, build):
),
platforms=settings.conda_solve_platforms,
conda_flags=conda_store.conda_flags,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_solve_lockfile: ",
),
)

conda_store.storage.set(
Expand All @@ -190,40 +223,28 @@ def build_conda_environment(db: Session, conda_store, build):
artifact_type=schema.BuildArtifactType.LOCKFILE,
)

append_to_logs(
db,
conda_store,
build,
"::group::action_solve_lockfile\n"
+ context.stdout.getvalue()
+ "\n::endgroup::\n",
)
conda_lock_spec = context.result

context = action.action_fetch_and_extract_conda_packages(
conda_lock_spec=conda_lock_spec,
pkgs_dir=conda_utils.conda_root_package_dir(),
)
append_to_logs(
db,
conda_store,
build,
"::group::action_fetch_and_extract_conda_packages\n"
+ context.stdout.getvalue()
+ "\n::endgroup::\n",
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_fetch_and_extract_conda_packages: ",
),
)

context = action.action_install_lockfile(
conda_lock_spec=conda_lock_spec,
conda_prefix=conda_prefix,
)
append_to_logs(
db,
conda_store,
build,
"::group::action_install_lockfile\n"
+ context.stdout.getvalue()
+ "\n::endgroup::\n",
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_install_lockfile: ",
),
)

utils.symlink(conda_prefix, environment_prefix)
Expand All @@ -233,15 +254,35 @@ def build_conda_environment(db: Session, conda_store, build):
permissions=settings.default_permissions,
uid=settings.default_uid,
gid=settings.default_gid,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_set_conda_prefix_permissions: ",
),
)

action.action_add_conda_prefix_packages(
db=db,
conda_prefix=conda_prefix,
build_id=build.id,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_add_conda_prefix_packages: ",
),
)

context = action.action_get_conda_prefix_stats(conda_prefix)
context = action.action_get_conda_prefix_stats(
conda_prefix,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_get_conda_prefix_stats: ",
),
)
build.size = context.result["disk_usage"]

set_build_completed(db, conda_store, build)
Expand Down Expand Up @@ -299,15 +340,14 @@ def build_conda_env_export(db: Session, conda_store, build: orm.Build):
)

context = action.action_generate_conda_export(
conda_command=settings.conda_command, conda_prefix=conda_prefix
)
append_to_logs(
db,
conda_store,
build,
"::group::action_generate_conda_export\n"
+ context.stdout.getvalue()
+ "\n::endgroup::\n",
conda_command=settings.conda_command,
conda_prefix=conda_prefix,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_generate_conda_export: ",
),
)

conda_prefix_export = yaml.dump(context.result).encode("utf-8")
Expand All @@ -330,16 +370,15 @@ def build_conda_pack(db: Session, conda_store, build: orm.Build):
):
with tempfile.TemporaryDirectory() as tmpdir:
output_filename = pathlib.Path(tmpdir) / "environment.tar.gz"
context = action.action_generate_conda_pack(
conda_prefix=conda_prefix, output_filename=output_filename
)
append_to_logs(
db,
conda_store,
build,
"::group::action_generate_conda_pack\n"
+ context.stdout.getvalue()
+ "\n::endgroup::\n",
action.action_generate_conda_pack(
conda_prefix=conda_prefix,
output_filename=output_filename,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_generate_conda_pack: ",
),
)
conda_store.storage.fset(
db,
Expand Down Expand Up @@ -425,16 +464,14 @@ def build_constructor_installer(db: Session, conda_store, build: orm.Build):
),
installer_dir=pathlib.Path(tmpdir),
version=build.build_key,
stdout=LoggedStream(
db=db,
conda_store=conda_store,
build=build,
prefix="action_generate_constructor_installer: ",
),
)
output_filename = context.result
append_to_logs(
db,
conda_store,
build,
"::group::action_generate_constructor_installer\n"
+ context.stdout.getvalue()
+ "\n::endgroup::\n",
)
if output_filename is None:
return
conda_store.storage.fset(
Expand Down
26 changes: 20 additions & 6 deletions conda-store-server/conda_store_server/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,17 @@ def fset(
filename: str,
artifact_type: schema.BuildArtifactType,
):
db.add(
orm.BuildArtifact(build_id=build_id, key=key, artifact_type=artifact_type)
ba = orm.BuildArtifact
exists = (
db.query(ba)
.filter(ba.build_id == build_id)
.filter(ba.key == key)
.filter(ba.artifact_type == artifact_type)
.first()
)
db.commit()
if not exists:
db.add(ba(build_id=build_id, key=key, artifact_type=artifact_type))
db.commit()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes a pre-existing issue. It's important to fix this now because this PR writes to logs line by line, so a new SQL entry would be created for each line.

Multiple SQL entries add nothing here, it's just an oversight. The format is:

1|1|LOGS|logs/6e6d70d9-1713098291-1-test.log
2|1|LOGS|logs/6e6d70d9-1713098291-1-test.log
3|1|LOGS|logs/6e6d70d9-1713098291-1-test.log
4|1|LOGS|logs/6e6d70d9-1713098291-1-test.log
5|1|LOGS|logs/6e6d70d9-1713098291-1-test.log
6|1|LOGS|logs/6e6d70d9-1713098291-1-test.log
7|1|LOGS|logs/6e6d70d9-1713098291-1-test.log


def set(
self,
Expand All @@ -34,10 +41,17 @@ def set(
value: bytes,
artifact_type: schema.BuildArtifactType,
):
db.add(
orm.BuildArtifact(build_id=build_id, key=key, artifact_type=artifact_type)
ba = orm.BuildArtifact
exists = (
db.query(ba)
.filter(ba.build_id == build_id)
.filter(ba.key == key)
.filter(ba.artifact_type == artifact_type)
.first()
)
db.commit()
if not exists:
db.add(ba(build_id=build_id, key=key, artifact_type=artifact_type))
db.commit()
Comment on lines +44 to +54
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks the same as Storage.fset. Can we just call that?


def get(self, key: str):
raise NotImplementedError()
Expand Down
Loading