Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New logic for STAGE_INPUT valorization #70

Merged
merged 6 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 45 additions & 44 deletions cobrawap/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
sys.path.append(str(Path(inspect.getfile(lambda: None)).parent))
sys.path.append(str(Path(inspect.getfile(lambda: None)).parent / 'pipeline'))
from cmd_utils import get_setting, set_setting, get_initial_available_stages
from cmd_utils import is_profile_name_valid, create_new_configfile
from cmd_utils import input_profile, get_profile, setup_entry_stage
from cmd_utils import is_profile_name_valid, create_new_configfile
from cmd_utils import input_profile, get_profile, setup_entry_stage
from cmd_utils import working_directory, load_config_file, get_config
from cmd_utils import locate_str_in_list, read_stage_output
log = logging.getLogger()
Expand Down Expand Up @@ -48,7 +48,7 @@ def get_parser():

# Initialization
subparsers = CLI.add_subparsers(help='')
CLI_init = subparsers.add_parser('init',
CLI_init = subparsers.add_parser('init',
help='initialize the cobrawap directories (required only once)')
CLI_init.set_defaults(command='init')
CLI_init.add_argument("--output_path", type=Path, default=None,
Expand All @@ -59,13 +59,13 @@ def get_parser():
"stored [default: '~/cobrawap_config/']")

# Show Settings
CLI_settings = subparsers.add_parser('settings',
CLI_settings = subparsers.add_parser('settings',
help='display the content of ~/.cobrawap/config')
CLI_settings.set_defaults(command='settings')


# Configuration
CLI_create = subparsers.add_parser('create',
CLI_create = subparsers.add_parser('create',
help='create configuration for a new dataset')
CLI_create.set_defaults(command='create')
CLI_create.add_argument("--data_path", type=Path, nargs='?', default=None,
Expand All @@ -77,12 +77,12 @@ def get_parser():
CLI_create.add_argument("--profile", type=str, nargs='?', default=None,
help="profile name of this dataset/application "
"(see profile name conventions in documentation)")
CLI_create.add_argument("--parent_profile", type=str, nargs='?', default=None,
CLI_create.add_argument("--parent_profile", type=str, nargs='?', default=None,
help="optional parent profile name "
"(see profile name conventions in documentation)")

# Additional configurations
CLI_profile = subparsers.add_parser('add_profile',
CLI_profile = subparsers.add_parser('add_profile',
help='create a new configuration for an existing dataset')
CLI_profile.set_defaults(command='add_profile')
CLI_profile.add_argument("--profile", type=str, nargs='?', default=None,
Expand All @@ -97,15 +97,15 @@ def get_parser():
"[default: basic template]")

# Run
CLI_run = subparsers.add_parser('run',
CLI_run = subparsers.add_parser('run',
help='run the analysis pipeline on the selected '
'input and with the specified configurations')
CLI_run.set_defaults(command='run')
CLI_run.add_argument("--profile", type=str, nargs='?', default=None,
help="name of the config profile to be analyzed")

# Stage
CLI_stage = subparsers.add_parser('run_stage',
CLI_stage = subparsers.add_parser('run_stage',
help='execute an individual stage')
CLI_stage.set_defaults(command='run_stage')
CLI_stage.add_argument("--profile", type=str, nargs='?', default=None,
Expand All @@ -115,7 +115,7 @@ def get_parser():
help="select individual stage to execute")

# Block
CLI_block = subparsers.add_parser('run_block',
CLI_block = subparsers.add_parser('run_block',
help='execute an individual block method on some input')
CLI_block.set_defaults(command='run_block')
CLI_block.add_argument("block", type=str, nargs='?', default=None,
Expand All @@ -141,7 +141,7 @@ def main():
elif args.command == 'settings':
log.info("display settings at ~/.cobrawap/config")
print_settings(**vars(args))

elif args.command == 'create':
log.info("creating a set of config files")
create(**vars(args))
Expand Down Expand Up @@ -192,7 +192,7 @@ def initialize(output_path=None, config_path=None, **kwargs):
config_path.mkdir(parents=True, exist_ok=True)
if not config_path.is_dir():
raise ValueError(f"{config_path} is not a valid directory!")

set_setting(dict(config_path=str(config_path)))

# set pipeline path
Expand All @@ -218,12 +218,12 @@ def initialize(output_path=None, config_path=None, **kwargs):
shutil.copy(pipeline_path / stage / 'configs' \
/ 'config_template.yaml',
stage_config_path / 'config.yaml')

pipeline_config_path = config_path / 'configs'
pipeline_config_path.mkdir(parents=True, exist_ok=True)
shutil.copy(pipeline_path / 'configs' / 'config_template.yaml',
pipeline_config_path / 'config.yaml')

stage01_script_path = config_path / stages['1'] / 'scripts'
stage01_script_path.mkdir(parents=True, exist_ok=True)
shutil.copy(pipeline_path / stages['1'] / 'scripts' \
Expand All @@ -238,28 +238,28 @@ def print_settings(*args, **kwargs):
return None


def create(profile=None, parent_profile=None, data_path=None,
def create(profile=None, parent_profile=None, data_path=None,
loading_script_name=None, **kwargs):
profile, parent_profile = get_profile(profile=profile,
profile, parent_profile = get_profile(profile=profile,
parent_profile=parent_profile)
base_name = parent_profile if parent_profile else profile

for stage_number, stage in get_setting('stages').items():
config_name = profile if '1' in str(stage_number) else base_name
create_new_configfile(stage=stage,
create_new_configfile(stage=stage,
profile=config_name,
parent=parent_profile)
setup_entry_stage(profile=profile,

setup_entry_stage(profile=profile,
parent_profile=parent_profile,
data_path=data_path,
data_path=data_path,
loading_script_name=loading_script_name)
return None


def add_profile(profile=None, parent_profile=None, stages=None,
def add_profile(profile=None, parent_profile=None, stages=None,
data_path=None, loading_script_name=None, **kwargs):
profile, parent_profile = get_profile(profile=profile,
profile, parent_profile = get_profile(profile=profile,
parent_profile=parent_profile)
# get stage selection
stages = ''
Expand All @@ -275,26 +275,26 @@ def add_profile(profile=None, parent_profile=None, stages=None,
stages = ''

for stage_number in stages:
create_new_configfile(stage_number=stage_number,
create_new_configfile(stage_number=stage_number,
profile=profile,
parent=parent_profile)

if any('1' in stage for stage in stages):
setup_entry_stage(profile=profile, parent_profile=parent_profile,
data_path=data_path,
data_path=data_path,
loading_script_name=loading_script_name)
return None


def run(profile=None, extra_args=None, **kwargs):
# select profile
profile = input_profile(profile=profile)

# set runtime config
pipeline_path = Path(get_setting('pipeline_path'))

# execute snakemake
snakemake_args = ['snakemake','-c1','--config',f'PROFILE={profile}']
snakemake_args = ['snakemake', '-c1', '--config', f'PROFILE={profile}']
log.info(f'Executing `{" ".join(snakemake_args+extra_args)}`')

with working_directory(pipeline_path):
Expand Down Expand Up @@ -324,34 +324,35 @@ def run_stage(profile=None, stage=None, extra_args=None, **kwargs):
pipeline_config_path = config_path / 'configs' / 'config.yaml'
config_dict = load_config_file(pipeline_config_path)
stage_idx = locate_str_in_list(config_dict['STAGES'], stage)
# stage_idx_global = locate_str_in_list([v for k,v in stages.items()], stage)
if stage_idx is None:
raise IndexError("Make sure that the selected stage is also specified "\
"in your top-level config in the list `STAGES`!")

stage_config_path = get_config(config_dir=config_path / stage,
config_name=f'config_{profile}.yaml',
get_path_instead=True)

prev_stage = config_dict['STAGES'][stage_idx-1]
prev_stage_config_path = get_config(config_dir=config_path / prev_stage,
config_name=f'config_{profile}.yaml',
get_path_instead=True)
prev_config_name = Path(prev_stage_config_path).name
output_name = read_stage_output(stage=prev_stage,
config_dir=config_path,
config_name=prev_config_name)
stage_input = output_path / profile / prev_stage / output_name

if stage_idx > 0:
prev_stage = config_dict['STAGES'][stage_idx-1]
prev_stage_config_path = get_config(config_dir=config_path / prev_stage,
config_name=f'config_{profile}.yaml',
get_path_instead=True)
prev_config_name = Path(prev_stage_config_path).name
prev_output_name = read_stage_output(stage=prev_stage,
config_dir=config_path,
config_name=prev_config_name)
stage_input = output_path / profile / prev_stage / prev_output_name
extra_args = [f'STAGE_INPUT={stage_input}'] + extra_args

# descend into stage folder
pipeline_path = pipeline_path / stage

# append stage specific arguments
extra_args = [f'STAGE_INPUT={stage_input}'] \
+ extra_args \
+ ['--configfile', f'{stage_config_path}']
extra_args = extra_args + ['--configfile', f'{stage_config_path}']

# execute snakemake
snakemake_args = ['snakemake','-c1','--config',f'PROFILE={profile}']
snakemake_args = ['snakemake', '-c1', '--config', f'PROFILE={profile}']
log.info(f'Executing `{" ".join(snakemake_args+extra_args)}`')

with working_directory(pipeline_path):
Expand Down Expand Up @@ -387,7 +388,7 @@ def run_block(block=None, block_args=None, block_help=False, **kwargs):
# execute block
myenv = os.environ.copy()
myenv['PYTHONPATH'] = ':'.join(sys.path)
subprocess.run(['python', str(block_dir / f'{block}.py')]
subprocess.run(['python', str(block_dir / f'{block}.py')]
+ block_args,
env=myenv)
return None
Expand Down
8 changes: 5 additions & 3 deletions cobrawap/pipeline/utils/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ from utils.snakefile import get_setting
CONFIG_PATH = Path(get_setting('config_path'))
OUTPUT_PATH = Path(get_setting('output_path'))

is_first_stage = config['STAGE_NAME'] == get_setting('stages')['1']
if is_first_stage:
config['STAGE_INPUT'] = None
if 'STAGE_INPUT' not in config:
logger.warning('No STAGE_INPUT defined for running the stage individually! '
'You can set it via the command line with '
logger.warning('No STAGE_INPUT defined for running stage \'{}\' individually! '.format(config['STAGE_NAME']) +
'You can set it via the command line with ' +
'`--config STAGE_INPUT=/path/to/file`.')
config['STAGE_INPUT'] = None

if 'USE_LINK_AS_STAGE_OUTPUT' not in config:
config['USE_LINK_AS_STAGE_OUTPUT'] = True
Expand Down