
Use asyncio for subprocess calls #810

Merged (4 commits) on Feb 4, 2024

Conversation

Falmarri
Contributor

@Falmarri commented Dec 10, 2023

Removes the threads from compose_up and manages the work using asyncio. Also uses async processing to format the log messages instead of piping through sed; this should work on Windows without sed installed.

Adds --parallel to support pull and build in parallel, same as docker compose

This should address #679 as well.
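The sed-free log formatting described above can be sketched with asyncio's subprocess API. The helper names below (format_out, run_prefixed) are illustrative only, not the PR's actual code:

```python
import asyncio
import sys

async def format_out(stream, prefix):
    # Prefix each output line with the service name, replacing the old
    # `| sed -e "s/^/name: /"` pipeline (and the sed dependency on Windows).
    lines = []
    while True:
        line = await stream.readline()
        if not line:
            break
        lines.append(f"{prefix} | {line.decode().rstrip()}")
    return lines

async def run_prefixed(prefix, *cmd):
    p = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    out = await format_out(p.stdout, prefix)
    await p.wait()
    return out

print(asyncio.run(run_prefixed("web", sys.executable, "-c", "print('hello')")))
# → ['web | hello']
```

In the real implementation a task like this runs per container, consuming stdout and stderr concurrently instead of spawning a thread per stream.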

@Falmarri
Contributor Author

This isn't ready to merge yet; I realized I was running the tests wrong and found a couple of issues.

But, would this be accepted at all?

@muayyad-alsadi
Collaborator

But, would this be accepted at all?

yes, please.

please notify me when it's ready

Removes the threads from compose_up and manages it using async. Also
uses async processing to format the log messages instead of piping
through sed. This should work on windows without having sed installed

Adds --parallel to support pull and build in parallel, same as docker
compose

Signed-off-by: Falmarri <[email protected]>
Not at 100% yet. But upped code coverage significantly and covered major
async calls.

Signed-off-by: Falmarri <[email protected]>
@Falmarri
Contributor Author

@muayyad-alsadi If you want to take a look, I think this is ready. It's bigger than I thought it would be at the beginning, because as I was adding tests and fixing issues I added a bit more to get the code coverage to at least exercise the majority of the async paths.

@Falmarri
Contributor Author

@muayyad-alsadi gentle ping

@muayyad-alsadi
Collaborator

I really appreciate your effort. I'll take a look and merge it. If it's not ready, I'll merge it into a branch and work with you on it. Thank you.

        self.semaphore = semaphore

    async def output(self, podman_args, cmd="", cmd_args=None):
        async with self.semaphore:
Collaborator


Why do you need a semaphore here? It's supposed to run in a different subprocess; it seems the semaphore is not needed.

Contributor Author


It's an asyncio semaphore, used to limit the number of parallel operations. I'm fairly sure this is compatible with the way the v2 docker compose plugin works, where it's parallel by default (the default allows maxint parallel operations), but you can limit it with --parallel <n> or COMPOSE_PARALLEL_LIMIT.

I followed the concerns raised in docker/compose#8226 and other issues.
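To illustrate the semaphore's effect, here is a minimal self-contained sketch; fake_pull is a stand-in for a podman subprocess, and none of these names come from the PR:

```python
import asyncio

async def with_limit(sem, coro_fn, *args):
    # Acquire the semaphore before running, so at most `parallel`
    # operations (pull/build/stop...) are in flight at once.
    async with sem:
        return await coro_fn(*args)

async def main(parallel):
    sem = asyncio.Semaphore(parallel)
    running = 0
    peak = 0

    async def fake_pull(name):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # stand-in for awaiting a podman subprocess
        running -= 1
        return 0

    await asyncio.gather(*(with_limit(sem, fake_pull, f"img{i}") for i in range(8)))
    return peak

print(asyncio.run(main(2)))  # prints 2: peak concurrency is bounded by the limit
```

With an effectively unbounded default (as in docker compose v2), the semaphore never blocks; passing --parallel <n> simply constructs it with a small n.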

Collaborator

@muayyad-alsadi Feb 2, 2024


@Falmarri thank you. Should it default to 2x the CPU cores or something like that, instead of 9223372036854775807, or is this a docker default?

Collaborator

@muayyad-alsadi Feb 2, 2024


I noticed that in many places we can replace .run with .exec, which would give us simpler code and better error reporting and return codes. For example:

await compose.podman.run(["--version"], "", [])

Should we remove log_formatter, since it sometimes misses the last line and we have podman logs --color? That would also make the code simpler.
I think we should change the code to create the containers, then start them detached, then run podman logs --color -n. What do you think?

I see that podman.output() is only used internally, so it does not need to be semaphored.

Contributor Author


Well it's not the number of CPUs. This is entirely single threaded now, so it will only ever use 1 cpu. It's just the number of concurrent tasks that are allowed, and docker compose's default is unbounded, which I've just replicated.

I was just trying to keep the changes as small as possible, so if you think cleaning up the run/exec makes sense, that works.

Wouldn't starting the containers detached complicate this though? It would mean that if compose gets killed via SIGKILL, it won't be able to clean up the containers it started. I'm not an expert here, but I would think that could lead to issues for users.

Collaborator


Well it's not the number of CPUs. This is entirely single threaded now, so it will only ever use 1 cpu. It's just the number of concurrent tasks that are allowed, and docker compose's default is unbounded, which I've just replicated.

It is indeed: we are forking a subprocess like podman build or something like that.
Even if we were not, it's a good heuristic indicator of how much the machine can handle.

I was just trying to keep the changes as small as possible

I've merged it, but I'll need someone like you to become co-maintainer and help me refactor the code.

so if you think cleaning up the run/exec makes sense that works.

I'll open an issue with the points in my mind. and your PRs are welcome.

Wouldn't starting the containers detached complicate this though?

It made Ctrl+C work. I know it will break things; that's why I created a different branch.

@Falmarri
Contributor Author

I really appreciate your effort. I'll take a look and merge it. If it's not ready, I'll merge it into a branch and work with you on it. Thank you.

Sounds great. I noticed the tests are broken since I changed some of them to run the full suite; I didn't have a way to test that. I can push a fix for flake8 not being installed.

Signed-off-by: Falmarri <[email protected]>
@Falmarri
Contributor Author

@muayyad-alsadi any chance you could give me approval to automatically run the test suite? I'm mostly familiar with GitLab, so it might take me a couple of tries to get these tests going.

@muayyad-alsadi
Collaborator

I've merged this into a branch called devel-asyncio

muayyad-alsadi added a commit that referenced this pull request Feb 2, 2024
@muayyad-alsadi
Collaborator

please check the following here

https://github.com/muayyad-alsadi/my-podman-compose/blob/devel-asyncio/podman_compose.py#L1159

class Pool:
    def __init__(self, parallel):
        self.semaphore = asyncio.Semaphore(parallel) if isinstance(parallel, int) else parallel
        self.tasks = []

    async def join(self):
        await asyncio.gather(*self.tasks)

async def compose_down(compose, args):
    pool = Pool(compose.semaphore)
    for cnt in containers:
        pool.tasks.append(asyncio.create_task(
            compose.podman.run_bg([], "stop", [*podman_stop_args, cnt["name"]]),
            name=cnt["name"],
        ))
    await pool.join()

also

https://github.com/muayyad-alsadi/my-podman-compose/blob/devel-asyncio/podman_compose.py#L2292

        await compose.podman.run([], "pod", ["start", pod_name])
        # TODO: use run and do a loop to capture exit_code_from using podman wait
        await compose.podman.run([], "pod", ["logs", "--color", "-n", "-f", pod_name])
        await compose.commands["stop"](compose, argparse.Namespace(services=[]))

@muayyad-alsadi merged commit c5be5ba into containers:devel on Feb 4, 2024
1 of 7 checks passed
@muayyad-alsadi
Collaborator

@muayyad-alsadi any chance you could give me approval to automatically run the test suite? I'm mostly familiar with GitLab, so it might take me a couple of tries to get these tests going.

I guess this happens automatically once your first PR is accepted, and it now has been.
Please send me another PR that just fixes the tests.
We will work on the new features and refactoring later.

@muayyad-alsadi
Collaborator

I've implemented a pool with cleaner code; no more task_reference hack:

                # This is hacky to make the tasks not get garbage collected
                # https://github.com/python/cpython/issues/91887
                out_t = asyncio.create_task(format_out(p.stdout))
                task_reference.add(out_t)
                out_t.add_done_callback(task_reference.discard)

                err_t = asyncio.create_task(format_out(p.stderr))
                task_reference.add(err_t)
                err_t.add_done_callback(task_reference.discard)

https://github.com/muayyad-alsadi/my-podman-compose/blob/devel-asyncio/podman_compose.py

new code looks like this

class Pool:
    def __init__(self, podman: Podman, parallel):
        self.podman: Podman = podman
        self.semaphore = asyncio.Semaphore(parallel) if isinstance(parallel, int) else parallel
        self.tasks = []

    def create_task(self, coro, *, name=None, context=None):
        task = asyncio.create_task(coro, name=name, context=context)
        self.tasks.append(task)
        return task

    def run(self, *args, name=None, **kwargs):
        return self.create_task(self._run_one(*args, **kwargs), name=name)

    async def _run_one(self, *args, **kwargs) -> int:
        async with self.semaphore:
            return await self.podman.run(*args, **kwargs)

    async def join(self, *, desc="joining enqueued tasks") -> int:
        if not self.tasks:
            return 0
        ls = await tqdm.gather(*self.tasks, desc=desc)
        failed = [i for i in ls if i != 0]
        del self.tasks[:]
        count = len(failed)
        if count > 1:
            log(f"** EE ** got multiple failures: [{count}] failures")
        if failed:
            log(f"** EE ** retcode: [{failed[0]}]")
            return failed[0]
        return 0

and used like this in pull, down and build commands

    for cnt in tqdm(containers, "stopping ..."):
        # ...
        compose.pool.run([], "stop", [*podman_stop_args, cnt["name"]], name=cnt["name"])
        
    await compose.pool.join(desc="waiting to be stopped")

the entire build logic is now removed.

@muayyad-alsadi
Collaborator

@Falmarri I believe you have permissions to run tests.

did you have any chance to see devel-asyncio branch?

@Falmarri
Contributor Author

Sorry I've been pretty busy lately. I should have some time to work on this now

@Falmarri deleted the feature/async branch on February 25, 2024
@Falmarri
Contributor Author

I've implemented a pool with cleaner code no more task_reference hack

You actually missed the place where this hack was used: it was only used for the logging output tasks, because those were never intended to be joined; they run indefinitely. The hack only prevented the garbage collector from reaping the tasks. Let me see if I can add them to the pool, but I'm concerned that joining on them will incorrectly block.
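For context, the strong-reference pattern that hack implements (a generic sketch of the python/cpython#91887 workaround, with hypothetical names, not the PR's code) looks like this:

```python
import asyncio

background_tasks = set()  # strong references so the GC cannot reap the tasks

def spawn_logger(coro):
    # Fire-and-forget: track the task without ever joining on it.
    t = asyncio.create_task(coro)
    background_tasks.add(t)
    t.add_done_callback(background_tasks.discard)  # drop the reference when done
    return t

async def main():
    async def fake_log():
        await asyncio.sleep(0.01)  # stand-in for an infinite log-formatting loop

    t = spawn_logger(fake_log())
    assert t in background_tasks
    await t
    await asyncio.sleep(0)  # let the done-callback run
    return t not in background_tasks

print(asyncio.run(main()))  # True: the finished task was discarded
```

This keeps the tasks alive for their whole lifetime without ever gathering them, which is why moving them into a joinable pool could block on the infinite log readers.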

@Falmarri
Contributor Author

I also don't like the tqdm progress bars. They don't really work when logs are being output to stdout
