Skip to content

Commit

Permalink
chore: finetune code
Browse files Browse the repository at this point in the history
  • Loading branch information
mageshwaran-lifebit committed Dec 8, 2023
1 parent a28f847 commit 964c434
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ IFS=';' read -ra mountCommandsArray <<< "$mountCommandsString"
# Install NEXTFLOW and launch it
#
if [[ '!{customNextflowBinaryUrl}' ]]; then
curl -s https://get.nextflow.io --output nextflow
curl -s https://get.nextflow.io --output $HOME/nextflow
NXF_PACK="all" NXF_URL="!{customNextflowBinaryUrl}" NXF_VER=${version} NXF_MODE="ignite" NXF_EXECUTOR="ignite" bash nextflow info
chmod +x $HOME/nextflow
$HOME/nextflow -download
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class AmazonClientFactory {
try {
def stsClient = AWSSecurityTokenServiceClientBuilder.defaultClient();
def roleArn = stsClient.getCallerIdentity(new GetCallerIdentityRequest()).getArn()
if(roleArn){
if(roleArn) {
return roleArn.split('/')[-2]
}
return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {

protected void validateWorkDir() {
/*
* make sure the work dir is a S3 bucket and if we are not usign lustre fsx
* make sure the work dir is a S3 bucket and if we are not using lustre fsx
*/
def isUsingLustre = session.config.navigate('cloud.fsxFileSystemsMountCommands')
log.debug "Checking workdir validation, isUsingLustre $isUsingLustre"
Expand Down Expand Up @@ -261,20 +261,6 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {
@PackageScope
ThrottlingExecutor getReaper() { reaper }

String getInstanceIdByQueueAndTaskArn(String queue, String taskArn) {
try {
return helper?.getInstanceIdByQueueAndTaskArn(queue, taskArn)
}
catch ( AccessDeniedException e ) {
log.warn "Unable to retrieve AWS Batch instance Id | ${e.message}"
return null
}
catch( Exception e ) {
log.warn "Unable to retrieve AWS batch instance id for queue=$queue; task=$taskArn | ${e.message}", e
return null
}
}

CloudMachineInfo getMachineInfoByQueueAndTaskArn(String queue, String taskArn) {
try {
return helper?.getCloudInfoByQueueAndTaskArn(queue, taskArn)
Expand Down Expand Up @@ -314,4 +300,18 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {
ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg, exitMsg, )
}

String getInstanceIdByQueueAndTaskArn(String queue, String taskArn) {
try {
return helper?.getInstanceIdByQueueAndTaskArn(queue, taskArn)
}
catch ( AccessDeniedException e ) {
log.warn "Unable to retrieve AWS Batch instance Id | ${e.message}"
return null
}
catch( Exception e ) {
log.warn "Unable to retrieve AWS batch instance id for queue=$queue; task=$taskArn | ${e.message}", e
return null
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package nextflow.cloud.aws.batch

import nextflow.cloud.aws.util.LifebitUtils

import java.nio.file.Path

import groovy.transform.CompileStatic
Expand Down Expand Up @@ -70,12 +72,8 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
copy.remove('PATH')
// when a remote bin directory is provide managed it properly
if( opts.remoteBinDir ) {
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
final copyCommandWhenUsingLustre = "cp -r ${opts.remoteBinDir} \$PWD/nextflow-bin\n"
final copyCommandWhenUsingS3 = "${opts.getAwsCli()} s3 cp --recursive --only-show-errors s3:/${opts.remoteBinDir} \$PWD/nextflow-bin\n"
final copyCommand = isUsingLustreFsx ? copyCommandWhenUsingLustre : copyCommandWhenUsingS3

result << copyCommand
result << LifebitUtils.isUsingLustreFsx(opts) ? "cp -r ${opts.remoteBinDir} \$PWD/nextflow-bin\n" : copyCommandWhenUsingS3
result << "chmod +x \$PWD/nextflow-bin/* || true\n"
result << "export PATH=\$PWD/nextflow-bin:\$PATH\n"
}
Expand All @@ -88,8 +86,7 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {

@Override
String getStageInputFilesScript(Map<String,Path> inputFiles) {
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
if( isUsingLustreFsx ) {
if( LifebitUtils.isUsingLustreFsx(opts) ) {
log.trace "[USING LUSTRE FSX] stage_inputs."
return super.getStageInputFilesScript(inputFiles) + '\n'
}
Expand All @@ -104,8 +101,7 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
*/
@Override
String stageInputFile( Path path, String targetName ) {
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
if( isUsingLustreFsx ) {
if( LifebitUtils.isUsingLustreFsx(opts) ) {
return "cp -r ${Escape.path(path)} ${Escape.path(targetName)}"
}
// third param should not be escaped, because it's used in the grep match rule
Expand All @@ -120,7 +116,6 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
*/
@Override
String getUnstageOutputFilesScript(List<String> outputFiles, Path targetDir) {

final patterns = normalizeGlobStarPaths(outputFiles)
// create a bash script that will copy the out file to the working directory
log.trace "[AWS BATCH] Unstaging file path: $patterns"
Expand All @@ -132,8 +127,7 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
for( String it : patterns )
escape.add( Escape.path(it) )

def isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
if ( isUsingLustreFsx ) {
if ( LifebitUtils.isUsingLustreFsx(opts) ) {
log.trace "[USING LUSTRE FSX] unstage_outputs."
return """\
uploads=()
Expand Down Expand Up @@ -162,11 +156,8 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
*/
@Override
String touchFile( Path file ) {
def encryption = opts.storageEncryption ? "--sse $opts.storageEncryption " : ''
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
final touchCommandWhenUsingLustre = "echo start > ${Escape.path(file)}"
final touchCommandWhenUsingS3 = "echo start | nxf_s3_upload - s3:/${Escape.path(file)}"
return isUsingLustreFsx ? touchCommandWhenUsingLustre : touchCommandWhenUsingS3
return LifebitUtils.isUsingLustreFsx(opts) ? "echo start > ${Escape.path(file)}" : touchCommandWhenUsingS3
}

/**
Expand All @@ -182,10 +173,9 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
*/
@Override
String copyFile( String name, Path target ) {
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
final copyCommandWhenUsingLustre = "cp -r ${Escape.path(name)} ${Escape.path(target.getParent())}"
final copyCommandWhenUsingS3 = "nxf_s3_upload ${Escape.path(name)} s3:/${Escape.path(target.getParent())}"
return isUsingLustreFsx ? copyCommandWhenUsingLustre : copyCommandWhenUsingS3
return LifebitUtils.isUsingLustreFsx(opts) ? copyCommandWhenUsingLustre : copyCommandWhenUsingS3
}

static String uploadCmd( String source, Path target ) {
Expand All @@ -196,11 +186,7 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
* {@inheritDoc}
*/
String exitFile( Path path ) {
def encryption = opts.storageEncryption ? "--sse $opts.storageEncryption " : ''
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
final exitCommandWhenUsingLustre = "> ${Escape.path(path)}"
final exitCommandWhenUsingS3 = "| nxf_s3_upload - s3:/${Escape.path(path)} || true"
return isUsingLustreFsx ? exitCommandWhenUsingLustre : exitCommandWhenUsingS3
return LifebitUtils.isUsingLustreFsx(opts) ? "> ${Escape.path(path)}" : "| nxf_s3_upload - s3:/${Escape.path(path)} || true"
}

/**
Expand All @@ -213,7 +199,7 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {

@Override
String getTempDir( Path targetDir ) {
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
return isUsingLustreFsx ? "${Escape.path(targetDir)}" : super.getTempDir(targetDir)
return LifebitUtils.isUsingLustreFsx(opts) ? "${Escape.path(targetDir)}" : super.getTempDir(targetDir)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package nextflow.cloud.aws.batch

import nextflow.cloud.aws.util.LifebitUtils

import static nextflow.cloud.aws.batch.AwsContainerOptionsMapper.*

import java.nio.file.Path
Expand Down Expand Up @@ -623,14 +625,13 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : ''
final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : ''
final aws = "$cli s3 cp --only-show-errors${sse}${kms}${debug}"
def isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
def logCopyCommand = isUsingLustreFsx
def logCopyCommand = LifebitUtils.isUsingLustreFsx(opts)
? "trap \"{ ret=\$?; cp ${TaskRun.CMD_LOG} ${getLogFile()} 2> /dev/null; exit \$ret; }\" EXIT; "
: "trap \"{ ret=\$?; $aws --request-payer ${TaskRun.CMD_LOG} s3:/${getLogFile()}||true; exit \$ret; }\" EXIT; "
// Note(ruben): Since we do not download the .command.run from s3 bucket and due the fact that is auto imported
// through the link capacity of fsx when mounting we have already access to the file. So, we just need to make it
// executable and run it
def runCopyCommand = isUsingLustreFsx
def runCopyCommand = LifebitUtils.isUsingLustreFsx(opts)
? "chmod +x ${getWrapperFile()}; ${getWrapperFile()} 2>&1 | tee ${TaskRun.CMD_LOG}"
: "$aws --request-payer s3:/${getWrapperFile()} - | bash 2>&1 | tee ${TaskRun.CMD_LOG}"
def cmd = "${logCopyCommand}${runCopyCommand}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package nextflow.cloud.aws.util

import nextflow.cloud.aws.batch.AwsOptions

class LifebitUtils {


static boolean isUsingLustreFsx(AwsOptions opts) {
return !opts.getFsxFileSystemsMountCommands().isEmpty()
}
}

0 comments on commit 964c434

Please sign in to comment.