
Abort other fetches when resolution fails #111

Open · wants to merge 4 commits into main

Conversation

@CNSeniorious000 (Member) commented May 26, 2024

For now, if you install an incompatible package like jupyter without keep_going=True, it raises in about 1-2 seconds, but the fetches and resolution continue to run even though they are already useless.

This wastes megabytes of bandwidth and CPU time, so I propose cancelling the other promises after raising.

Reproduction:

In pyodide console.html, open the Network panel of DevTools, and

from micropip import install
await install("jupyter")
waterfall.mov

Desired behavior:

  • If keep_going=True, same as the original behavior
  • If keep_going=False, the resolution should stop and ongoing fetches should be aborted

Comment on lines 31 to 41
controller = AbortController.new()
kwargs["signal"] = controller.signal

async def fetch_with_abort():
    try:
        return await pyfetch(urls, **kwargs)
    except CancelledError:
        controller.abort()
        raise

return await fetch_with_abort()
Member Author:

By the way, I think maybe this logic could be written into pyfetch itself, because cancelling is more pythonic than using an AbortController, and as I understand it, pyfetch is a pythonic wrapper for js.fetch.

Member:

PR welcome!

Comment on lines 34 to 42
async def fetch_with_abort():
    try:
        return await pyfetch(urls, **kwargs)
    except CancelledError:
        controller.abort()
        raise
Member:

Move this into a function taking controller, urls, kwargs as arguments instead of a closure?

Member Author:

Done. urls was a typo, so I amended the commit.

if not self.keep_going:
    for future in futures:
        if not future.done():
            future.cancel()
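For context, the future.done() guard above matters: in plain asyncio, cancelling an already-completed future is a harmless no-op (Future.cancel() just returns False), which is part of why the InvalidStateError that surfaces later in this thread is surprising. A minimal sketch of this cancellation pattern, independent of micropip:

```python
import asyncio

async def work(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay

async def main() -> list[bool]:
    # One quick task and two slow ones, standing in for parallel fetches.
    futures = [asyncio.ensure_future(work(d)) for d in (0.01, 10, 10)]
    await asyncio.sleep(0.05)  # let the quick task finish
    # Mirror the snippet above: cancel whatever is still pending.
    return [f.cancel() for f in futures if not f.done()]

print(asyncio.run(main()))  # [True, True]
```

The quick task is skipped by the done() check; the two pending tasks each accept the cancellation request.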
Member:

I haven't investigated how cancellation interacts with our event loop, and I'm slightly worried that it might not interact well. But if this passes the tests then we should merge.

Member Author:

Wait a second, I just found that this part seems to raise several asyncio.exceptions.InvalidStateError: invalid state exceptions.

Member:

@Zac-HD any tips on how to make my event loop support cancellation? Where to look, who to ask, etc.?

Member Author:

Anyway, I'll open a separate issue about this.

Member:

Could you try using a TaskGroup? Maybe that will fix our problems.
https://docs.python.org/3/library/asyncio-task.html#asyncio.TaskGroup

@CNSeniorious000 (Member Author) commented Jun 4, 2024

Done, I've refactored this section using TaskGroup; now all the futures are gathered in one top-level TaskGroup.

Note that this makes the traceback look a bit different, because TaskGroup groups all the exceptions into an ExceptionGroup. This introduces a small inconsistency: with keep_going=True, only a ValueError is raised.

And the asyncio.exceptions.InvalidStateError: invalid state errors are still there.

@CNSeniorious000 (Member Author) commented Jun 4, 2024

The test fails because the raised error is not a ValueError but an ExceptionGroup containing a ValueError.

It's possible to recover the original ValueError by wrapping the async with block in a try-except:

...
    try:
        async with asyncio.TaskGroup() as tg:
            self.tg = tg  # only one task group from the top level
            for requirement in requirements:
                tg.create_task(self.add_requirement(requirement))
    except ExceptionGroup as e:
        raise e.exceptions[-1] from None
...

But I am not sure whether this is the desired behavior.

Maybe it would be better to introduce a new error like ResolutionFailedError(ValueError) to wrap both types of error.

@hoodmane (Member) left a comment

@ryanking13 (Member) left a comment

Can we add some tests? Or would it be too difficult to simulate network failures?

@CNSeniorious000 (Member Author):

It's not about network failure; it's about installing pyodide-incompatible packages.

For example, jupyter is a huge package and is not installable in pyodide. If you try micropip.install("jupyter"), it fails quickly with "ValueError: Can't find a pure Python 3 wheel".

But the fetches and resolution still run in the background, wasting network and CPU.

So my idea is to cancel all the running futures, stopping resolution after any one resolution fails (only when keep_going=False, because with keep_going=True we must resolve all the dependencies, so we can't stop early).

I think this is implemented in this PR. You can test it by hosting the wheel with a static server, loading http://localhost:.../micropip-...-...whl in the pyodide console, and trying to install jupyter again. You will see all the fetches get aborted when one resolution fails, as expected.

But you will also see the pyodide console print many asyncio.exceptions.InvalidStateError: invalid state lines. I don't know why. Maybe this is an issue in the webloop.

@hoodmane (Member) commented Jun 3, 2024

I'll try and check this out and debug the InvalidStateError issues...

Comment on lines 27 to 38
async def _pyfetch(url: str, **kwargs):
    if "signal" in kwargs:
        return await pyfetch(url, **kwargs)

    controller = AbortController.new()
    kwargs["signal"] = controller.signal

    try:
        return await pyfetch(url, **kwargs)
    except CancelledError:
        controller.abort()
        raise
Member:

Would be great to implement this into pyfetch directly in a followup.

Member Author:

So should I remove changes in this file and open another PR on pyfetch in pyodide/pyodide?

Do you think this feature should provide a parameter for users to opt out of this behavior? Maybe a kw-only parameter like abort_on_cancel=True?

@hoodmane (Member) commented Jun 4, 2024

No, let's add it here and also upstream. micropip wants to support some older versions of Pyodide so it can't use the upstreamed functionality right away.

Maybe adding a kw-only parameter like abort_on_cancel=True?

Nah it feels like we're switching from a less correct behavior to a more correct one with this. Let's not overcomplicate the API by forcing people to opt into what they expect.

@hoodmane (Member) commented Jun 4, 2024

Looks like test_fetch_wheel_fail has to be updated to expect an ExceptionGroup to be raised.

@CNSeniorious000 (Member Author) commented Jun 4, 2024

I recorded a video demo showing how to reproduce this behavior. Hopefully it's clearer than my words.

The strange thing is why the last exception isn't a ValueError.

video demo

demo.mov

@hoodmane (Member) commented Jun 4, 2024

I think the problem is you're trying to cancel a future that has already been resolved. There are only three spots in the Python interpreter where InvalidStateError("invalid state") is raised:
https://github.com/python/cpython/blob/main/Modules/_asynciomodule.c?plain=1#L557
https://github.com/python/cpython/blob/main/Modules/_asynciomodule.c?plain=1#L577

@hoodmane (Member) commented Jun 4, 2024

I still think it could be due to a bug in WebLoop.

@hoodmane (Member) commented Jun 4, 2024

One way to learn more about it would be to patch the interpreter to log some extra info when it sets the InvalidStateError. I'll try to look into it tomorrow.

@CNSeniorious000 (Member Author):

Looks like test_fetch_wheel_fail has to be updated to expect an ExceptionGroup to be raised.

I have doubts about this. When keep_going=True it still raises a ValueError. Do you think it is reasonable to raise an ExceptionGroup when keep_going=False but a ValueError otherwise?

I personally prefer the original behavior, simply using gather, because we construct the error message ourselves, so we don't need an ExceptionGroup.

IIRC, the point of TaskGroup is not only cancelling the other futures when one fails, but also waiting for them and grouping their exceptions into one. I don't think we need that. We only need the first ValueError if keep_going=False; otherwise we need all the error messages to construct a single ValueError with a meaningful message. So maybe TaskGroup is not that suitable for our use case?
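The gather-based alternative described here could be sketched like this (all names are illustrative, not micropip's actual internals): keep_going=True waits for everything and combines the errors into one message, while keep_going=False fails fast and cancels the rest, so a plain ValueError surfaces either way.

```python
import asyncio

async def fetch(name: str, ok: bool) -> str:
    # Stand-in for fetching/resolving one requirement.
    if not ok:
        raise ValueError(f"Can't find a pure Python 3 wheel for {name!r}")
    await asyncio.sleep(0.01)
    return name

async def resolve_all(reqs: list[tuple[str, bool]], keep_going: bool) -> list[str]:
    tasks = [asyncio.ensure_future(fetch(n, ok)) for n, ok in reqs]
    if keep_going:
        # Wait for everything, then build a single combined ValueError.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        errors = [r for r in results if isinstance(r, Exception)]
        if errors:
            raise ValueError("; ".join(map(str, errors)))
        return results
    try:
        return await asyncio.gather(*tasks)  # raises on the first failure
    except ValueError:
        for t in tasks:  # the PR's behavior: stop the now-useless work
            if not t.done():
                t.cancel()
        raise
```

Either branch raises a plain ValueError, avoiding the ExceptionGroup inconsistency discussed above.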

Member Author:

I realized that the original implementation could only abort at the await pyfetch(...) stage, but we need to abort await response.bytes() too, so I changed the approach.

Now I use a decorator to dependency-inject an AbortSignal into the decorated function, which passes it on to pyfetch. After decorating, their signatures are the same as before. But maybe this needs a re-review.
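A pure-Python sketch of such a decorator. AbortController only exists inside Pyodide, so a stand-in class is used here; a real version would pass controller.signal through to pyfetch.

```python
import asyncio
from functools import wraps

controllers = []  # record instances so the demo below can inspect them

class FakeController:
    """Stand-in for the JS AbortController, which only exists in Pyodide."""
    def __init__(self):
        self.aborted = False
        self.signal = self  # good enough for this sketch
        controllers.append(self)
    def abort(self):
        self.aborted = True

def abort_on_cancel(func):
    # One controller per call; its signal is injected as the first argument.
    # If the surrounding task is cancelled at *any* await point (the fetch
    # itself or a later response.bytes()), the controller is aborted.
    @wraps(func)
    async def wrapper(*args, **kwargs):
        controller = FakeController()
        try:
            return await func(controller.signal, *args, **kwargs)
        except asyncio.CancelledError:
            controller.abort()
            raise
    return wrapper

@abort_on_cancel
async def fetch_bytes(signal, url):
    # A real version would call pyfetch(url, signal=signal) here.
    await asyncio.sleep(10)  # simulate a slow download

async def main() -> bool:
    task = asyncio.ensure_future(fetch_bytes("https://example.invalid/a.whl"))
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return controllers[-1].aborted

print(asyncio.run(main()))  # True
```

Because the abort happens in the wrapper, the controller is aborted no matter which await inside the body the cancellation lands on.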

@CNSeniorious000 (Member Author) commented Jun 15, 2024

Here I still changed fetch_bytes and fetch_string_and_headers. Maybe you think that isn't elegant.

In fact, through a hackier approach, it is possible to only decorate the function, without dependency injection, so the implementations no longer need to pass signal=signal to pyfetch themselves. That would let this decorator be used elsewhere too. (Maybe after #112 there will be more resolution/downloading implementations, and they could simply decorate their fetching function with this decorator to get the abort-on-cancel behavior.)

In the _abort_on_cancel decorator, replace the input function's namespace with a ChainMap. In that namespace, insert a _signal and replace the pyfetch entry with partial(pyfetch, signal=_signal). Then we can simplify the patching code:

@_abort_on_cancel
async def fetch_string_and_headers(
-   signal: AbortSignal, url: str, kwargs: dict[str, str]
+   url: str, kwargs: dict[str, str]
) -> tuple[str, dict[str, str]]:
-   response = await pyfetch(url, **kwargs, signal=signal)
+   response = await pyfetch(url, **kwargs)
    ...

Potential downsides:

  1. the code may become a little confusing
  2. if the code doesn't use the name pyfetch but some other name, it won't be patched as partial(pyfetch, signal=_signal)
  3. ChainMap and partial may add a small runtime overhead

@CNSeniorious000 (Member Author):

Everything works fine now. Here is a short video for demonstration:

good.mov

But I don't know how to implement a test case to ensure this behavior.

@hoodmane (Member):

Next week I'll try to see if I can make a test. I think we can monkeypatch fetch perhaps...
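One shape such a test could take, with a recording fake standing in for the real fetch so it needs no network and no Pyodide. Everything here is a hypothetical harness, not micropip's actual test code:

```python
import asyncio

log = []  # records what each fake fetch did

async def fake_fetch(url: str) -> None:
    log.append(("start", url))
    try:
        if url.endswith("bad"):
            raise ValueError("Can't find a pure Python 3 wheel")
        await asyncio.sleep(10)  # a download that should never finish
        log.append(("done", url))
    except asyncio.CancelledError:
        log.append(("aborted", url))
        raise

async def install(urls: list[str]) -> None:
    # Minimal model of the PR's behavior with keep_going=False:
    # the first failure cancels the remaining fetches.
    tasks = [asyncio.ensure_future(fake_fetch(u)) for u in urls]
    try:
        await asyncio.gather(*tasks)
    except ValueError:
        for t in tasks:
            if not t.done():
                t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise

try:
    asyncio.run(install(["a", "b", "bad"]))
except ValueError:
    pass

# Both slow fetches were aborted, and neither finished.
assert ("aborted", "a") in log and ("aborted", "b") in log
assert ("done", "a") not in log and ("done", "b") not in log
```

The log gives the test something concrete to assert on: the failing URL starts, and every still-running fetch records an abort instead of a completion.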
