Transport: AsyncTransport plugin (#6626)
# Short

* Transport: `AsyncTransport` plugin with entry point `core.ssh_async`
* `TransportPath`: a new path type, so that transport plugins support more types -- previously `str` only.

# Long

This PR proposes many changes to make transport tasks asynchronous, ensuring that the daemon is not blocked by time-consuming tasks such as uploads, downloads, and similar operations, as requested by @giovannipizzi.

Here's a summary of the main updates:

- **New Transport Plugin:** Introduces `AsyncSshTransport` with the entry point `core.ssh_async`.
- **Enhanced Authentication:** `AsyncSshTransport` supports executing custom scripts before connections, which is particularly useful for authentication. 🥇
- **Engine Updates:** Modifies the engine to consistently call asynchronous transport methods.
- **Backward Compatibility:** Provides synchronous counterparts for all asynchronous methods in `AsyncSshTransport`.
- **Transport Class Overhaul:** Major changes to the `Transport` class, introducing `BlockingTransport` and `AsyncTransport` as helper classes for plugin developers.
- **Improved Documentation:** Adds more docstrings and comments to guide plugin developers. Blocking plugins should inherit from `Transport`, while asynchronous ones should inherit from `AsyncTransport`.
- **New Path Type:** Defines a `TransportPath` type and upgrades transport plugins to work with `Union[str, Path, PurePosixPath]`.
- **New Feature:** Introduces `copy_from_remote_to_remote_async`, addressing a previous issue where such tasks blocked the entire daemon. A usage sketch of the new awaitable API follows this list.
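Taken together, these changes let engine code await transport operations instead of blocking on them. A minimal sketch of the new calling convention (the `*_async` method names appear in this PR's diff; the surrounding setup, function name, and file name are hypothetical):

```python
from pathlib import PurePosixPath


async def stage_input(transport, workdir: PurePosixPath):
    """Upload one input file through the awaitable transport API."""
    # Thanks to TransportPath, str, Path and PurePosixPath are all accepted.
    if not await transport.path_exists_async(workdir):
        await transport.makedirs_async(workdir, ignore_existing=True)
    await transport.put_async('input.txt', workdir / 'input.txt')
```

While one coroutine awaits a slow transfer, the event loop is free to run other tasks, which is exactly what keeps the daemon responsive.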

## Test Results: Performance Comparisons

### **When `core.ssh_async` Outperforms**
In scenarios where the daemon is blocked by heavy transfer tasks (uploading/downloading/copying large files), `core.ssh_async` shows significant improvement. 

For example, I submitted two WorkGraphs:
1. The first handles heavy transfers:
   - Upload 10 MB
   - Remote copy 1 GB
   - Retrieve 1 GB
2. The second performs a simple shell command: `touch file`.

The time taken until the second WorkGraph was processed (with one daemon running):
- **`core.ssh_async`:** **Only 4 seconds!** 🚀🚀🚀🚀 *A major improvement!*
- **`core.ssh`:** **108 seconds** (WorkGraph 1 fully completes before processing the second).

### **When `core.ssh_async` and `core.ssh` Are Comparable**
For tasks involving both (and many!) uploads and downloads (a common scenario), performance varies slightly depending on the case.

- **Large Files (~1 GB):**
  - `core.ssh_async` performs better due to simultaneous uploads and downloads. In some networks, this can almost double the effective bandwidth, as demonstrated in the graph on PR #6626: my base bandwidth of 11.8 MB/s nearly doubled under favorable conditions. However, under heavy network load, bandwidth may revert to its base level (e.g., 11.8 MB/s).

    **Test Case:** Two WorkGraphs: one uploads 1 GB, the other retrieves 1 GB using `RemoteData`.
    - `core.ssh_async`: **120 seconds**
    - `core.ssh`: **204 seconds**

- **Small Files (Many Small Transfers):**
  - **Test Case:** 25 WorkGraphs each transferring a few 1 MB files.
    - `core.ssh_async`: **105 seconds**
    - `core.ssh`: **65 seconds**

  In this scenario, however, the overhead of asynchronous calls seems to outweigh the benefits. We need to discuss the trade-offs and explore possible optimizations. As @agoscinski mentioned, this might be expected; see [async overheads](https://stackoverflow.com/questions/55761652/what-is-the-overhead-of-an-asyncio-task).
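  The fixed cost is easy to reproduce outside AiiDA. A generic micro-benchmark (plain Python, nothing AiiDA-specific) that isolates the per-task scheduling overhead:

  ```python
  import asyncio
  import time


  async def tiny():
      """A no-op coroutine: any measured time is pure scheduling overhead."""


  async def main(n: int = 100_000) -> None:
      start = time.perf_counter()
      await asyncio.gather(*(tiny() for _ in range(n)))
      print(f'{n} no-op tasks took {time.perf_counter() - start:.3f} s')


  asyncio.run(main())
  ```

  When each transfer moves only a megabyte, this fixed cost is paid many times without the compensating benefit of overlapping long waits.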
khsrali authored Jan 23, 2025
1 parent d0c9572 commit eba6954
Showing 22 changed files with 2,925 additions and 521 deletions.
1 change: 1 addition & 0 deletions environment.yml
```diff
@@ -8,6 +8,7 @@ dependencies:
 - python~=3.9
 - alembic~=1.2
 - archive-path~=0.4.2
+- asyncssh~=2.19.0
 - circus~=0.18.0
 - click-spinner~=0.1.8
 - click~=8.1
```
4 changes: 4 additions & 0 deletions pyproject.toml
```diff
@@ -20,6 +20,7 @@ classifiers = [
 dependencies = [
     'alembic~=1.2',
     'archive-path~=0.4.2',
+    "asyncssh~=2.19.0",
     'circus~=0.18.0',
     'click-spinner~=0.1.8',
     'click~=8.1',
@@ -175,6 +176,7 @@ requires-python = '>=3.9'
 [project.entry-points.'aiida.transports']
 'core.local' = 'aiida.transports.plugins.local:LocalTransport'
 'core.ssh' = 'aiida.transports.plugins.ssh:SshTransport'
+'core.ssh_async' = 'aiida.transports.plugins.ssh_async:AsyncSshTransport'
 'core.ssh_auto' = 'aiida.transports.plugins.ssh_auto:SshAutoTransport'

 [project.entry-points.'aiida.workflows']
@@ -308,6 +310,7 @@ module = 'tests.*'
 ignore_missing_imports = true
 module = [
     'ase.*',
+    'asyncssh.*',
     'bpython.*',
     'bs4.*',
     'CifFile.*',
@@ -388,6 +391,7 @@ testpaths = [
     'tests'
 ]
 timeout = 240
+timeout_method = "thread"
 xfail_strict = true

 [tool.ruff]
```
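With the entry point registered above, the new plugin resolves through the standard factory. A quick check (hypothetical snippet, not part of the diff; `TransportFactory` itself appears in `src/aiida/plugins/factories.py` below):

```python
from aiida.plugins import TransportFactory

# Resolve the 'core.ssh_async' entry point to its transport class.
transport_cls = TransportFactory('core.ssh_async')
print(transport_cls.__name__)  # AsyncSshTransport
```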
64 changes: 32 additions & 32 deletions src/aiida/engine/daemon/execmanager.py
```diff
@@ -105,7 +105,7 @@ async def upload_calculation(
     if dry_run:
         workdir = Path(folder.abspath)
     else:
-        remote_user = transport.whoami()
+        remote_user = await transport.whoami_async()
         remote_working_directory = computer.get_workdir().format(username=remote_user)
         if not remote_working_directory.strip():
             raise exceptions.ConfigurationError(
@@ -114,13 +114,13 @@ async def upload_calculation(
             )

         # If it already exists, no exception is raised
-        if not transport.path_exists(remote_working_directory):
+        if not await transport.path_exists_async(remote_working_directory):
             logger.debug(
                 f'[submission of calculation {node.pk}] Path '
                 f'{remote_working_directory} does not exist, trying to create it'
             )
             try:
-                transport.makedirs(remote_working_directory)
+                await transport.makedirs_async(remote_working_directory)
             except EnvironmentError as exc:
                 raise exceptions.ConfigurationError(
                     f'[submission of calculation {node.pk}] '
@@ -133,14 +133,14 @@ async def upload_calculation(
         # and I do not have to know the logic, but I just need to
         # read the absolute path from the calculation properties.
         workdir = Path(remote_working_directory).joinpath(calc_info.uuid[:2], calc_info.uuid[2:4])
-        transport.makedirs(str(workdir), ignore_existing=True)
+        await transport.makedirs_async(workdir, ignore_existing=True)

         try:
             # The final directory may already exist, most likely because this function was already executed once, but
             # failed and as a result was rescheduled by the engine. In this case it would be fine to delete the folder
             # and create it from scratch, except that we cannot be sure that this the actual case. Therefore, to err on
             # the safe side, we move the folder to the lost+found directory before recreating the folder from scratch
-            transport.mkdir(str(workdir.joinpath(calc_info.uuid[4:])))
+            await transport.mkdir_async(workdir.joinpath(calc_info.uuid[4:]))
         except OSError:
             # Move the existing directory to lost+found, log a warning and create a clean directory anyway
             path_existing = os.path.join(str(workdir), calc_info.uuid[4:])
@@ -151,12 +151,12 @@ async def upload_calculation(
             )

             # Make sure the lost+found directory exists, then copy the existing folder there and delete the original
-            transport.mkdir(path_lost_found, ignore_existing=True)
-            transport.copytree(path_existing, path_target)
-            transport.rmtree(path_existing)
+            await transport.mkdir_async(path_lost_found, ignore_existing=True)
+            await transport.copytree_async(path_existing, path_target)
+            await transport.rmtree_async(path_existing)

             # Now we can create a clean folder for this calculation
-            transport.mkdir(str(workdir.joinpath(calc_info.uuid[4:])))
+            await transport.mkdir_async(workdir.joinpath(calc_info.uuid[4:]))
         finally:
             workdir = workdir.joinpath(calc_info.uuid[4:])

@@ -171,11 +171,11 @@ async def upload_calculation(
         # Note: this will possibly overwrite files
         for root, dirnames, filenames in code.base.repository.walk():
             # mkdir of root
-            transport.makedirs(str(workdir.joinpath(root)), ignore_existing=True)
+            await transport.makedirs_async(workdir.joinpath(root), ignore_existing=True)

             # remotely mkdir first
             for dirname in dirnames:
-                transport.makedirs(str(workdir.joinpath(root, dirname)), ignore_existing=True)
+                await transport.makedirs_async(workdir.joinpath(root, dirname), ignore_existing=True)

             # Note, once #2579 is implemented, use the `node.open` method instead of the named temporary file in
             # combination with the new `Transport.put_object_from_filelike`
@@ -185,11 +185,11 @@ async def upload_calculation(
                     content = code.base.repository.get_object_content(Path(root) / filename, mode='rb')
                     handle.write(content)
                     handle.flush()
-                    transport.put(handle.name, str(workdir.joinpath(root, filename)))
+                    await transport.put_async(handle.name, workdir.joinpath(root, filename))
         if code.filepath_executable.is_absolute():
-            transport.chmod(str(code.filepath_executable), 0o755)  # rwxr-xr-x
+            await transport.chmod_async(code.filepath_executable, 0o755)  # rwxr-xr-x
         else:
-            transport.chmod(str(workdir.joinpath(code.filepath_executable)), 0o755)  # rwxr-xr-x
+            await transport.chmod_async(workdir.joinpath(code.filepath_executable), 0o755)  # rwxr-xr-x

     # local_copy_list is a list of tuples, each with (uuid, dest_path, rel_path)
     # NOTE: validation of these lists are done inside calculation.presubmit()
@@ -288,7 +288,7 @@ async def _copy_remote_files(logger, node, computer, transport, remote_copy_list
                 f'remotely, directly on the machine {computer.label}'
             )
             try:
-                transport.copy(remote_abs_path, str(workdir.joinpath(dest_rel_path)))
+                await transport.copy_async(remote_abs_path, workdir.joinpath(dest_rel_path))
             except FileNotFoundError:
                 logger.warning(
                     f'[submission of calculation {node.pk}] Unable to copy remote '
@@ -314,8 +314,8 @@ async def _copy_remote_files(logger, node, computer, transport, remote_copy_list
             )
             remote_dirname = Path(dest_rel_path).parent
             try:
-                transport.makedirs(str(workdir.joinpath(remote_dirname)), ignore_existing=True)
-                transport.symlink(remote_abs_path, str(workdir.joinpath(dest_rel_path)))
+                await transport.makedirs_async(workdir.joinpath(remote_dirname), ignore_existing=True)
+                await transport.symlink_async(remote_abs_path, workdir.joinpath(dest_rel_path))
             except OSError:
                 logger.warning(
                     f'[submission of calculation {node.pk}] Unable to create remote symlink '
@@ -356,14 +356,14 @@ async def _copy_local_files(logger, node, transport, inputs, local_copy_list, wo
             # The logic below takes care of an edge case where the source is a file but the target is a directory. In
             # this case, the v2.5.1 implementation would raise an `IsADirectoryError` exception, because it would try
             # to open the directory in the sandbox folder as a file when writing the contents.
-            if file_type_source == FileType.FILE and target and transport.isdir(str(workdir.joinpath(target))):
+            if file_type_source == FileType.FILE and target and await transport.isdir_async(workdir.joinpath(target)):
                 raise IsADirectoryError

             # In case the source filename is specified and it is a directory that already exists in the remote, we
             # want to avoid nested directories in the target path to replicate the behavior of v2.5.1. This is done by
             # setting the target filename to '.', which means the contents of the node will be copied in the top level
             # of the temporary directory, whose contents are then copied into the target directory.
-            if filename and transport.isdir(str(workdir.joinpath(filename))):
+            if filename and await transport.isdir_async(workdir.joinpath(filename)):
                 filename_target = '.'

             filepath_target = (dirpath / filename_target).resolve().absolute()
@@ -372,25 +372,25 @@ async def _copy_local_files(logger, node, transport, inputs, local_copy_list, wo
             if file_type_source == FileType.DIRECTORY:
                 # If the source object is a directory, we copy its entire contents
                 data_node.base.repository.copy_tree(filepath_target, filename_source)
-                transport.put(
+                await transport.put_async(
                     f'{dirpath}/*',
-                    str(workdir.joinpath(target)) if target else str(workdir.joinpath('.')),
+                    workdir.joinpath(target) if target else workdir.joinpath('.'),
                     overwrite=True,
                 )
             else:
                 # Otherwise, simply copy the file
                 with filepath_target.open('wb') as handle:
                     with data_node.base.repository.open(filename_source, 'rb') as source:
                         shutil.copyfileobj(source, handle)
-                transport.makedirs(str(workdir.joinpath(Path(target).parent)), ignore_existing=True)
-                transport.put(str(filepath_target), str(workdir.joinpath(target)))
+                await transport.makedirs_async(workdir.joinpath(Path(target).parent), ignore_existing=True)
+                await transport.put_async(filepath_target, workdir.joinpath(target))


 async def _copy_sandbox_files(logger, node, transport, folder, workdir: Path):
     """Copy the contents of the sandbox folder to the working directory."""
     for filename in folder.get_content_list():
         logger.debug(f'[submission of calculation {node.pk}] copying file/folder {filename}...')
-        transport.put(folder.get_abs_path(filename), str(workdir.joinpath(filename)))
+        await transport.put_async(folder.get_abs_path(filename), workdir.joinpath(filename))


 def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str | ExitCode:
@@ -461,7 +461,7 @@ async def stash_calculation(calculation: CalcJobNode, transport: Transport) -> N
     for source_filename in source_list:
         if transport.has_magic(source_filename):
             copy_instructions = []
-            for globbed_filename in transport.glob(str(source_basepath / source_filename)):
+            for globbed_filename in await transport.glob_async(source_basepath / source_filename):
                 target_filepath = target_basepath / Path(globbed_filename).relative_to(source_basepath)
                 copy_instructions.append((globbed_filename, target_filepath))
         else:
@@ -470,10 +470,10 @@ async def stash_calculation(calculation: CalcJobNode, transport: Transport) -> N
     for source_filepath, target_filepath in copy_instructions:
         # If the source file is in a (nested) directory, create those directories first in the target directory
         target_dirname = target_filepath.parent
-        transport.makedirs(str(target_dirname), ignore_existing=True)
+        await transport.makedirs_async(target_dirname, ignore_existing=True)

         try:
-            transport.copy(str(source_filepath), str(target_filepath))
+            await transport.copy_async(source_filepath, target_filepath)
         except (OSError, ValueError) as exception:
             EXEC_LOGGER.warning(f'failed to stash {source_filepath} to {target_filepath}: {exception}')
         else:
@@ -612,7 +612,7 @@ async def retrieve_files_from_list(
         upto what level of the original remotepath nesting the files will be copied.

     :param transport: the Transport instance.
-    :param folder: an absolute path to a folder that contains the files to copy.
+    :param folder: an absolute path to a folder that contains the files to retrieve.
     :param retrieve_list: the list of files to retrieve.
     """
     workdir = Path(calculation.get_remote_workdir())
@@ -621,7 +621,7 @@ async def retrieve_files_from_list(
             tmp_rname, tmp_lname, depth = item
             # if there are more than one file I do something differently
             if transport.has_magic(tmp_rname):
-                remote_names = transport.glob(str(workdir.joinpath(tmp_rname)))
+                remote_names = await transport.glob_async(workdir.joinpath(tmp_rname))
                 local_names = []
                 for rem in remote_names:
                     # get the relative path so to make local_names relative
@@ -644,7 +644,7 @@ async def retrieve_files_from_list(
             abs_item = item if item.startswith('/') else str(workdir.joinpath(item))

             if transport.has_magic(abs_item):
-                remote_names = transport.glob(abs_item)
+                remote_names = await transport.glob_async(abs_item)
                 local_names = [os.path.split(rem)[1] for rem in remote_names]
             else:
                 remote_names = [abs_item]
@@ -656,6 +656,6 @@ async def retrieve_files_from_list(
             if rem.startswith('/'):
                 to_get = rem
             else:
-                to_get = str(workdir.joinpath(rem))
+                to_get = workdir.joinpath(rem)

-            transport.get(to_get, os.path.join(folder, loc), ignore_nonexisting=True)
+            await transport.get_async(to_get, os.path.join(folder, loc), ignore_nonexisting=True)
```
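The pattern throughout `execmanager.py` is mechanical. A hedged before/after distillation (function name and file names are illustrative; the calls are simplified from the hunks above):

```python
async def retrieve_log(transport, workdir, folder):
    # Before this PR (blocking; the daemon waited on the transfer):
    #     transport.get(str(workdir.joinpath('out.log')), folder)
    # After (awaitable; other coroutines can run while the transfer proceeds):
    await transport.get_async(workdir.joinpath('out.log'), folder, ignore_nonexisting=True)
```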
4 changes: 2 additions & 2 deletions src/aiida/orm/computers.py
```diff
@@ -626,12 +626,12 @@ def get_transport(self, user: Optional['User'] = None) -> 'Transport':
         """Return a Transport class, configured with all correct parameters.

         The Transport is closed (meaning that if you want to run any operation with
         it, you have to open it first (i.e., e.g. for a SSH transport, you have
-        to open a connection). To do this you can call ``transports.open()``, or simply
+        to open a connection). To do this you can call ``transport.open()``, or simply
         run within a ``with`` statement::

             transport = Computer.get_transport()
             with transport:
-                print(transports.whoami())
+                print(transport.whoami())

         :param user: if None, try to obtain a transport for the default user.
             Otherwise, pass a valid User.
```
3 changes: 2 additions & 1 deletion src/aiida/orm/nodes/data/remote/base.py
```diff
@@ -125,7 +125,8 @@ def listdir_withattributes(self, path='.'):
         """Connects to the remote folder and lists the directory content.

         :param relpath: If 'relpath' is specified, lists the content of the given subfolder.
-        :return: a list of dictionaries, where the documentation is in :py:class:Transport.listdir_withattributes.
+        :return: a list of dictionaries, where the documentation
+            is in :py:class:Transport.listdir_withattributes.
         """
         authinfo = self.get_authinfo()

```
3 changes: 2 additions & 1 deletion src/aiida/orm/nodes/process/calculation/calcjob.py
```diff
@@ -453,7 +453,8 @@ def get_authinfo(self) -> 'AuthInfo':
     def get_transport(self) -> 'Transport':
         """Return the transport for this calculation.

-        :return: `Transport` configured with the `AuthInfo` associated to the computer of this node
+        :return: Transport configured
+            with the `AuthInfo` associated to the computer of this node
         """
         return self.get_authinfo().get_transport()

```
4 changes: 2 additions & 2 deletions src/aiida/plugins/factories.py
```diff
@@ -418,7 +418,7 @@ def TransportFactory(entry_point_name: str, load: Literal[False]) -> EntryPoint:


 def TransportFactory(entry_point_name: str, load: bool = True) -> Union[EntryPoint, Type['Transport']]:
-    """Return the `Transport` sub class registered under the given entry point.
+    """Return the Transport sub class registered under the given entry point.

     :param entry_point_name: the entry point name.
     :param load: if True, load the matched entry point and return the loaded resource instead of the entry point itself.
@@ -435,7 +435,7 @@ def TransportFactory(entry_point_name: str, load: bool = True) -> Union[EntryPoi
     if not load:
         return entry_point

-    if isclass(entry_point) and issubclass(entry_point, Transport):
+    if isclass(entry_point) and (issubclass(entry_point, Transport)):
         return entry_point

     raise_invalid_type_error(entry_point_name, entry_point_group, valid_classes)
```
2 changes: 1 addition & 1 deletion src/aiida/schedulers/plugins/direct.py
```diff
@@ -192,7 +192,7 @@ def _get_submit_command(self, submit_script):
         directory.

         IMPORTANT: submit_script should be already escaped.
         """
-        submit_command = f'bash {submit_script} > /dev/null 2>&1 & echo $!'
+        submit_command = f'(bash {submit_script} > /dev/null 2>&1 & echo $!) &'

         self.logger.info(f'submitting with: {submit_command}')

```
2 changes: 2 additions & 0 deletions src/aiida/tools/pytest_fixtures/__init__.py
```diff
@@ -22,6 +22,7 @@
     aiida_computer,
     aiida_computer_local,
     aiida_computer_ssh,
+    aiida_computer_ssh_async,
     aiida_localhost,
     ssh_key,
 )
@@ -33,6 +34,7 @@
     'aiida_computer',
     'aiida_computer_local',
     'aiida_computer_ssh',
+    'aiida_computer_ssh_async',
     'aiida_config',
     'aiida_config_factory',
     'aiida_config_tmp',
```
32 changes: 32 additions & 0 deletions src/aiida/tools/pytest_fixtures/orm.py
```diff
@@ -191,6 +191,38 @@ def factory(label: str | None = None, configure: bool = True) -> 'Computer':
     return factory


+@pytest.fixture
+def aiida_computer_ssh_async(aiida_computer) -> t.Callable[[], 'Computer']:
+    """Factory to return a :class:`aiida.orm.computers.Computer` instance with ``core.ssh_async`` transport.
+
+    Usage::
+
+        def test(aiida_computer_ssh_async):
+            computer = aiida_computer_ssh_async(label='some-label', configure=True)
+            assert computer.transport_type == 'core.ssh_async'
+            assert computer.is_configured
+
+    The factory has the following signature:
+
+    :param label: The computer label. If not specified, a random UUID4 is used.
+    :param configure: Boolean, if ``True``, ensures the computer is configured, otherwise the computer is returned
+        as is. Note that if a computer with the given label already exists and it was configured before, the
+        computer will not be "un-"configured. If an unconfigured computer is absolutely required, make sure to first
+        delete the existing computer or specify another label.
+    :return: A stored computer instance.
+    """
+
+    def factory(label: str | None = None, configure: bool = True) -> 'Computer':
+        computer = aiida_computer(label=label, hostname='localhost', transport_type='core.ssh_async')
+
+        if configure:
+            computer.configure()
+
+        return computer
+
+    return factory
+
+
 @pytest.fixture
 def aiida_localhost(aiida_computer_local) -> 'Computer':
     """Return a :class:`aiida.orm.computers.Computer` instance representing localhost with ``core.local`` transport.
```
3 changes: 3 additions & 0 deletions src/aiida/transports/__init__.py
```diff
@@ -16,8 +16,11 @@
 from .transport import *

 __all__ = (
+    'AsyncTransport',
+    'BlockingTransport',
     'SshTransport',
     'Transport',
+    'TransportPath',
     'convert_to_bool',
     'parse_sshconfig',
 )
```
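With `AsyncTransport` now exported, a third-party asynchronous plugin would subclass it, per the guidance in the commit message. A heavily hedged skeleton (the real abstract interface lives in `aiida.transports.transport` and may require more methods; `put_async` is just one of the calls the engine awaits, and the class name is hypothetical):

```python
from aiida.transports import AsyncTransport


class MyAsyncTransport(AsyncTransport):
    """Sketch of a custom asynchronous transport plugin."""

    async def put_async(self, localpath, remotepath, *args, **kwargs):
        # Perform the transfer with an async client here, e.g. asyncssh's
        # SFTP API, then return once the file is on the remote machine.
        raise NotImplementedError
```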
2 changes: 1 addition & 1 deletion src/aiida/transports/cli.py
```diff
@@ -140,7 +140,7 @@ def transport_options(transport_type):
     """Decorate a command with all options for a computer configure subcommand for transport_type."""

     def apply_options(func):
-        """Decorate the command functionn with the appropriate options for the transport type."""
+        """Decorate the command function with the appropriate options for the transport type."""
         options_list = list_transport_options(transport_type)
         options_list.reverse()
         func = arguments.COMPUTER(callback=partial(match_comp_transport, transport_type=transport_type))(func)
```
1 comment on commit eba6954

@github-actions (Contributor) commented:
⚠️ Performance Alert ⚠️

A possible performance regression was detected for benchmark 'pytest-benchmarks:ubuntu-22.04,psql_dos'.
The result for this commit is worse than the previous benchmark result by more than the alert threshold of 2.

| Benchmark suite | Current: eba6954 | Previous: d0c9572 | Ratio |
| --- | --- | --- | --- |
| `tests/benchmark/test_json_contains.py::test_wide_json[100-2]` | 19.41046971696955 iter/sec (stddev: 0.0011605) | 569.015838428265 iter/sec (stddev: 0.000063974) | 29.31 |

This comment was automatically generated by a workflow using github-action-benchmark.

CC: @giovannipizzi @agoscinski @GeigerJ2 @khsrali @unkcpz
