feat: feed all lifebit related changes to this version from 21.04.1
mageshwaran-lifebit committed Dec 8, 2023
1 parent 4fc071f commit 21f43f0
Showing 14 changed files with 160 additions and 50 deletions.
3 changes: 3 additions & 0 deletions .github/CODEOWNERS
@@ -0,0 +1,3 @@
+# Core analyses is the default owner for this repo.
+* @lifebit-ai/core-analyses
+* @lifebit-ai/tre-batch-analysis
2 changes: 1 addition & 1 deletion buildSrc/build.gradle
@@ -11,7 +11,7 @@ version = "1.0.1"
group = "io.nextflow"

dependencies {
-implementation ('com.amazonaws:aws-java-sdk-s3:1.11.542')
+implementation ('com.amazonaws:aws-java-sdk-s3:1.12.129')
implementation 'com.google.code.gson:gson:2.9.0'
}

@@ -286,6 +286,7 @@ class BashWrapperBuilder {
binding.fix_ownership = fixOwnership() ? "[ \${NXF_OWNER:=''} ] && chown -fR --from root \$NXF_OWNER ${workDir}/{*,.*} || true" : null

binding.trace_script = isTraceRequired() ? getTraceScript(binding) : null
+binding.temp_dir = "\${1:-${copyStrategy.getTempDir(workDir)}}"

return binding
}
@@ -86,4 +86,9 @@ interface ScriptFileCopyStrategy {
*/
String getEnvScript(Map environment, boolean container)

+/**
+* @param targetDir The directory where output files need to be unstaged, i.e. stored
+* @return the path string for the temp directory
+*/
+String getTempDir( Path targetDir )
}
@@ -415,4 +415,9 @@ class SimpleFileCopyStrategy implements ScriptFileCopyStrategy {
}
}

+@Override
+String getTempDir( Path workDir ) {
+return "/tmp"
+}

}
@@ -31,13 +31,25 @@ region="${zone::-1}"
#
[[ '!{dockerPull}' ]] && for x in '!{dockerPull}'; do docker pull $x || true; done
#
+# Mount fsx file systems if provided
+#
+mountCommandsString="!{fsxFileSystemsMountCommands}"
+IFS=';' read -ra mountCommandsArray <<< "$mountCommandsString"
+[[ '!{fsxFileSystemsMountCommands}' ]] && for fsxMountCommand in "${mountCommandsArray[@]}"; do $fsxMountCommand || true; done
#
# Install NEXTFLOW and launch it
#
version="v!{nextflow.version}"
-curl -fsSL http://www.nextflow.io/releases/${version}/nextflow > $HOME/nextflow
-chmod +x $HOME/nextflow
-$HOME/nextflow -download
+if [[ '!{customNextflowBinaryUrl}' ]]; then
+curl -s https://get.nextflow.io --output nextflow
+NXF_PACK="all" NXF_URL="!{customNextflowBinaryUrl}" NXF_VER=${version} NXF_MODE="ignite" NXF_EXECUTOR="ignite" bash nextflow info
+chmod +x $HOME/nextflow
+$HOME/nextflow -download
+fi
# pull the nextflow pipeline repo
[[ '!{nextflow.pull}' ]] && $HOME/nextflow pull '!{nextflow.pull}'
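
Note (illustration, not part of the diff): the mount commands arrive as one semicolon-separated string, which the Nextflow config would supply through the `cloud.fsxFileSystemsMountCommands` option checked by AwsBatchExecutor below. A minimal sketch with hypothetical file-system values:

    cloud {
        fsxFileSystemsMountCommands = 'sudo mkdir -p /fsx; sudo mount -t lustre fs-0123456789abcdef0.fsx.eu-west-1.amazonaws.com@tcp:/abcd1234 /fsx'
    }

Because the boot script splits on every ';', each mount step must be a self-contained command.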
@@ -1,4 +1,4 @@
##
##
## Copyright 2020-2022, Seqera Labs
## Copyright 2013-2019, Centre for Genomic Regulation (CRG)
##
@@ -64,7 +64,7 @@ nxf_kill() {
}

nxf_mktemp() {
-local base=${1:-/tmp}
+local base={{temp_dir}}
mkdir -p "$base"
if [[ $(uname) = Darwin ]]; then mktemp -d $base/nxf.XXXXXXXXXX
else TMPDIR="$base" mktemp -d -t nxf.XXXXXXXXXX
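
Since the new temp_dir binding in BashWrapperBuilder renders as a bash default expansion ("\${1:-...}"), an explicit first argument to nxf_mktemp still wins. With the default strategy the rendered line is identical to the old one:

    local base=${1:-/tmp}

while a task using the Lustre-aware strategy below would get its (hypothetical) work dir as the default base:

    local base=${1:-/fsx/work/12/ab34cd}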
@@ -17,10 +17,12 @@

package nextflow.cloud.aws

+import com.amazonaws.AmazonClientException
import com.amazonaws.auth.AWSCredentials
import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.auth.BasicSessionCredentials
+import com.amazonaws.regions.InstanceMetadataRegionProvider
import com.amazonaws.regions.Region
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.batch.AWSBatchClient
@@ -29,6 +31,8 @@ import com.amazonaws.services.ecs.AmazonECS
import com.amazonaws.services.ecs.AmazonECSClientBuilder
import com.amazonaws.services.logs.AWSLogs
import com.amazonaws.services.logs.AWSLogsAsyncClientBuilder
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
+import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest
import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.util.logging.Slf4j
@@ -136,35 +140,19 @@ class AmazonClientFactory {
*/
protected String fetchIamRole() {
try {
-def role = getUrl('http://169.254.169.254/latest/meta-data/iam/security-credentials/').readLines()
-if( role.size() != 1 )
-throw new IllegalArgumentException("Not a valid EC2 IAM role")
-return role.get(0)
+def stsClient = AWSSecurityTokenServiceClientBuilder.defaultClient();
+def roleArn = stsClient.getCallerIdentity(new GetCallerIdentityRequest()).getArn()
+if(roleArn){
+return roleArn.split('/')[-2]
+}
+return null
}
-catch( IOException e ) {
+catch( AmazonClientException e ) {
log.trace "Unable to fetch IAM credentials -- Cause: ${e.message}"
return null
}
}

-/**
-* Fetch a remote URL resource text content
-*
-* @param path
-* A valid http/https resource URL
-* @param timeout
-* Max connection timeout in millis
-* @return
-* The resource URL content
-*/
-protected String getUrl(String path, int timeout=150) {
-final url = new URL(path)
-final con = url.openConnection()
-con.setConnectTimeout(timeout)
-con.setReadTimeout(timeout)
-return con.getInputStream().text.trim()
-}

/**
* Retrieve the AWS region from the EC2 instance metadata.
* See http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
@@ -175,10 +163,9 @@
*/
protected String fetchRegion() {
try {
-def zone = getUrl('http://169.254.169.254/latest/meta-data/placement/availability-zone')
-zone ? zone.substring(0,zone.length()-1) : null
+return new InstanceMetadataRegionProvider().getRegion()
}
-catch (IOException e) {
+catch (AmazonClientException e) {
log.debug "Cannot fetch AWS region", e
return null
}
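
Illustration of the ARN parsing in fetchIamRole above (hypothetical account and role): with an instance profile, GetCallerIdentity returns an assumed-role ARN whose second-to-last '/' segment is the role name, e.g. in Groovy:

    def roleArn = 'arn:aws:sts::123456789012:assumed-role/my-batch-role/i-0abc1234def567890'
    assert roleArn.split('/')[-2] == 'my-batch-role'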
@@ -108,11 +108,13 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {

protected void validateWorkDir() {
/*
-* make sure the work dir is a S3 bucket
+* make sure the work dir is an S3 bucket, unless Lustre FSx is being used
*/
-if( !(workDir instanceof S3Path) ) {
+def isUsingLustre = session.config.navigate('cloud.fsxFileSystemsMountCommands')
+log.debug "Validating work dir; isUsingLustre=$isUsingLustre"
+if( !(workDir instanceof S3Path) && !isUsingLustre ) {
session.abort()
-throw new AbortOperationException("When using `$name` executor an S3 bucket must be provided as working directory using either the `-bucket-dir` or `-work-dir` command line option")
+throw new AbortOperationException("When using the `$name` executor without Lustre storage, an S3 bucket must be provided as the working directory via the `-bucket-dir` or `-work-dir` command line option")
}
}

@@ -259,6 +261,19 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {
@PackageScope
ThrottlingExecutor getReaper() { reaper }

+String getInstanceIdByQueueAndTaskArn(String queue, String taskArn) {
+try {
+return helper?.getInstanceIdByQueueAndTaskArn(queue, taskArn)
+}
+catch ( AccessDeniedException e ) {
+log.warn "Unable to retrieve AWS Batch instance Id | ${e.message}"
+return null
+}
+catch( Exception e ) {
+log.warn "Unable to retrieve AWS Batch instance id for queue=$queue; task=$taskArn | ${e.message}", e
+return null
+}
+}

CloudMachineInfo getMachineInfoByQueueAndTaskArn(String queue, String taskArn) {
try {
@@ -299,13 +314,4 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {
ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg, exitMsg, )
}

}









}
@@ -70,8 +70,13 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
copy.remove('PATH')
// when a remote bin directory is provided, manage it properly
if( opts.remoteBinDir ) {
-result << "${opts.getAwsCli()} s3 cp --recursive --only-show-errors s3:/${opts.remoteBinDir} \$PWD/nextflow-bin\n"
-result << "chmod +x \$PWD/nextflow-bin/* || true\n"
+final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
+final copyCommandWhenUsingLustre = "cp -r ${opts.remoteBinDir} \$PWD/nextflow-bin\n"
+final copyCommandWhenUsingS3 = "${opts.getAwsCli()} s3 cp --recursive --only-show-errors s3:/${opts.remoteBinDir} \$PWD/nextflow-bin\n"
+final copyCommand = isUsingLustreFsx ? copyCommandWhenUsingLustre : copyCommandWhenUsingS3
+
+result << copyCommand
+result << "chmod +x \$PWD/nextflow-bin/*\n"
result << "export PATH=\$PWD/nextflow-bin:\$PATH\n"
}
// finally render the environment
@@ -83,6 +88,11 @@

@Override
String getStageInputFilesScript(Map<String,Path> inputFiles) {
+final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
+if( isUsingLustreFsx ) {
+log.trace "[USING LUSTRE FSX] stage_inputs."
+return super.getStageInputFilesScript(inputFiles) + '\n'
+}
def result = 'downloads=(true)\n'
result += super.getStageInputFilesScript(inputFiles) + '\n'
result += 'nxf_parallel "${downloads[@]}"\n'
@@ -94,6 +104,10 @@
*/
@Override
String stageInputFile( Path path, String targetName ) {
+final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
+if( isUsingLustreFsx ) {
+return "cp -r ${Escape.path(path)} ${Escape.path(targetName)}"
+}
// third param should not be escaped, because it's used in the grep match rule
def stage_cmd = opts.maxTransferAttempts > 1 && !opts.retryMode
? "downloads+=(\"nxf_cp_retry nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}\")"
@@ -118,6 +132,20 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
for( String it : patterns )
escape.add( Escape.path(it) )

+def isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
+if ( isUsingLustreFsx ) {
+log.trace "[USING LUSTRE FSX] unstage_outputs."
+return """\
+uploads=()
+IFS=\$'\\n'
+for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do
+uploads+=("cp -r '\$name' ${Escape.path(targetDir)}")
+done
+unset IFS
+nxf_parallel "\${uploads[@]}"
+""".stripIndent(true)
+}
+
return """\
uploads=()
IFS=\$'\\n'
@@ -134,7 +162,11 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
*/
@Override
String touchFile( Path file ) {
-"echo start | nxf_s3_upload - s3:/${Escape.path(file)}"
+def encryption = opts.storageEncryption ? "--sse $opts.storageEncryption " : ''
+final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
+final touchCommandWhenUsingLustre = "echo start > ${Escape.path(file)}"
+final touchCommandWhenUsingS3 = "echo start | nxf_s3_upload - s3:/${Escape.path(file)}"
+return isUsingLustreFsx ? touchCommandWhenUsingLustre : touchCommandWhenUsingS3
}

/**
@@ -150,7 +182,10 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
*/
@Override
String copyFile( String name, Path target ) {
-"nxf_s3_upload ${Escape.path(name)} s3:/${Escape.path(target.getParent())}"
+final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
+final copyCommandWhenUsingLustre = "cp -r ${Escape.path(name)} ${Escape.path(target.getParent())}"
+final copyCommandWhenUsingS3 = "nxf_s3_upload ${Escape.path(name)} s3:/${Escape.path(target.getParent())}"
+return isUsingLustreFsx ? copyCommandWhenUsingLustre : copyCommandWhenUsingS3
}

static String uploadCmd( String source, Path target ) {
@@ -161,7 +196,11 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
* {@inheritDoc}
*/
String exitFile( Path path ) {
-"| nxf_s3_upload - s3:/${Escape.path(path)} || true"
+def encryption = opts.storageEncryption ? "--sse $opts.storageEncryption " : ''
+final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
+final exitCommandWhenUsingLustre = "> ${Escape.path(path)}"
+final exitCommandWhenUsingS3 = "| nxf_s3_upload - s3:/${Escape.path(path)} || true"
+return isUsingLustreFsx ? exitCommandWhenUsingLustre : exitCommandWhenUsingS3
}

/**
@@ -171,4 +210,10 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
String pipeInputFile( Path path ) {
" < ${Escape.path(path.getFileName())}"
}
+
+@Override
+String getTempDir( Path targetDir ) {
+final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
+return isUsingLustreFsx ? "${Escape.path(targetDir)}" : super.getTempDir(targetDir)
+}
}
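
Putting the two strategies together (a sketch assuming the behaviour shown in this diff; the work dir path is hypothetical):

    import java.nio.file.Paths

    def workDir = Paths.get('/fsx/work/12/ab34cd')
    assert new SimpleFileCopyStrategy().getTempDir(workDir) == '/tmp'   // default kept
    // AwsBatchFileCopyStrategy.getTempDir(workDir) would instead return the escaped
    // work dir whenever FSx mount commands are configured, so nxf_mktemp creates its
    // scratch directory on the shared Lustre file system rather than on local /tmp.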
@@ -219,5 +219,20 @@ class AwsBatchHelper {
return result.toString()
}

+private String getInstanceIdByClusterAndTaskArn(String clusterArn, String taskArn) {
+final containerId = getContainerIdByClusterAndTaskArn(clusterArn, taskArn)
+return containerId ? getInstanceIdByClusterAndContainerId(clusterArn, containerId) : null
+}
+
+String getInstanceIdByQueueAndTaskArn(String queue, String taskArn) {
+final clusterArnList = getClusterArnByBatchQueue(queue)
+for (String cluster : clusterArnList) {
+final result = getInstanceIdByClusterAndTaskArn(cluster, taskArn)
+if (result)
+return result
+}
+return null
+}

}

@@ -248,6 +248,7 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
final job = describeJob(jobId)
final done = job?.status in ['SUCCEEDED', 'FAILED']
if( done ) {
+log.trace "[AWS BATCH] Completed task: jobId=$jobId"
// finalize the task
task.exitStatus = readExitFile()
task.stdout = outputFile
@@ -622,7 +623,17 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : ''
final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : ''
final aws = "$cli s3 cp --only-show-errors${sse}${kms}${debug}"
-final cmd = "trap \"{ ret=\$?; $aws ${TaskRun.CMD_LOG} s3:/${getLogFile()}||true; exit \$ret; }\" EXIT; $aws s3:/${getWrapperFile()} - | bash 2>&1 | tee ${TaskRun.CMD_LOG}"
+def isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
+def logCopyCommand = isUsingLustreFsx
+? "trap \"{ ret=\$?; cp ${TaskRun.CMD_LOG} ${getLogFile()} 2> /dev/null; exit \$ret; }\" EXIT; "
+: "trap \"{ ret=\$?; $aws --request-payer ${TaskRun.CMD_LOG} s3:/${getLogFile()}||true; exit \$ret; }\" EXIT; "
+// Note(ruben): since we do not download .command.run from the S3 bucket, and the file is
+// auto-imported through the FSx repository link when mounting, it is already accessible.
+// So we just need to make it executable and run it.
+def runCopyCommand = isUsingLustreFsx
+? "chmod +x ${getWrapperFile()}; ${getWrapperFile()} 2>&1 | tee ${TaskRun.CMD_LOG}"
+: "$aws --request-payer s3:/${getWrapperFile()} - | bash 2>&1 | tee ${TaskRun.CMD_LOG}"
+def cmd = "${logCopyCommand}${runCopyCommand}"
return ['bash','-o','pipefail','-c', cmd.toString()]
}
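
For illustration, on the Lustre branch with a hypothetical work dir /fsx/work/12/ab34cd the assembled `cmd` comes out roughly as:

    trap "{ ret=$?; cp .command.log /fsx/work/12/ab34cd/.command.log 2> /dev/null; exit $ret; }" EXIT; chmod +x /fsx/work/12/ab34cd/.command.run; /fsx/work/12/ab34cd/.command.run 2>&1 | tee .command.log

i.e. the log is copied back with plain cp and the wrapper runs in place instead of being streamed from S3.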

@@ -777,7 +788,8 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
return machineInfo
if( queueName && taskArn && executor.awsOptions.fetchInstanceType ) {
machineInfo = executor.getMachineInfoByQueueAndTaskArn(queueName, taskArn)
-log.trace "[AWS BATCH] jobId=$jobId; queue=$queueName; task=$taskArn => machineInfo=$machineInfo"
+def instanceId = executor.getInstanceIdByQueueAndTaskArn(queueName, taskArn)
+log.trace "[AWS BATCH] jobId=$jobId; queue=$queueName; task=$taskArn => machineInfo=$machineInfo; instanceId=$instanceId"
}
return machineInfo
}