Mirroring of workflows and calculations works
Works for nodes that are in groups as well as for nodes not associated
with any group, with the output either sorted by groups or in a
top-level flat hierarchy.
"De-duplication" works by symlinking calculations if they are part of a
workflow.

Next, check what happens if a workflow is part of two groups -> here,
de-duplication should actually make more sense.
GeigerJ2 committed Jan 23, 2025
1 parent 93c36c7 commit 1c4b67b
Showing 7 changed files with 179 additions and 406 deletions.
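
The symlink-based de-duplication described in the commit message is not spelled
out explicitly in the hunks below, so here is a minimal sketch of the idea,
assuming a simple "already dumped" registry keyed by calculation UUID. The
helper name and signature are made up for illustration and are not part of the
actual CollectionDumper API.

    from pathlib import Path

    def dump_or_symlink(calc_uuid: str, target_dir: Path, already_dumped: dict[str, Path]) -> Path:
        """Dump a calculation once; later occurrences become symlinks to the first copy."""
        dest = target_dir / calc_uuid
        if calc_uuid in already_dumped:
            # Calculation was already dumped (e.g. as part of a workflow):
            # link to the existing copy instead of writing it again.
            dest.symlink_to(already_dumped[calc_uuid], target_is_directory=True)
        else:
            dest.mkdir(parents=True, exist_ok=True)  # stand-in for the real per-calculation dump
            already_dumped[calc_uuid] = dest
        return dest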
1 change: 1 addition & 0 deletions docs/source/reference/command_line.rst
@@ -398,6 +398,7 @@ Below is a list with all available subcommands.
configure-rabbitmq Configure RabbitMQ for a profile.
delete Delete one or more profiles.
list Display a list of all available profiles.
mirror Dump all data in an AiiDA profile's storage to disk.
set-default Set a profile as the default profile.
setdefault (Deprecated) Set a profile as the default profile.
setup Set up a new profile.
132 changes: 43 additions & 89 deletions src/aiida/cmdline/commands/cmd_profile.py
@@ -277,39 +277,36 @@ def profile_delete(force, delete_data, profiles):
@verdi_profile.command('mirror')
@options.PATH()
@options.OVERWRITE()
@options.INCREMENTAL()
@options.ORGANIZE_BY_GROUPS()
@options.DRY_RUN()
# @options.INCREMENTAL()
@options.DUMP_PROCESSES()
@options.ONLY_TOP_LEVEL_WORKFLOWS()
@options.DUMP_DATA()
@options.DEDUPLICATE()
@options.DATA_HIDDEN()
@options.ALSO_RAW()
@options.ALSO_RICH()
@options.INCLUDE_INPUTS()
@options.INCLUDE_OUTPUTS()
@options.INCLUDE_ATTRIBUTES()
@options.INCLUDE_EXTRAS()
@options.FLAT()
@options.ALSO_RAW()
@options.ALSO_RICH()
@options.RICH_SPEC()
@options.RICH_DUMP_ALL()
@options.DUMP_CONFIG_FILE()
@options.NODES()
@options.GROUPS()
@options.ORGANIZE_BY_GROUPS()
@options.ONLY_TOP_LEVEL_WORKFLOWS()
@options.DRY_RUN()
@click.pass_context
def storage_mirror(
def profile_mirror(
ctx,
path,
overwrite,
incremental,
organize_by_groups,
dry_run,
dump_processes,
only_top_level_workflows,
dump_data,
deduplicate,
data_hidden,
also_raw,
also_rich,
include_inputs,
@@ -325,6 +322,7 @@ def storage_mirror(
):
"""Dump all data in an AiiDA profile's storage to disk."""

from pathlib import Path

from aiida import orm
from aiida.tools.dumping.parser import DumpConfigParser
@@ -337,39 +335,14 @@ def storage_mirror(

profile = ctx.obj['profile']

# from aiida.manage.manager import get_manager

# manager = get_manager()
# storage = manager.get_profile_storage()

# with spinner():
# data = storage.get_info(detailed=True)

# echo.echo_dictionary(data, sort_keys=False, fmt='yaml')

# print(f"Profile name: {profile_name}")

# # TODO: export computers alone, and groups
# t1 = time.time()
# qb = orm.QueryBuilder().append(orm.Node, tag='node', project=['uuid'])
# all_uuids = qb.all(flat=True)
# print(f"All UUIDs retrieved in {time.time() - t1:6.3f} s.")

# t1 = time.time()
# with open('all-source-uuids.json', 'w') as fhandle:
# json.dump({'profile_name': profile_name, 'uuids': all_uuids}, fhandle)
# print(f"{len(all_uuids)} UUIDs written in {time.time() - t1:6.3f} s.")

if nodes and groups:
echo.echo_critical('`nodes` and `groups` specified. Set only one.')
# if all_entries and groups:
# echo.echo_critical('`all_entries` and `groups` specified. Set only one.')

if dump_config_file is None:
general_kwargs = {
'path': path,
'overwrite': overwrite,
'incremental': incremental,
# 'incremental': incremental,
'dry_run': dry_run,
}

@@ -379,19 +352,18 @@ def storage_mirror(
'include_attributes': include_attributes,
'include_extras': include_extras,
'flat': flat,
# "calculations_hidden": calculations_hidden
}

datadumper_kwargs = {
'also_raw': also_raw,
'also_rich': also_rich,
'data_hidden': data_hidden,
}

collection_kwargs = {
'should_dump_processes': dump_processes,
'should_dump_data': dump_data,
'only_top_level_workflows': only_top_level_workflows,
'organize_by_groups': organize_by_groups,
}

rich_kwargs = {
@@ -420,13 +392,10 @@ def storage_mirror(
path = general_kwargs['path']
overwrite = general_kwargs['overwrite']
dry_run = general_kwargs['dry_run']
incremental = general_kwargs['incremental']

if not overwrite and incremental:
echo.echo_report('Overwrite set to false, but incremental dumping selected. Will keep existing directories.')
incremental = not overwrite

if not str(path).endswith(profile.name):
path /= profile.name
if path is None:
path = Path.cwd() / f'{profile.name}-mirror'

# TODO: Implement proper dry-run feature
dry_run_message = f"Dry run for dumping of profile `{profile.name}`'s data at path: `{path}`.\n"
@@ -439,7 +408,7 @@ def storage_mirror(
else:
echo.echo_report(f"Dumping of profile `{profile.name}`'s data at path: `{path}`.")

SAFEGUARD_FILE = '.verdi_storage_dump' # noqa: N806
SAFEGUARD_FILE = '.verdi_profile_mirror' # noqa: N806

try:
prepare_dump_path(
@@ -471,25 +440,15 @@ def storage_mirror(
)
# dumper_pretty_print(process_dumper)

from aiida.tools.dumping.incremental import DumpNodeCollector

dumpnodecollector = DumpNodeCollector(dump_parent_path=path)

dumpnodecollector.update_uuids_before_dump()
dumpnodecollector.create_organized_uuid_dicts()
# dumpnodecollector.populate_uuid_dict()

# raise SystemExit()

# TODO: Possibly implement specifying specific computers
# TODO: Although, users could just specify the relevant nodes
# TODO: Also add option to specify node types via entry points
# TODO: Use `batch_iter` from aiida.tools.archive.common

# === Dump the data that is not associated with any group ===
if not groups:
collection_dumper = CollectionDumper(
dump_parent_path=path,
output_path=path,
overwrite=overwrite,
incremental=incremental,
nodes=nodes,
@@ -505,13 +464,13 @@ def storage_mirror(
if dump_processes and collection_dumper._should_dump_processes():
echo.echo_report(f'Dumping processes not in any group for profile `{profile.name}`...')
collection_dumper.dump_processes()

if dump_data:
if not also_rich and not also_raw:
echo.echo_critical('`--dump-data was given, but neither --also-raw or --also-rich specified.')
echo.echo_report(f'Dumping data not in any group for profile {profile.name}...')

collection_dumper.dump_data_rich()
# collection_dumper.dump_plugin_data()
# collection_dumper.dump_data_rich()

# === Dump data per-group if Groups exist in profile or are selected ===
# TODO: Invert default behavior here, as I typically want to dump all entries
@@ -520,34 +479,29 @@ def storage_mirror(
if not groups: # and all_entries:
groups = orm.QueryBuilder().append(orm.Group).all(flat=True)

if groups is not None and not nodes:
for group in groups:
if organize_by_groups:
group_subdir = Path(*group.type_string.split('.'))
group_path = path / 'groups' / group_subdir / group.label
else:
group_path = path

collection_dumper = CollectionDumper(
dump_parent_path=path,
output_path=group_path,
overwrite=overwrite,
incremental=incremental,
group=group,
**collection_kwargs,
**rich_kwargs,
process_dumper=process_dumper,
data_dumper=data_dumper,
)
collection_dumper.create_entity_counter()
if dump_processes:
# The additional `_should_dump_processes` check here ensures that no reporting like
# "Dumping processes for group `SSSP/1.3/PBE/efficiency`" is printed for groups that
# don't contain processes
if collection_dumper._should_dump_processes():
echo.echo_report(f'Dumping processes for group `{group.label}`...')
collection_dumper.dump_processes()
if dump_data:
echo.echo_report(f'Dumping data for group `{group.label}`...')
collection_dumper.dump_data_rich()
# collection_dumper.dump_plugin_data()
if groups:
if not nodes:
for group in groups:
collection_dumper = CollectionDumper(
dump_parent_path=path,
overwrite=overwrite,
incremental=incremental,
group=group,
**collection_kwargs,
**rich_kwargs,
process_dumper=process_dumper,
data_dumper=data_dumper,
deduplicate=deduplicate,
)

collection_dumper.create_entity_counter()
if dump_processes:
# The additional `_should_dump_processes` check here ensures that no reporting like
# "Dumping processes for group `SSSP/1.3/PBE/efficiency`" is printed for groups that
# don't contain processes
if collection_dumper._should_dump_processes():
echo.echo_report(f'Dumping processes for group `{group.label}`...')
collection_dumper.dump_processes()
if dump_data:
echo.echo_report(f'Dumping data for group `{group.label}`...')
collection_dumper.dump_data_rich()
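
For orientation, a minimal sketch of the per-group output layout that
organizing by groups appears to produce, following the group_subdir
construction visible in the removed hunk above; the helper below is
illustrative only and not part of the AiiDA code base.

    from pathlib import Path

    def group_output_path(dump_parent: Path, type_string: str, label: str) -> Path:
        """Assumed layout: <dump_parent>/groups/<type/string/parts>/<label>."""
        group_subdir = Path(*type_string.split('.'))
        return dump_parent / 'groups' / group_subdir / label

    # e.g. a group of type 'core.import' keeps its label as the leaf directory
    print(group_output_path(Path('profile-mirror'), 'core.import', 'my_group'))
    # -> profile-mirror/groups/core/import/my_group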