
Commit

Merge branch 'release/1.0.7'
Walt Shands committed Aug 18, 2017
2 parents 65663e5 + dcdd4e0 commit 9675909
Showing 5 changed files with 96 additions and 85 deletions.
95 changes: 50 additions & 45 deletions luigi_task_executor/CNV.py
@@ -36,7 +36,7 @@ class ConsonanceTask(luigi.Task):

redwood_host = luigi.Parameter("storage.ucsc-cgl.org")
redwood_token = luigi.Parameter("must_be_defined")
dockstore_tool_running_dockstore_tool = luigi.Parameter(default="quay.io/ucsc_cgl/dockstore-tool-runner:1.0.17")
dockstore_tool_running_dockstore_tool = luigi.Parameter(default="quay.io/ucsc_cgl/dockstore-tool-runner:1.0.18")

workflow_version = luigi.Parameter(default="must be defined")

@@ -56,8 +56,6 @@ class ConsonanceTask(luigi.Task):

sample_name = luigi.Parameter(default='must input sample name')
specimen_type = luigi.Parameter(default='must input sample name')
#output_filename = sample_name
#submitter_sample_id = luigi.Parameter(default='must input submitter sample id')
cnv_job_json = luigi.Parameter(default="must input metadata")
cnv_reference_files_json = luigi.Parameter(default="must input reference file metadata")

@@ -68,10 +66,6 @@ class ConsonanceTask(luigi.Task):

def run(self):
print("\n\n\n** TASK RUN **")
#get a unique id for this task based on the some inputs
#this id will not change if the inputs are the same
# task_uuid = self.get_task_uuid()

# print "** MAKE TEMP DIR **"
# create a unique temp dir
local_json_dir = "/tmp/" + self.touch_file_path
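
The collapsed context presumably creates that directory before the JSON files are written. A minimal sketch of one way to do it, as it would appear inside run() (not the committed code; mkdir -p semantics, Python 2/3 compatible):

    import errno
    import os

    local_json_dir = "/tmp/" + self.touch_file_path
    try:
        os.makedirs(local_json_dir)      # mkdir -p equivalent
    except OSError as e:
        if e.errno != errno.EEXIST:      # tolerate "already exists", re-raise anything else
            raise
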
@@ -127,12 +121,6 @@ def run(self):
print("** MAKE JSON FOR DOCKSTORE TOOL WRAPPER **")

# create a json for dockstoreRunningDockstoreTool, embed the JSON as a param
# below used to be a list of parent UUIDs; which is correct????
# "parent_uuids": "[%s]",
parent_uuids = ','.join(map("{0}".format, cnv_job['parent_uuids']))

print("parent uuids:%s" % parent_uuids)

p = self.save_dockstore_json().open('w')
p_local = self.save_dockstore_json_local().open('w')

@@ -145,7 +133,15 @@ def run(self):
dockstore_json["dockstore_url" ] = self.target_tool_url
dockstore_json["redwood_token" ] = self.redwood_token
dockstore_json["redwood_host"] = self.redwood_host
dockstore_json["parent_uuids"] = parent_uuids

#use only one parent uuid even though inputs are from more than one bundle?
#Do this for now, until the file browser code is fixed so that it won't
#display duplicate workflow outputs
#parent_uuids = ','.join(map("{0}".format, cnv_job['parent_uuids']))
#print("parent uuids:%s" % parent_uuids)
#dockstore_json["parent_uuids"] = parent_uuids
dockstore_json["parent_uuids"] = cnv_job['parent_uuids'][0]

dockstore_json["workflow_type"] = self.workflow_type
dockstore_json["launch_type"] = 'workflow'
dockstore_json["tmpdir"] = self.tmp_dir
@@ -168,7 +164,6 @@ def run(self):
# execute consonance run, parse the job UUID

#cmd = ["consonance", "run", "--image-descriptor", self.image_descriptor, "--flavour", "c4.8xlarge", "--run-descriptor", self.save_dockstore_json_local().path]

cmd = ["consonance", "run", "--tool-dockstore-id", self.dockstore_tool_running_dockstore_tool, "--flavour", self.vm_instance_type, "--run-descriptor", self.save_dockstore_json_local().path]
cmd_str = ' '.join(cmd)
if self.test_mode == False:
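
A rough sketch of how that command list might be executed when test_mode is off (the committed error handling and job-UUID parsing are in the collapsed part of the diff; the subprocess usage here is an assumption):

    import subprocess

    cmd = ["consonance", "run",
           "--tool-dockstore-id", self.dockstore_tool_running_dockstore_tool,
           "--flavour", self.vm_instance_type,
           "--run-descriptor", self.save_dockstore_json_local().path]
    print("consonance cmd: " + ' '.join(cmd))
    if not self.test_mode:
        # submit the job through Consonance; stdout is expected to carry
        # the scheduled job's UUID (parsing omitted in this sketch)
        result = subprocess.call(cmd)
        if result != 0:
            print("ERROR: consonance run exited with %d" % result)
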
@@ -219,13 +214,6 @@ def run(self):
m.close()


# if result == 0:
# cmd = "rm -rf "+self.data_dir+"/"+self.bundle_uuid+"/bamstats_report.zip "+self.data_dir+"/"+self.bundle_uuid+"/datastore/"
# print "CLEANUP CMD: "+cmd
# result = subprocess.call(cmd, shell=True)
# if result == 0:
# print "CLEANUP SUCCESSFUL"

# NOW make a final report
f = self.output().open('w')
# TODO: could print report on what was successful and what failed? Also, provide enough details like donor ID etc
@@ -236,26 +224,22 @@ def run(self):
def save_metadata_json(self):
#task_uuid = self.get_task_uuid()
#return luigi.LocalTarget('%s/consonance-jobs/RNASeq_3_1_x_Coordinator/fastq_gz/%s/metadata.json' % (self.tmp_dir, task_uuid))
#return S3Target('s3://cgl-core-analysis-run-touch-files/consonance-jobs/RNASeq_3_1_x_Coordinator/%s/metadata.json' % ( task_uuid))
return S3Target('s3://%s/%s_meta_data.json' % (self.touch_file_path, self.sample_name + "_" + self.specimen_type ))

def save_dockstore_json_local(self):
#task_uuid = self.get_task_uuid()
#luigi.LocalTarget('%s/consonance-jobs/RNASeq_3_1_x_Coordinator/fastq_gz/%s/dockstore_tool.json' % (self.tmp_dir, task_uuid))
#return S3Target('s3://cgl-core-analysis-run-touch-files/consonance-jobs/RNASeq_3_1_x_Coordinator/%s/dockstore_tool.json' % ( task_uuid))
#return S3Target('%s/%s_dockstore_tool.json' % (self.touch_file_path, self.submitter_sample_id ))
return luigi.LocalTarget('/tmp/%s/%s_dockstore_tool.json' % (self.touch_file_path, self.sample_name + "_" + self.specimen_type ))

def save_dockstore_json(self):
#task_uuid = self.get_task_uuid()
#luigi.LocalTarget('%s/consonance-jobs/RNASeq_3_1_x_Coordinator/fastq_gz/%s/dockstore_tool.json' % (self.tmp_dir, task_uuid))
#return S3Target('s3://cgl-core-analysis-run-touch-files/consonance-jobs/RNASeq_3_1_x_Coordinator/%s/dockstore_tool.json' % ( task_uuid))
return S3Target('s3://%s/%s_dockstore_tool.json' % (self.touch_file_path, self.sample_name + "_" + self.specimen_type ))

def output(self):
#task_uuid = self.get_task_uuid()
#return luigi.LocalTarget('%s/consonance-jobs/RNASeq_3_1_x_Coordinator/fastq_gz/%s/finished.txt' % (self.tmp_dir, task_uuid))
#return S3Target('s3://cgl-core-analysis-run-touch-files/consonance-jobs/RNASeq_3_1_x_Coordinator/%s/finished.txt' % ( task_uuid))
return S3Target('s3://%s/%s_finished.json' % (self.touch_file_path, self.sample_name + "_" + self.specimen_type ))
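
Because output() returns an S3 touch file, Luigi treats a ConsonanceTask as complete whenever that object already exists, so re-running the coordinator will not resubmit finished samples. A hedged sketch of how the targets resolve for a made-up sample (bucket, prefix and names are placeholders, and touch_file_path/sample_name/specimen_type are assumed to be plain task parameters, as their use in the targets suggests):

    # touch_file_path    ~ "my-bucket/consonance-jobs/CNV/test_PROGRAM_PROJECT"
    # sample + specimen  ~ "DONOR1" + "_" + "Normal"
    #
    # save_metadata_json()        -> s3://my-bucket/.../DONOR1_Normal_meta_data.json
    # save_dockstore_json()       -> s3://my-bucket/.../DONOR1_Normal_dockstore_tool.json
    # save_dockstore_json_local() -> /tmp/my-bucket/.../DONOR1_Normal_dockstore_tool.json
    # output()                    -> s3://my-bucket/.../DONOR1_Normal_finished.json
    task = ConsonanceTask(touch_file_path="my-bucket/consonance-jobs/CNV/test_PROGRAM_PROJECT",
                          sample_name="DONOR1", specimen_type="Normal")
    if not task.output().exists():
        print("finished.json not found; Luigi would schedule run() for this sample")
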


@@ -266,11 +250,11 @@ class CNVCoordinator(luigi.Task):
redwood_token = luigi.Parameter("must_be_defined")
redwood_host = luigi.Parameter(default='storage.ucsc-cgp.org')
image_descriptor = luigi.Parameter("must be defined")
dockstore_tool_running_dockstore_tool = luigi.Parameter(default="quay.io/ucsc_cgl/dockstore-tool-runner:1.0.17")
dockstore_tool_running_dockstore_tool = luigi.Parameter(default="quay.io/ucsc_cgl/dockstore-tool-runner:1.0.18")
tmp_dir = luigi.Parameter(default='/datastore')
max_jobs = luigi.Parameter(default='-1')
bundle_uuid_filename_to_file_uuid = {}
process_sample_uuid = luigi.Parameter(default = "master")
process_sample_uuid = luigi.Parameter(default = "")

workflow_version = luigi.Parameter(default="")
touch_file_bucket = luigi.Parameter(default="must be input")
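
For orientation, a hedged sketch of how the coordinator might be launched programmatically (every value is a placeholder, and es_index_host/es_index_port are inferred from their use in requires() below; in practice the equivalent luigi command-line invocation would be used):

    import luigi

    luigi.build([
        CNVCoordinator(
            redwood_token="<redwood-token>",
            redwood_host="storage.ucsc-cgp.org",
            image_descriptor="<image-descriptor>",
            touch_file_bucket="<touch-file-bucket>",
            workflow_version="1.0.7",
            es_index_host="localhost",
            es_index_port="9200",
            max_jobs="10",
            # process_sample_uuid="<sample-uuid>",  # optional: restrict to a single sample
        )
    ], local_scheduler=True)
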
@@ -310,8 +294,8 @@ def requires(self):
# now query elasticsearch
print("setting up elastic search Elasticsearch([\"http:\/\/"+self.es_index_host+":"+self.es_index_port+"]")
es = Elasticsearch([{'host': self.es_index_host, 'port': self.es_index_port}])
res = es.search(index="analysis_index", body={"query" : {"bool" : {"should" : [{"term" : { "flags.normal_cnv_workflow" : "false"}}, \
{"term" : {"flags.tumor_cnv_workflow" : "false" }}],"minimum_should_match" : 1 }}}, size=5000)
res = es.search(index="analysis_index", body={"query" : {"bool" : {"should" : [{"term" : { "flags.normal_cnv_workflow_1_0_x" : "false"}}, \
{"term" : {"flags.tumor_cnv_workflow_1_0_x" : "false" }}],"minimum_should_match" : 1 }}}, size=5000)
# see jqueryflag_alignment_qc
# curl -XPOST http://localhost:9200/analysis_index/_search?pretty -d @jqueryflag_alignment_qc

@@ -365,10 +349,11 @@ def requires(self):
print("Next sample of %d samples:" % len(specimen["samples"]))
for sample in specimen["samples"]:
print("Next analysis of %d analysis:" % len(sample["analysis"]))

#if a particular sample uuid is requested for processing and
#the current sample uuid does not match go on to the next sample
#if self.process_sample_uuid and (self.process_sample_uuid != sample["sample_uuid"]):
# continue
if self.process_sample_uuid and (self.process_sample_uuid != sample["sample_uuid"]):
continue

sample_name = hit["_source"]["submitter_donor_id"]
print('sample name (donor id):{}'.format(sample_name))
@@ -385,7 +370,7 @@ def requires(self):
+hit["_source"]["program"]+"_" \
+hit["_source"]["project"]

#should we remove all white space from the path in the case where i.e. the program name is two works separated by blanks?
#should we remove all white space from the path in the case where i.e. the program name is two words separated by blanks?
# remove all whitespace from touch file path
#touch_file_path = ''.join(touch_file_path.split())

@@ -401,23 +386,21 @@ def requires(self):
print(output)

if ( (analysis["analysis_type"] == "alignment" and \
#if ( (analysis["analysis_type"] == "cnv_variant_calling" and \
((
#hit["_source"]["flags"]["normal_cnv_workflow"] == False and \
# sample["sample_uuid"] in hit["_source"]["missing_items"]["normal_cnv_workflow"] and \
hit["_source"]["flags"]["normal_cnv_workflow_1_0_x"] == False and \
sample["sample_uuid"] in hit["_source"]["missing_items"]["normal_cnv_workflow_1_0_x"] and \
re.match("^Normal - ", specimen["submitter_specimen_type"])) or \
(
#hit["_source"]["flags"]["tumor_cnv_workflow"] == False and \
# sample["sample_uuid"] in hit["_source"]["missing_items"]["tumor_cnv_workflow"] and \
hit["_source"]["flags"]["tumor_cnv_workflow_1_0_x"] == False and \
sample["sample_uuid"] in hit["_source"]["missing_items"]["tumor_cnv_workflow_1_0_x"] and \
re.match("^Primary tumour - |^Recurrent tumour - |^Metastatic tumour - |^Cell line -", specimen["submitter_specimen_type"]))))):


for file in analysis["workflow_outputs"]:
print("\nfile type:"+file["file_type"])
print("\nfile name:"+file["file_path"])

#if (file["file_type"] != "fastq" or
# file["file_type"] != "fastq.gz"):
#if (file["file_type"] != "bam"): output an error message?

file_path = 'redwood' + '://' + self.redwood_host + '/' + analysis['bundle_uuid'] + '/' + \
self.fileToUUID(file["file_path"], analysis["bundle_uuid"]) + \
@@ -474,8 +457,8 @@ def requires(self):
cnv_jobs['samples'][sample_name]["submitter_experimental_design"] = specimen["submitter_experimental_design"]
#cnv_jobs['samples'][sample_name]["submitter_sample_id"] = sample["submitter_sample_id"]
#cnv_jobs['samples'][sample_name]["sample_uuid"] = sample["sample_uuid"]
cnv_jobs['samples'][sample_name]["analysis_type"] = "somatic_variant_calling"
cnv_jobs['samples'][sample_name]["workflow_name"] = "quay.io/BD2KGenomics/dockstore_workflow_cnv"
cnv_jobs['samples'][sample_name]["analysis_type"] = "cnv_variant_calling"
cnv_jobs['samples'][sample_name]["workflow_name"] = "https://dockstore.org/workflows/BD2KGenomics/dockstore_workflow_cnv"
cnv_jobs['samples'][sample_name]["workflow_version"] = self.workflow_version

print("\nCNV jobs with meta data:", cnv_jobs)
Expand Down Expand Up @@ -503,12 +486,34 @@ def requires(self):
for specimen_type, value in cnv_jobs['samples'][sample_name]['tumor_bams'].iteritems():
print("specimen type:{}".format(specimen_type))
cnv_jobs['samples'][sample_name]['TUMOR_BAM'] = cnv_jobs['samples'][sample_name]['tumor_bams'][specimen_type]['input_json']

#attach the workflow output metadata to the tumor input sample only,
#not to the normal input sample, so that the file browser can tell
#whether it is associated with Baseline vs Progression
cnv_jobs['samples'][sample_name]['parent_uuids'] = \
cnv_jobs['samples'][sample_name]['normal_bams']['Normal']['parent_uuids'] + \
cnv_jobs['samples'][sample_name]['tumor_bams'][specimen_type]['parent_uuids']
#cnv_jobs['samples'][sample_name]['normal_bams']['Normal']['parent_uuids'] + \

#we hard-code the output file names of the individual tools in
#the workflow because DockstoreRunner.py will use the filenames
#from cwltool.stdout.txt, which are 'varscan.cnv' and 'adtex.cnv';
#these are the default output file names of the tools registered at
#https://dockstore.org/containers/quay.io/ucsc_cgl/dockstore_tool_varscan_cnv
#https://dockstore.org/containers/quay.io/ucsc_cgl/dockstore_tool_adtex

#TODO we should use the file name in the registration.tsv file
#created by dockstore_tool_runner, and replace the file name
#in the metadata.json file which is found originally in cwltool.stdout.txt

#output_file_name = '/tmp/' + sample_name + '_' + specimen_type + '_ADTEX.cnv'
output_file_name = '/tmp/adtex.cnv'
cnv_jobs['samples'][sample_name]['ADTEX_OUTCNV'] = {"class" : "File", "path" : output_file_name}
#output_file_name = '/tmp/' + sample_name + '_' + specimen_type + '_VARSCAN.cnv'
output_file_name = '/tmp/varscan.cnv'
cnv_jobs['samples'][sample_name]['VARSCAN_OUTCNV'] = {"class" : "File", "path" : output_file_name}

cnv_jobs['samples'][sample_name]['ADTEX_OUTCNV'] = sample_name + '_' + specimen_type + '_ADTEX.cnv'
cnv_jobs['samples'][sample_name]['VARSCAN_OUTCNV'] = sample_name + '_' + specimen_type + '_VARSCAN.cnv'
full_touch_file_path = touch_file_path + "_" + sample_name + "_" + specimen_type

if (sample_num < int(self.max_jobs) or int(self.max_jobs) < 0):
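
Two naming decisions come together in this hunk: the Normal and tumor bundles both contribute parent UUIDs to the job, and the tool outputs are pinned to the fixed file names the tools emit by default. A hedged sketch with placeholder sample/specimen names:

    sample_name, specimen_type = "DONOR1", "Primary tumour - solid tissue"
    job = cnv_jobs['samples'][sample_name]

    # both input bundles contribute parent UUIDs to the job record ...
    job['parent_uuids'] = (job['normal_bams']['Normal']['parent_uuids'] +
                           job['tumor_bams'][specimen_type]['parent_uuids'])
    # ... but ConsonanceTask later forwards only parent_uuids[0] to the
    # dockstore JSON, so the outputs appear once in the file browser

    # output paths match what the varscan/adtex containers write by default
    job['ADTEX_OUTCNV'] = {"class": "File", "path": "/tmp/adtex.cnv"}
    job['VARSCAN_OUTCNV'] = {"class": "File", "path": "/tmp/varscan.cnv"}
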
