🐛 Engine: Submission failure with DeliveryError #6015

Open
mbercx opened this issue May 13, 2023 · 10 comments

@mbercx
Member

mbercx commented May 13, 2023

Just ran into this error while trying to submit:

---------------------------------------------------------------------------
DeliveryError                             Traceback (most recent call last)
Cell In[14], line 42
     39 builder.w90_chk_to_ukk_script = w90_script
     41 wc_group, _ = orm.Group.collection.get_or_create('tmp/workchains')
---> 42 wc_node = submit(builder); #wc_group.add_nodes(wc_node)

[...] # Trimmed for brevity, full traceback below

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py:201, in Future.result(self)
    199 self.__log_traceback = False
    200 if self._exception is not None:
--> 201     raise self._exception
    202 return self._result

DeliveryError: (None, )
Full Traceback
---------------------------------------------------------------------------
DeliveryError                             Traceback (most recent call last)
Cell In[14], line 42
     39 builder.w90_chk_to_ukk_script = w90_script
     41 wc_group, _ = orm.Group.collection.get_or_create('tmp/workchains')
---> 42 wc_node = submit(builder); #wc_group.add_nodes(wc_node)

File ~/project/super/code/aiida-core/aiida/engine/launch.py:103, in submit(process, **inputs)
    100 assert runner.persister is not None, 'runner does not have a persister'
    101 assert runner.controller is not None, 'runner does not have a persister'
--> 103 process_inited = instantiate_process(runner, process, **inputs)
    105 # If a dry run is requested, simply forward to `run`, because it is not compatible with `submit`. We choose for this
    106 # instead of raising, because in this way the user does not have to change the launcher when testing. The same goes
    107 # for if `remote_folder` is present in the inputs, which means we are importing an already completed calculation.
    108 if process_inited.metadata.get('dry_run', False) or 'remote_folder' in inputs:

File ~/project/super/code/aiida-core/aiida/engine/utils.py:64, in instantiate_process(runner, process, **inputs)
     61 else:
     62     raise ValueError(f'invalid process {type(process)}, needs to be Process or ProcessBuilder')
---> 64 process = process_class(runner=runner, inputs=inputs)
     66 return process

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/state_machine.py:194, in StateMachineMeta.__call__(cls, *args, **kwargs)
    186 """
    187 Create the state machine and enter the initial state.
    188 
   (...)
    191 :return: An instance of the state machine
    192 """
    193 inst = super().__call__(*args, **kwargs)
--> 194 inst.transition_to(inst.create_initial_state())
    195 call_with_super_check(inst.init)
    196 return inst

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/state_machine.py:339, in StateMachine.transition_to(self, new_state, *args, **kwargs)
    337         raise
    338     self._transition_failing = True
--> 339     self.transition_failed(initial_state_label, label, *sys.exc_info()[1:])
    340 finally:
    341     self._transition_failing = False

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/processes.py:1003, in Process.transition_failed(self, initial_state, final_state, exception, trace)
    998 def transition_failed(
    999     self, initial_state: Hashable, final_state: Hashable, exception: Exception, trace: TracebackType
   1000 ) -> None:
   1001     # If we are creating, then reraise instead of failing.
   1002     if final_state == process_states.ProcessState.CREATED:
-> 1003         raise exception.with_traceback(trace)
   1005     self.transition_to(process_states.ProcessState.EXCEPTED, exception, trace)

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/state_machine.py:324, in StateMachine.transition_to(self, new_state, *args, **kwargs)
    321     self._exit_current_state(new_state)
    323 try:
--> 324     self._enter_next_state(new_state)
    325 except StateEntryFailed as exception:
    326     # Make sure we have a state instance
    327     new_state = self._create_state_instance(exception.state, *exception.args, **exception.kwargs)

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/state_machine.py:388, in StateMachine._enter_next_state(self, next_state)
    386 next_state.do_enter()
    387 self._state = next_state
--> 388 self._fire_state_event(StateEventHook.ENTERED_STATE, last_state)

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/state_machine.py:300, in StateMachine._fire_state_event(self, hook, state)
    298 def _fire_state_event(self, hook: Hashable, state: Optional[State]) -> None:
    299     for callback in self._event_callbacks.get(hook, []):
--> 300         callback(self, hook, state)

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/processes.py:331, in Process._setup_event_hooks.<locals>.<lambda>(_s, _h, from_state)
    325 def _setup_event_hooks(self) -> None:
    326     """Set the event hooks to process, when it is created or loaded(recreated)."""
    327     event_hooks = {
    328         state_machine.StateEventHook.ENTERING_STATE:
    329         lambda _s, _h, state: self.on_entering(cast(process_states.State, state)),
    330         state_machine.StateEventHook.ENTERED_STATE:
--> 331         lambda _s, _h, from_state: self.on_entered(cast(Optional[process_states.State], from_state)),
    332         state_machine.StateEventHook.EXITING_STATE:
    333         lambda _s, _h, _state: self.on_exiting()
    334     }
    335     for hook, callback in event_hooks.items():
    336         self.add_state_event_callback(hook, callback)

File ~/project/super/code/aiida-core/aiida/engine/processes/process.py:426, in Process.on_entered(self, from_state)
    424 self._save_checkpoint()
    425 set_process_state_change_timestamp(self)
--> 426 super().on_entered(from_state)

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/processes.py:714, in Process.on_entered(self, from_state)
    712 self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject)
    713 try:
--> 714     self._communicator.broadcast_send(body=None, sender=self.pid, subject=subject)
    715 except ConnectionClosed:
    716     message = 'Process<%s>: no connection available to broadcast state change from %s to %s'

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/communications.py:175, in LoopCommunicator.broadcast_send(self, body, sender, subject, correlation_id)
    168 def broadcast_send(
    169     self,
    170     body: Optional[Any],
   (...)
    173     correlation_id: Optional['ID_TYPE'] = None
    174 ) -> futures.Future:
--> 175     return self._communicator.broadcast_send(body, sender, subject, correlation_id)

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py:258, in RmqThreadCommunicator.broadcast_send(self, body, sender, subject, correlation_id)
    256 def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
    257     self._ensure_open()
--> 258     result = self._loop_scheduler.await_(
    259         self._communicator.broadcast_send(body=body, sender=sender, subject=subject, correlation_id=correlation_id)
    260     )
    261     return isinstance(result, pamqp.specification.Basic.Ack)

File ~/.virtualenvs/super/lib/python3.9/site-packages/pytray/aiothreads.py:164, in LoopScheduler.await_(self, awaitable, name)
    153 """
    154 Await an awaitable on the event loop and return the result.  It may take a little time for
    155 the loop to get around to scheduling it, so we use a timeout as set by the TASK_TIMEOUT class
   (...)
    161 :return: the result of running the coroutine
    162 """
    163 try:
--> 164     return self.await_submit(awaitable).result(timeout=self.task_timeout)
    165 except concurrent.futures.TimeoutError as exc:
    166     # Try to get a reasonable name for the awaitable
    167     name = name or getattr(awaitable, "__name__", "Awaitable")

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:258, in Task.__step(***failed resolving arguments***)
    256         result = coro.send(None)
    257     else:
--> 258         result = coro.throw(exc)
    259 except StopIteration as exc:
    260     if self._must_cancel:
    261         # Task is cancelled right before coro stops.

File ~/.virtualenvs/super/lib/python3.9/site-packages/pytray/aiothreads.py:178, in LoopScheduler.await_submit.<locals>.coro()
    177 async def coro():
--> 178     res = await awaitable
    179     if asyncio.isfuture(res):
    180         future = ThreadFuture()

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/communicator.py:522, in RmqCommunicator.broadcast_send(self, body, sender, subject, correlation_id)
    520 async def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
    521     publisher = await self.get_message_publisher()
--> 522     result = await publisher.broadcast_send(body, sender, subject, correlation_id)
    523     return result

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/communicator.py:66, in RmqPublisher.broadcast_send(self, msg, sender, subject, correlation_id)
     61 message = aio_pika.Message(
     62     body=self._encode(message_dict),
     63     delivery_mode=aio_pika.DeliveryMode.NOT_PERSISTENT,
     64 )
     65 # Send as mandatory=False because we don't expect the message to be routable to anyone
---> 66 return await self.publish(message, routing_key=defaults.BROADCAST_TOPIC, mandatory=False)

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/messages.py:209, in BasePublisherWithReplyQueue.publish(self, message, routing_key, mandatory)
    200 async def publish(self, message, routing_key, mandatory=True):
    201     """
    202     Send a fire-and-forget message i.e. no response expected.
    203 
   (...)
    207     :return:
    208     """
--> 209     result = await self._exchange.publish(message, routing_key=routing_key, mandatory=mandatory)
    210     return result

File ~/.virtualenvs/super/lib/python3.9/site-packages/aio_pika/exchange.py:233, in Exchange.publish(self, message, routing_key, mandatory, immediate, timeout)
    227 if self.internal:
    228     # Caught on the client side to prevent channel closure
    229     raise ValueError(
    230         "Can not publish to internal exchange: '%s'!" % self.name,
    231     )
--> 233 return await asyncio.wait_for(
    234     self.channel.basic_publish(
    235         exchange=self.name,
    236         routing_key=routing_key,
    237         body=message.body,
    238         properties=message.properties,
    239         mandatory=mandatory,
    240         immediate=immediate,
    241     ),
    242     timeout=timeout,
    243 )

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:442, in wait_for(fut, timeout, loop)
    437     warnings.warn("The loop argument is deprecated since Python 3.8, "
    438                   "and scheduled for removal in Python 3.10.",
    439                   DeprecationWarning, stacklevel=2)
    441 if timeout is None:
--> 442     return await fut
    444 if timeout <= 0:
    445     fut = ensure_future(fut, loop=loop)

File ~/.virtualenvs/super/lib/python3.9/site-packages/aiormq/channel.py:547, in Channel.basic_publish(self, body, exchange, routing_key, properties, mandatory, immediate, timeout)
    544 if not self.publisher_confirms:
    545     return
--> 547 return await asyncio.wait_for(confirmation, timeout=timeout)

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:442, in wait_for(fut, timeout, loop)
    437     warnings.warn("The loop argument is deprecated since Python 3.8, "
    438                   "and scheduled for removal in Python 3.10.",
    439                   DeprecationWarning, stacklevel=2)
    441 if timeout is None:
--> 442     return await fut
    444 if timeout <= 0:
    445     fut = ensure_future(fut, loop=loop)

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py:284, in Future.__await__(self)
    282 if not self.done():
    283     self._asyncio_future_blocking = True
--> 284     yield self  # This tells Task to wait for completion.
    285 if not self.done():
    286     raise RuntimeError("await wasn't used with future")

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:328, in Task.__wakeup(self, future)
    326 def __wakeup(self, future):
    327     try:
--> 328         future.result()
    329     except BaseException as exc:
    330         # This may also be a cancellation.
    331         self.__step(exc)

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py:201, in Future.result(self)
    199 self.__log_traceback = False
    200 if self._exception is not None:
--> 201     raise self._exception
    202 return self._result

DeliveryError: (None, )

Still figuring out what happened, all my checkmarks are green:

verdi status
 ✔ version:     AiiDA v2.3.0
 ✔ config:      /Users/mbercx/project/super/.aiida
 ✔ profile:     dev
 ✔ storage:     Storage for 'dev' [open] @ postgresql://mbercx:***@localhost:5432/super-dev / DiskObjectStoreRepository: 39319bead002422387f2793c4e406dc6 | /Users/mbercx/project/super/repositories/dev/container
 ✔ rabbitmq:    Connected to RabbitMQ v3.11.13 as amqp://guest:[email protected]:5672?heartbeat=600
 ✔ daemon:      Daemon is running with PID 78099

Versions:

  • MacOS Monterey v12.5
  • aiida-core: 2.3.0
  • nest-asyncio: 1.5.6
@mbercx mbercx changed the title from "🐛 Engine:" to "🐛 Engine: Submission failure with DeliveryError" May 13, 2023
@mbercx
Member Author

mbercx commented May 13, 2023

The issue seems to be transient. Turning it off and on again resolved it.

@mbercx
Member Author

mbercx commented May 13, 2023

The processes did get created, but are not picked up when the daemon is (re)started.
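
A minimal sketch for listing the processes that are stuck in that state. It assumes an AiiDA profile is already loaded (for example inside `verdi shell`) and that the state is stored as the string `created` under the `attributes.process_state` key; the helper names are hypothetical and not part of aiida-core:

```python
def created_state_filters():
    """Return QueryBuilder filters matching process nodes still in the ``created`` state."""
    # Assumption: the process state is stored as a lowercase string attribute.
    return {'attributes.process_state': 'created'}


def find_created_processes():
    """Return the PKs of all process nodes whose state is still ``created``.

    Assumes an AiiDA profile has already been loaded by the caller.
    """
    # Imported lazily so the filter helper above stays usable without AiiDA installed.
    from aiida import orm

    query = orm.QueryBuilder()
    query.append(orm.ProcessNode, filters=created_state_filters(), project='id')
    return query.all(flat=True)
```

Calling `find_created_processes()` right after a failed `submit` would show whether a node was stored even though the exception was raised.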

@sphuber
Contributor

sphuber commented May 13, 2023

Try running `verdi devel rabbitmq tasks analyze` to see if there are inconsistencies.

@mbercx
Member Author

mbercx commented May 13, 2023

Indeed

❯ verdi devel rabbitmq tasks analyze
Warning: There are active processes without process task: {531970, 531717, 532038, 531463, 532012, 531984, 532025, 531931, 531998}
Critical: Inconsistencies detected between database and RabbitMQ. Run again with `--fix` to address problems.

This was in a different environment (I already cleaned up the ones above, apparently). Do you need me to check anything else to figure out what caused the problem?
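
For anyone wanting to inspect the affected processes before running with `--fix`, here is a small sketch that pulls the PKs out of the warning line. It assumes the output format shown above (a set literal after "without process task") stays the same; `parse_missing_task_pks` is a hypothetical helper, not part of AiiDA:

```python
import re


def parse_missing_task_pks(output):
    """Extract the PKs listed in the 'active processes without process task' warning."""
    pks = set()
    for line in output.splitlines():
        if 'without process task' not in line:
            continue
        # The PKs are printed as a Python set literal, e.g. ``{531970, 531717}``.
        match = re.search(r'\{([^}]*)\}', line)
        if match:
            pks.update(int(pk) for pk in re.findall(r'\d+', match.group(1)))
    return pks


sample = (
    'Warning: There are active processes without process task: '
    '{531970, 531717, 532038, 531463, 532012, 531984, 532025, 531931, 531998}'
)
print(sorted(parse_missing_task_pks(sample)))
```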

@sphuber
Contributor

sphuber commented May 13, 2023

Not really, I'm afraid. That was just to help with the processes not getting started after submission. That is most likely due to the missing task, which can be fixed with that command. The only hope for debugging this is to be able to reproduce it, and since you said it is transient, that is going to be tricky 😅

@mbercx
Member Author

mbercx commented May 13, 2023

Haha, fair! It did keep on happening before I had to shut down my computer (I was moving location with my Mac mini, so unfortunately I had to shut it down), so next time maybe we can do some live debugging. ^^

Hmm, searching through the documentation doesn't give any clue about what is going on, or how to fix it with that tasty `verdi devel` command. Maybe we should add it to a suitable "troubleshooting" section?

[Image: Screenshot 2023-05-13 at 19 43 25]

@sphuber
Contributor

sphuber commented May 13, 2023

For the DeliveryError, I haven't seen it before, so no idea what could be going on.

As for `verdi devel rabbitmq`, I only added that quite recently, and since it was experimental and only to be used in case of problems caused by bugs, we decided to put it under `verdi devel`. But we have used it multiple times now for various users and it seems to be working quite well. I think it is time to add an entry in the FAQ like "My jobs are stuck in the 'Created' state" and advertise this `verdi devel rabbitmq tasks analyze --fix` command, as it will automagically correct things.
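
As a rough illustration of what such a FAQ entry could show, here is a sketch that wraps the command in Python. The function names are hypothetical, and since the command's exit codes are not described in this thread, the sketch only returns the captured result without interpreting it:

```python
import subprocess


def build_analyze_command(fix=False):
    """Build the ``verdi devel rabbitmq tasks analyze`` command, optionally with ``--fix``."""
    command = ['verdi', 'devel', 'rabbitmq', 'tasks', 'analyze']
    if fix:
        command.append('--fix')
    return command


def analyze_tasks(fix=False, runner=subprocess.run):
    """Run the analysis and return the completed process with stdout/stderr captured as text.

    The ``runner`` argument exists only so the call can be replaced in tests.
    """
    return runner(build_analyze_command(fix), capture_output=True, text=True, check=False)
```

A first call with `fix=False` only reports inconsistencies; passing `fix=True` is the equivalent of rerunning with `--fix`.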

@mbercx
Member Author

mbercx commented May 14, 2023

Another note: it seems the output of a different submission attempt (in the original project) was still captured in my notebook. Here the error trace is different:

Full Traceback
---------------------------------------------------------------------------
ChannelInvalidStateError                  Traceback (most recent call last)
Cell In[5], line 22
     17 builder.base.pw.parallelization = orm.Dict({'npool': 2})
     18 builder.base.pw.metadata.options.resources = {
     19     'num_machines': 2,
     20     'num_mpiprocs_per_machine': 1
     21 }
---> 22 submit(builder)

File ~/project/super/code/aiida-core/aiida/engine/launch.py:103, in submit(process, **inputs)
    100 assert runner.persister is not None, 'runner does not have a persister'
    101 assert runner.controller is not None, 'runner does not have a persister'
--> 103 process_inited = instantiate_process(runner, process, **inputs)
    105 # If a dry run is requested, simply forward to `run`, because it is not compatible with `submit`. We choose for this
    106 # instead of raising, because in this way the user does not have to change the launcher when testing. The same goes
    107 # for if `remote_folder` is present in the inputs, which means we are importing an already completed calculation.
    108 if process_inited.metadata.get('dry_run', False) or 'remote_folder' in inputs:

File ~/project/super/code/aiida-core/aiida/engine/utils.py:64, in instantiate_process(runner, process, **inputs)
     61 else:
     62     raise ValueError(f'invalid process {type(process)}, needs to be Process or ProcessBuilder')
---> 64 process = process_class(runner=runner, inputs=inputs)
     66 return process

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/state_machine.py:195, in StateMachineMeta.__call__(cls, *args, **kwargs)
    193 inst = super().__call__(*args, **kwargs)
    194 inst.transition_to(inst.create_initial_state())
--> 195 call_with_super_check(inst.init)
    196 return inst

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/utils.py:29, in call_with_super_check(wrapped, *args, **kwargs)
     27 call_count = getattr(self, '_called', 0)
     28 self._called = call_count + 1
---> 29 wrapped(*args, **kwargs)
     30 msg = f"Base '{wrapped.__name__}' was not called from '{self.__class__}'\nHint: Did you forget to call the super?"
     31 assert self._called == call_count, msg

File ~/project/super/code/aiida-core/aiida/engine/processes/process.py:187, in Process.init(self)
    186 def init(self) -> None:
--> 187     super().init()
    188     if self._logger is None:
    189         self.set_logger(self.node.logger)

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/utils.py:16, in super_check.<locals>.wrapper(self, *args, **kwargs)
     14 msg = f"The function '{wrapped.__name__}' was not called through call_with_super_check"
     15 assert getattr(self, '_called', 0) >= 1, msg
---> 16 wrapped(self, *args, **kwargs)
     17 self._called -= 1

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/processes.py:303, in Process.init(self)
    301 if self._communicator is not None:
    302     try:
--> 303         identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
    304         self.add_cleanup(functools.partial(self._communicator.remove_rpc_subscriber, identifier))
    305     except kiwipy.TimeoutError:

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/communications.py:141, in LoopCommunicator.add_rpc_subscriber(self, subscriber, identifier)
    139 def add_rpc_subscriber(self, subscriber: 'RpcSubscriber', identifier: Optional['ID_TYPE'] = None) -> 'ID_TYPE':
    140     converted = convert_to_comm(subscriber, self._loop)
--> 141     return self._communicator.add_rpc_subscriber(converted, identifier)

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py:215, in RmqThreadCommunicator.add_rpc_subscriber(self, subscriber, identifier)
    213 def add_rpc_subscriber(self, subscriber, identifier=None):
    214     self._ensure_open()
--> 215     return self._loop_scheduler.await_(
    216         self._communicator.add_rpc_subscriber(self._wrap_subscriber(subscriber), identifier)
    217     )

File ~/.virtualenvs/super/lib/python3.9/site-packages/pytray/aiothreads.py:164, in LoopScheduler.await_(self, awaitable, name)
    153 """
    154 Await an awaitable on the event loop and return the result.  It may take a little time for
    155 the loop to get around to scheduling it, so we use a timeout as set by the TASK_TIMEOUT class
   (...)
    161 :return: the result of running the coroutine
    162 """
    163 try:
--> 164     return self.await_submit(awaitable).result(timeout=self.task_timeout)
    165 except concurrent.futures.TimeoutError as exc:
    166     # Try to get a reasonable name for the awaitable
    167     name = name or getattr(awaitable, "__name__", "Awaitable")

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:258, in Task.__step(***failed resolving arguments***)
    256         result = coro.send(None)
    257     else:
--> 258         result = coro.throw(exc)
    259 except StopIteration as exc:
    260     if self._must_cancel:
    261         # Task is cancelled right before coro stops.

File ~/.virtualenvs/super/lib/python3.9/site-packages/pytray/aiothreads.py:178, in LoopScheduler.await_submit.<locals>.coro()
    177 async def coro():
--> 178     res = await awaitable
    179     if asyncio.isfuture(res):
    180         future = ThreadFuture()

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/communicator.py:481, in RmqCommunicator.add_rpc_subscriber(self, subscriber, identifier)
    480 async def add_rpc_subscriber(self, subscriber, identifier=None):
--> 481     msg_subscriber = await self.get_message_subscriber()
    482     identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
    483     return identifier

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/communicator.py:427, in RmqCommunicator.get_message_subscriber(self)
    418 if self._message_subscriber is None:
    419     subscriber = RmqSubscriber(
    420         self._connection,
    421         message_exchange=self._message_exchange,
   (...)
    425         testing_mode=self._testing_mode
    426     )
--> 427     await subscriber.connect()
    428     self._message_subscriber = subscriber
    430 return self._message_subscriber

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/communicator.py:177, in RmqSubscriber.connect(self)
    174 if self._testing_mode:
    175     exchange_params.setdefault('auto_delete', self._testing_mode)
--> 177 self._channel = await self._connection.channel()
    178 self._exchange = await self._channel.declare_exchange(name=self._exchange_name, **exchange_params)
    180 await self._create_broadcast_queue()

File ~/.virtualenvs/super/lib/python3.9/site-packages/aio_pika/channel.py:127, in Channel.__await__(self)
    126 def __await__(self):
--> 127     yield from self.initialize().__await__()
    128     return self

File ~/.virtualenvs/super/lib/python3.9/site-packages/aio_pika/robust_channel.py:87, in RobustChannel.initialize(self, timeout)
     86 async def initialize(self, timeout: TimeoutType = None) -> None:
---> 87     await super().initialize(timeout)
     88     self.add_close_callback(self._on_channel_close)

File ~/.virtualenvs/super/lib/python3.9/site-packages/aio_pika/channel.py:172, in Channel.initialize(self, timeout)
    169 if self._channel is not None:
    170     raise RuntimeError("Can't initialize channel")
--> 172 self._channel = await asyncio.wait_for(
    173     self._create_channel(), timeout=timeout,
    174 )
    176 self._delivery_tag = 0
    178 if self.default_exchange is None:

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:442, in wait_for(fut, timeout, loop)
    437     warnings.warn("The loop argument is deprecated since Python 3.8, "
    438                   "and scheduled for removal in Python 3.10.",
    439                   DeprecationWarning, stacklevel=2)
    441 if timeout is None:
--> 442     return await fut
    444 if timeout <= 0:
    445     fut = ensure_future(fut, loop=loop)

File ~/.virtualenvs/super/lib/python3.9/site-packages/aio_pika/channel.py:162, in Channel._create_channel(self)
    159 async def _create_channel(self) -> aiormq.Channel:
    160     await self._connection.ready()
--> 162     return await self._connection.connection.channel(
    163         publisher_confirms=self._publisher_confirms,
    164         on_return_raises=self._on_return_raises,
    165         channel_number=self._channel_number,
    166     )

File ~/.virtualenvs/super/lib/python3.9/site-packages/aiormq/connection.py:527, in Connection.channel(self, channel_number, publisher_confirms, frame_buffer, **kwargs)
    524 self.channels[channel_number] = channel
    526 try:
--> 527     await channel.open()
    528 except Exception:
    529     self.channels[channel_number] = None

File ~/.virtualenvs/super/lib/python3.9/site-packages/aiormq/channel.py:174, in Channel.open(self)
    173 async def open(self):
--> 174     frame = await self.rpc(spec.Channel.Open())
    176     if self.publisher_confirms:
    177         await self.rpc(spec.Confirm.Select())

File ~/.virtualenvs/super/lib/python3.9/site-packages/aiormq/base.py:168, in task.<locals>.wrap(self, *args, **kwargs)
    165 @wraps(func)
    166 async def wrap(self: "Base", *args, **kwargs):
    167     # noinspection PyCallingNonCallable
--> 168     return await self.create_task(func(self, *args, **kwargs))

File ~/.virtualenvs/super/lib/python3.9/site-packages/aiormq/base.py:25, in TaskWrapper.__inner(self)
     23 async def __inner(self):
     24     try:
---> 25         return await self.task
     26     except asyncio.CancelledError as e:
     27         raise self.exception from e

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py:284, in Future.__await__(self)
    282 if not self.done():
    283     self._asyncio_future_blocking = True
--> 284     yield self  # This tells Task to wait for completion.
    285 if not self.done():
    286     raise RuntimeError("await wasn't used with future")

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:328, in Task.__wakeup(self, future)
    326 def __wakeup(self, future):
    327     try:
--> 328         future.result()
    329     except BaseException as exc:
    330         # This may also be a cancellation.
    331         self.__step(exc)

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py:201, in Future.result(self)
    199 self.__log_traceback = False
    200 if self._exception is not None:
--> 201     raise self._exception
    202 return self._result

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:256, in Task.__step(***failed resolving arguments***)
    252 try:
    253     if exc is None:
    254         # We use the `send` method directly, because coroutines
    255         # don't have `__iter__` and `__next__` methods.
--> 256         result = coro.send(None)
    257     else:
    258         result = coro.throw(exc)

File ~/.virtualenvs/super/lib/python3.9/site-packages/aiormq/channel.py:121, in Channel.rpc(self, frame, timeout)
    118     return value
    120 if self.writer is None:
--> 121     raise ChannelInvalidStateError("writer is None")
    123 lock = self.lock
    125 try:

ChannelInvalidStateError: writer is None

Also note that the problem was present across different environments.

@sphuber
Contributor

sphuber commented May 14, 2023

That exception is actually familiar, see #4595. I think this is due to an instability in the connection with RabbitMQ, which is managed by aio-pika and aiormq. Both have significantly more recent versions with fixes to connection stability, as well as auto-reconnect in case the connection is lost. I suspect that upgrading could help a lot in dealing with these problems. I prepared a branch for this that has been open for a very long time (see #5732), but I haven't been able to merge it because one test fails, namely the one shutting down the communicator. I have been debugging this for hours but couldn't solve it, and neither could @muhrin for the time being. If you'd like, I could rebase it to make it up to date with the latest version, and maybe you can give it a go? All the tests pass except the one that shuts down the daemon. So running should work fine in principle, and it would provide valuable information if you no longer see these problems when submitting.
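
As a stopgap while the dependency upgrade is pending, one option might be to retry the submission when the broker connection drops. The following is a minimal sketch and not part of AiiDA: `retry_on_transient_error` and its parameters are hypothetical names introduced here for illustration, and which exception types to pass (for example the `DeliveryError` from the traceback) is left to the caller.

```python
import time


def retry_on_transient_error(func, *, exceptions, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call ``func()`` and retry with exponential backoff if it raises one of ``exceptions``.

    Hypothetical helper for illustration only; it is not an AiiDA or kiwipy API.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except exceptions:
            if attempt == attempts:
                raise  # out of retries: surface the original error
            # Wait 1x, 2x, 4x, ... the base delay before trying again.
            sleep(base_delay * 2 ** (attempt - 1))
```

With this helper, the failing call could be wrapped as `retry_on_transient_error(lambda: submit(builder), exceptions=(DeliveryError,))`, assuming `DeliveryError` is imported from wherever it is raised in your installation. Whether a failed `submit` has already stored a process node is not checked here, so a retry might leave duplicates behind.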

@mbercx
Member Author

mbercx commented May 14, 2023

> If you'd like, I could rebase it to make it up to date with the latest version, and maybe you can give it a go?

It would be a good opportunity for me to get more familiar with these tools and the engine, but doing so would most likely take more time than I can commit to at the moment. I'll try and pick this up once I've checked some boxes, if you haven't fixed it by then of course. ^^
