diff --git a/api/data_refinery_api/views/relation_serializers.py b/api/data_refinery_api/views/relation_serializers.py index dbf8bb2c7..34c3611db 100644 --- a/api/data_refinery_api/views/relation_serializers.py +++ b/api/data_refinery_api/views/relation_serializers.py @@ -90,6 +90,7 @@ class Meta: "pretty_platform", "technology", "is_processed", + "is_unable_to_be_processed", ) read_only_fields = fields @@ -112,6 +113,7 @@ class Meta: "manufacturer", "protocol_info", "is_processed", + "is_unable_to_be_processed", "created_at", "last_modified", ) diff --git a/api/data_refinery_api/views/sample.py b/api/data_refinery_api/views/sample.py index 6f08a2ed1..d8333d292 100644 --- a/api/data_refinery_api/views/sample.py +++ b/api/data_refinery_api/views/sample.py @@ -15,8 +15,11 @@ from data_refinery_api.exceptions import InvalidFilters from data_refinery_api.utils import check_filters from data_refinery_api.views.relation_serializers import ( + ComputedFileRelationSerializer, + DownloaderJobRelationSerializer, OrganismIndexRelationSerializer, OrganismRelationSerializer, + ProcessorJobRelationSerializer, ProcessorRelationSerializer, ) from data_refinery_common.models import ( @@ -58,6 +61,10 @@ class DetailedSampleSerializer(serializers.ModelSerializer): annotations = SampleAnnotationSerializer(many=True, source="sampleannotation_set") organism = OrganismRelationSerializer(many=False) results = DetailedSamplesComputationalResultSerializer(many=True) + last_processor_job = ProcessorJobRelationSerializer(many=False) + last_downloader_job = DownloaderJobRelationSerializer(many=False) + most_recent_smashable_file = ComputedFileRelationSerializer(many=False) + most_recent_quant_file = ComputedFileRelationSerializer(many=False) class Meta: model = Sample @@ -90,10 +97,15 @@ class Meta: "compound", "time", "is_processed", + "is_unable_to_be_processed", "created_at", "last_modified", "original_files", "computed_files", + "last_processor_job", + "last_downloader_job", + "most_recent_smashable_file", + "most_recent_quant_file", "contributed_metadata", "contributed_keywords", "experiment_accession_codes", @@ -156,6 +168,7 @@ class SampleListView(generics.ListAPIView): "compound", "time", "is_processed", + "is_unable_to_be_processed", "is_public", ) diff --git a/api/data_refinery_api/views/stats.py b/api/data_refinery_api/views/stats.py index 99141dc7d..19ac6360a 100644 --- a/api/data_refinery_api/views/stats.py +++ b/api/data_refinery_api/views/stats.py @@ -40,7 +40,7 @@ logger = get_and_configure_logger(__name__) -JOB_CREATED_AT_CUTOFF = datetime(2019, 9, 19, tzinfo=timezone.utc) +JOB_CREATED_AT_CUTOFF = datetime(2021, 7, 14, tzinfo=timezone.utc) # We want to cache all stats pages for 10 minutes to reduce the load on our # servers, because computing all of the stats is really expensive diff --git a/api/run_tests.sh b/api/run_tests.sh index e4a75a04a..3bf778973 100755 --- a/api/run_tests.sh +++ b/api/run_tests.sh @@ -19,6 +19,12 @@ if ! [ "$(docker ps --filter name=drdb -q)" ]; then echo "./scripts/run_postgres.sh" >&2 exit 1 fi +# Ensure that elasticsearch is running +if ! [ "$(docker ps --filter name=dres -q)" ]; then + echo "You must start elasticsearch first with:" >&2 + echo "./scripts/run_es.sh" >&2 + exit 1 +fi project_root=$(pwd) # "cd .."
called above volume_directory="$project_root/test_volume" @@ -33,7 +39,6 @@ fi DB_HOST_IP=$(get_docker_db_ip_address) ES_HOST_IP=$(get_docker_es_ip_address) - # Only run interactively if we are on a TTY if [ -t 1 ]; then INTERACTIVE="-i" diff --git a/common/data_refinery_common/job_management.py b/common/data_refinery_common/job_management.py index f94f6cd4a..4076696fd 100644 --- a/common/data_refinery_common/job_management.py +++ b/common/data_refinery_common/job_management.py @@ -19,6 +19,25 @@ logger = get_and_configure_logger(__name__) +UNFIXABLE_ERRORS = [ + "The annotation package, pd.primeview, could not be loaded.", + "Missing gene index for primeview", + "Missing gene index for zebgene11st", + "This could be indicative of a mismatch between the reference and sample, or a very bad sample.", +] + + +def is_job_unprocessable(job): + if job.success is not False: + return False + + for error in UNFIXABLE_ERRORS: + if job.failure_reason and error in job.failure_reason: + return True + + return False + + def create_downloader_job( undownloaded_files: List[OriginalFile], *, processor_job_id=None, force=False ) -> bool: diff --git a/common/data_refinery_common/migrations/0070_auto_20211208_2118.py b/common/data_refinery_common/migrations/0070_auto_20211208_2118.py new file mode 100644 index 000000000..83a5cf511 --- /dev/null +++ b/common/data_refinery_common/migrations/0070_auto_20211208_2118.py @@ -0,0 +1,57 @@ +# Generated by Django 3.2.7 on 2021-12-08 21:18 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("data_refinery_common", "0069_rename_compendia_version_computedfile_compendium_version"), + ] + + operations = [ + migrations.AddField( + model_name="sample", + name="is_unable_to_be_processed", + field=models.BooleanField(default=False), + ), + migrations.AddField( + model_name="sample", + name="last_downloader_job", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + to="data_refinery_common.downloaderjob", + ), + ), + migrations.AddField( + model_name="sample", + name="last_processor_job", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + to="data_refinery_common.processorjob", + ), + ), + migrations.AddField( + model_name="sample", + name="most_recent_quant_file", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="+", + to="data_refinery_common.computedfile", + ), + ), + migrations.AddField( + model_name="sample", + name="most_recent_smashable_file", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="+", + to="data_refinery_common.computedfile", + ), + ), + ] diff --git a/common/data_refinery_common/models/sample.py b/common/data_refinery_common/models/sample.py index 65cce438d..93f9629ea 100644 --- a/common/data_refinery_common/models/sample.py +++ b/common/data_refinery_common/models/sample.py @@ -71,6 +71,19 @@ def __str__(self): # Crunch Properties is_processed = models.BooleanField(default=False) + is_unable_to_be_processed = models.BooleanField(default=False) + last_processor_job = models.ForeignKey("ProcessorJob", null=True, on_delete=models.SET_NULL) + last_downloader_job = models.ForeignKey("DownloaderJob", null=True, on_delete=models.SET_NULL) + # Set related_name to "+" to prevent the backwards relation, since + # it should be a duplicate of the relation already established by + # the 
computed_files field. + most_recent_smashable_file = models.ForeignKey( + "ComputedFile", null=True, on_delete=models.SET_NULL, related_name="+" + ) + most_recent_quant_file = models.ForeignKey( + "ComputedFile", null=True, on_delete=models.SET_NULL, related_name="+" + ) # Blacklisting is_blacklisted = models.BooleanField(default=False) @@ -131,6 +144,11 @@ def get_processor_jobs(self) -> Set: return processor_jobs + def get_most_recent_processor_job(self): + processor_jobs = self.get_processor_jobs() + if processor_jobs: + return max(processor_jobs, key=lambda job: job.created_at) + # Returns a set of DownloaderJob objects but we cannot specify # that in type hints because it hasn't been declared yet. def get_downloader_jobs(self) -> Set: @@ -141,6 +159,11 @@ def get_downloader_jobs(self) -> Set: return downloader_jobs + def get_most_recent_downloader_job(self): + downloader_jobs = self.get_downloader_jobs() + if downloader_jobs: + return max(downloader_jobs, key=lambda job: job.created_at) + def get_result_files(self): """Get all of the ComputedFile objects associated with this Sample""" return self.computed_files.all() diff --git a/common/data_refinery_common/rna_seq.py b/common/data_refinery_common/rna_seq.py index f5b51d968..a9aee68cc 100644 --- a/common/data_refinery_common/rna_seq.py +++ b/common/data_refinery_common/rna_seq.py @@ -28,47 +28,64 @@ EARLY_TXIMPORT_MIN_PERCENT = 0.80 -def should_run_tximport(experiment: Experiment, results, is_tximport_job: bool): - """ Returns whether or not the experiment is eligible to have tximport - run on it. +def get_latest_organism_index(organism): + # Salmon version gets saved as what salmon outputs, which includes this prefix. + current_salmon_version = "salmon " + get_env_variable("SALMON_VERSION", "0.13.1") + return ( + OrganismIndex.objects.filter(salmon_version=current_salmon_version, organism=organism) + .order_by("-created_at") + .first() + ) - results is a set of ComputationalResults for the samples that had salmon quant run on them. - """ - num_quantified = len(results) - if num_quantified == 0: - return False - salmon_versions = set() - for result in results: - if result.organism_index.salmon_version: - salmon_versions.add(result.organism_index.salmon_version) +def get_tximport_inputs_if_eligible(experiment: Experiment, is_tximport_job: bool): + """Returns a list of the most recent quant.sf ComputedFiles for the experiment. - if len(salmon_versions) > 1: - # Tximport requires that all samples are processed with the same salmon version - # https://github.com/AlexsLemonade/refinebio/issues/1496 - return False + If the experiment is not eligible for tximport, then None will be returned instead.
+ """ + organism_indices = {} + for organism in experiment.organisms.all(): + organism_indices[organism.id] = get_latest_organism_index(organism) + + good_quant_files = [] + num_unprocessable_samples = 0 + for sample in experiment.samples.all(): + if sample.is_unable_to_be_processed: + num_unprocessable_samples += 1 + elif ( + sample.most_recent_quant_file + and sample.most_recent_quant_file.s3_bucket is not None + and sample.most_recent_quant_file.s3_key is not None + ): + sample_organism_index = sample.most_recent_quant_file.result.organism_index + if sample_organism_index == organism_indices[sample.organism.id]: + good_quant_files.append(sample.most_recent_quant_file) + + num_quantified = len(good_quant_files) + if num_quantified == 0: + return None eligible_samples = experiment.samples.filter(source_database="SRA", technology="RNA-SEQ") - num_eligible_samples = eligible_samples.count() - if num_eligible_samples == 0: - return False + num_eligible_samples = eligible_samples.count() - num_unprocessable_samples + if num_eligible_samples <= 0: + return None percent_complete = num_quantified / num_eligible_samples if percent_complete == 1.0: # If an experiment is fully quantified then we should run # tximport regardless of its size. - return True + return good_quant_files if ( is_tximport_job and num_eligible_samples >= EARLY_TXIMPORT_MIN_SIZE and percent_complete >= EARLY_TXIMPORT_MIN_PERCENT ): - return True + return good_quant_files else: - return False + return None def get_quant_results_for_experiment(experiment: Experiment, filter_old_versions=True): @@ -108,25 +125,6 @@ def get_sample_id_set(result): return latest_results -def get_quant_files_for_results(results: List[ComputationalResult]): - """Returns a list of salmon quant results from `experiment`.""" - quant_files = [] - - for result in results: - quant_sf_file = result.get_quant_sf_file() - if quant_sf_file: - quant_files.append(quant_sf_file) - else: - logger.exception( - "Salmon quant result found without quant.sf ComputedFile!", - quant_result=result.id, - sample=result.samples.first(), - ) - raise Exception("Salmon quant result found without quant.sf ComputedFile!") - - return quant_files - - ENA_DOWNLOAD_URL_TEMPLATE = ( "ftp://ftp.sra.ebi.ac.uk/vol1/fastq/{short_accession}{sub_dir}" "/{long_accession}/{long_accession}{read_suffix}.fastq.gz" diff --git a/foreman/data_refinery_foreman/foreman/management/commands/assoc_experiment_results.py b/foreman/data_refinery_foreman/foreman/management/commands/assoc_experiment_results.py deleted file mode 100644 index 60a535666..000000000 --- a/foreman/data_refinery_foreman/foreman/management/commands/assoc_experiment_results.py +++ /dev/null @@ -1,70 +0,0 @@ -"""This command will go through ComputationalResult objects that are a -result of the tximport Processor and associate them with the -experiment that tximport was run on. It's purpose is to populate -missing links in our data model, however once it does so there are -additional PRs planned which will break an assumption that this -command makes, so it should be removed once it has served its purpose. - -The assumption this command is relying on is: - * tximport has only been run on full experiments. -""" - -from django.core.management.base import BaseCommand - -from data_refinery_common.models import * - - -def make_experiment_result_associations(): - """This function performs the function explained at the head of this file. - - It does so by following this general strategy: - 1. 
Get tximport results by querying based on the processor - 2. Then get one associated sample - 3. Then go through that sample's experiments until an experiment is found - that has all of its samples associated with that result. - 4. Then make an association with that result.""" - - # There are multiple "Processor" objects for tximport because - # we create a new one for each version. However we don't care - # which version was used, we just all tximport results. - tximport_processors = Processor.objects.filter(name="Tximport").all() - - for tximport_processor in tximport_processors: - results = ComputationalResult.objects.filter(processor=tximport_processor) - - for result in results: - result_sample = result.samples.first() - - for experiment in result_sample.experiments.all(): - experiment_samples = experiment.samples.all() - - num_result_associations = 0 - for experiment_sample in experiment_samples: - try: - SampleResultAssociation.objects.get(sample=experiment_sample, result=result) - - # If we've made it here, then the association exists so count it! - num_result_associations += 1 - except Exception: - # If we've made it here, then the - # association doesn't exist so this isn't - # the experiment that the result is for. - break - - if num_result_associations == len(experiment_samples): - # Every sample in the experiment is associated - # with this ComputationalResult, so we can - # safely say the experiment is associated with - # it and make that relationship explicit. - ExperimentResultAssociation.objects.get_or_create( - experiment=experiment, result=result - ) - - -class Command(BaseCommand): - def handle(self, *args, **options): - """This is just the entrypoint for this management command. - - All of its work is done in a separate function because that - makes it much easier to test.""" - make_experiment_result_associations() diff --git a/foreman/data_refinery_foreman/foreman/management/commands/mark_samples_unprocessable.py b/foreman/data_refinery_foreman/foreman/management/commands/mark_samples_unprocessable.py new file mode 100644 index 000000000..be2c9015a --- /dev/null +++ b/foreman/data_refinery_foreman/foreman/management/commands/mark_samples_unprocessable.py @@ -0,0 +1,56 @@ +"""This command will search through all unprocessed samples and check +their most recent processor job's failure reason to see if it was a +type that we know indicates that the sample was either corrupted or of +low enough quality that we cannot hope to ever process it. This will +be stored on the sample's is_unable_to_be_processed field. +""" + +from django.core.management.base import BaseCommand + +from data_refinery_common.job_management import is_job_unprocessable +from data_refinery_common.models import Sample +from data_refinery_common.performant_pagination.pagination import PAGE_SIZE +from data_refinery_common.utils import queryset_page_iterator + + +def mark_unprocessable_samples(): + """This function performs the function explained at the head of this file. + + It does so by following this general strategy: + 1. Find samples that have is_processed=False + 2. Find the most recent processor job for that sample. + 3. Check that processor job's failure reason and mark + the sample accordingly.
+ + Along the way it will also populate the following fields: + * last_processor_job + * last_downloader_job + * most_recent_smashable_file + * most_recent_quant_file + """ + unprocessed_samples = Sample.objects.filter(is_processed=False).prefetch_related( + "original_files__processor_jobs" + ) + + results = queryset_page_iterator(unprocessed_samples, PAGE_SIZE) + + for page in results: + for sample in page: + sample.last_processor_job = sample.get_most_recent_processor_job() + sample.last_downloader_job = sample.get_most_recent_downloader_job() + sample.most_recent_smashable_file = sample.get_most_recent_smashable_result_file() + sample.most_recent_quant_file = sample.get_most_recent_quant_sf_file() + + if not sample.is_processed and sample.last_processor_job: + sample.is_unable_to_be_processed = is_job_unprocessable(sample.last_processor_job) + + sample.save() + + +class Command(BaseCommand): + def handle(self, *args, **options): + """This is just the entrypoint for this management command. + + All of its work is done in a separate function because that + makes it much easier to test.""" + mark_unprocessable_samples() diff --git a/foreman/data_refinery_foreman/foreman/management/commands/run_tximport.py b/foreman/data_refinery_foreman/foreman/management/commands/run_tximport.py index d6af7aea9..6b93b4324 100644 --- a/foreman/data_refinery_foreman/foreman/management/commands/run_tximport.py +++ b/foreman/data_refinery_foreman/foreman/management/commands/run_tximport.py @@ -20,7 +20,7 @@ ProcessorJobOriginalFileAssociation, ) from data_refinery_common.performant_pagination.pagination import PerformantPaginator as Paginator -from data_refinery_common.rna_seq import get_quant_results_for_experiment, should_run_tximport +from data_refinery_common.rna_seq import get_tximport_inputs_if_eligible logger = get_and_configure_logger(__name__) @@ -35,9 +35,8 @@ def run_tximport_if_eligible(experiment: Experiment, dispatch_jobs=True) -> bool Returns the ProcessorJob if a job was created or None if one was not. """ tximport_pipeline = ProcessorPipeline.TXIMPORT - quant_results = get_quant_results_for_experiment(experiment) - if should_run_tximport(experiment, quant_results, True): + if get_tximport_inputs_if_eligible(experiment, True): processor_job = ProcessorJob() processor_job.pipeline_applied = tximport_pipeline.value processor_job.ram_amount = 32768 diff --git a/foreman/data_refinery_foreman/foreman/management/commands/test_assoc_experiment_results.py b/foreman/data_refinery_foreman/foreman/management/commands/test_assoc_experiment_results.py deleted file mode 100644 index 7b6a066a0..000000000 --- a/foreman/data_refinery_foreman/foreman/management/commands/test_assoc_experiment_results.py +++ /dev/null @@ -1,128 +0,0 @@ -from django.test import TransactionTestCase - -from data_refinery_common.models import ( - ComputationalResult, - Experiment, - ExperimentResultAssociation, - ExperimentSampleAssociation, - Organism, - Processor, - Sample, - SampleResultAssociation, -) -from data_refinery_foreman.foreman.management.commands.assoc_experiment_results import ( - make_experiment_result_associations, -) - - -class SurveyTestCase(TransactionTestCase): - def test_make_experiment_result_associations(self): - """Tests that the correct associations are made. - - The situation we're setting up is basically this: - * tximport has been run for an experiment. - * It made associations between the samples in - the experiment and the ComputationalResult. 
- * It didn't make associations between the - experiment itself and the ComputationalResult. - * There is a second experiment that hasn't had - tximport run but shares a sample with the - other experiment. - * This second experiment has a sample which has - not yet had tximport run on it. - - And what we're going to test for is: - * An association is created between the tximport - result and the first experiment. - * An association is NOT created between the - tximport result and the second experiment. - """ - # Get an organism to set on samples: - homo_sapiens = Organism.get_object_for_name("HOMO_SAPIENS", taxonomy_id=9606) - - # Create the tximport processor and result: - processor = Processor() - processor.name = "Tximport" - processor.version = "v9.9.9" - processor.docker_image = "dr_salmon" - processor.environment = '{"some": "environment"}' - processor.save() - - result = ComputationalResult() - result.commands.append("tximport invocation") - result.is_ccdl = True - result.processor = processor - result.save() - - # Create the first experiment and it's samples: - processed_experiment = Experiment() - processed_experiment.accession_code = "SRP12345" - processed_experiment.save() - - processed_sample_one = Sample() - processed_sample_one.accession_code = "SRX12345" - processed_sample_one.title = "SRX12345" - processed_sample_one.organism = homo_sapiens - processed_sample_one.save() - - sra = SampleResultAssociation() - sra.sample = processed_sample_one - sra.result = result - sra.save() - - esa = ExperimentSampleAssociation() - esa.experiment = processed_experiment - esa.sample = processed_sample_one - esa.save() - - processed_sample_two = Sample() - processed_sample_two.accession_code = "SRX12346" - processed_sample_two.title = "SRX12346" - processed_sample_two.organism = homo_sapiens - processed_sample_two.save() - - sra = SampleResultAssociation() - sra.sample = processed_sample_two - sra.result = result - sra.save() - - esa = ExperimentSampleAssociation() - esa.experiment = processed_experiment - esa.sample = processed_sample_two - esa.save() - - # Create the second experiment and it's additional sample. 
- unprocessed_experiment = Experiment() - unprocessed_experiment.accession_code = "SRP6789" - unprocessed_experiment.save() - - unprocessed_sample = Sample() - unprocessed_sample.accession_code = "SRX6789" - unprocessed_sample.title = "SRX6789" - unprocessed_sample.organism = homo_sapiens - unprocessed_sample.save() - - sra = SampleResultAssociation() - sra.sample = unprocessed_sample - sra.result = result - sra.save() - - esa = ExperimentSampleAssociation() - esa.experiment = unprocessed_experiment - esa.sample = unprocessed_sample - esa.save() - - esa = ExperimentSampleAssociation() - esa.experiment = unprocessed_experiment - esa.sample = processed_sample_two - esa.save() - - # Run the function we're testing: - make_experiment_result_associations() - - # Test that only one association was created and that it was - # to the processed experiment: - eras = ExperimentResultAssociation.objects.all() - - self.assertEqual(len(eras), 1) - self.assertEqual(eras.first().experiment, processed_experiment) diff --git a/foreman/data_refinery_foreman/foreman/management/commands/test_run_tximport.py b/foreman/data_refinery_foreman/foreman/management/commands/test_run_tximport.py index 52315e55c..d5eecf070 100644 --- a/foreman/data_refinery_foreman/foreman/management/commands/test_run_tximport.py +++ b/foreman/data_refinery_foreman/foreman/management/commands/test_run_tximport.py @@ -170,6 +170,8 @@ def prep_tximport_at_progress_point( quant_file.s3_key = "key" quant_file.save() + sample.most_recent_quant_file = quant_file + sample.save() SampleResultAssociation.objects.get_or_create(sample=sample, result=quant_result) diff --git a/foreman/data_refinery_foreman/surveyor/transcriptome_index.py b/foreman/data_refinery_foreman/surveyor/transcriptome_index.py index ed9e2df2c..cc84b37b9 100644 --- a/foreman/data_refinery_foreman/surveyor/transcriptome_index.py +++ b/foreman/data_refinery_foreman/surveyor/transcriptome_index.py @@ -90,7 +90,10 @@ def get_species_detail_by_assembly(assembly: str, division: str) -> str: # Generally bad to roll your own CSV parser, but some # encoding issue seemed to have been breaking the csv # parser module and this works. 
- row = line.decode("utf-8").strip().split("\t") + try: + row = line.decode("utf-8").strip().split("\t") + except UnicodeDecodeError: + row = line.decode("latin").strip().split("\t") if not header: header = row diff --git a/foreman/data_refinery_foreman/test_imports.py b/foreman/data_refinery_foreman/test_imports.py index 4ce330339..ef1cf2633 100644 --- a/foreman/data_refinery_foreman/test_imports.py +++ b/foreman/data_refinery_foreman/test_imports.py @@ -4,7 +4,30 @@ class ImportTestCase(TestCase): def test_imports(self): # Make sure we can import the foreman tests - import data_refinery_foreman.foreman.management.commands.test_assoc_experiment_results + import data_refinery_foreman.foreman.management.commands.check_computed_files + import data_refinery_foreman.foreman.management.commands.check_missing_results + import data_refinery_foreman.foreman.management.commands.correct_affy_cdfs + import data_refinery_foreman.foreman.management.commands.create_compendia + import data_refinery_foreman.foreman.management.commands.create_missing_downloader_jobs + import data_refinery_foreman.foreman.management.commands.create_missing_processor_jobs + import data_refinery_foreman.foreman.management.commands.create_quantpendia + import data_refinery_foreman.foreman.management.commands.feed_the_beast + import data_refinery_foreman.foreman.management.commands.fix_compendia + import data_refinery_foreman.foreman.management.commands.generate_dataset_report + import data_refinery_foreman.foreman.management.commands.get_job_to_be_run + import data_refinery_foreman.foreman.management.commands.get_quant_sf_size + import data_refinery_foreman.foreman.management.commands.import_external_sample_attributes + import data_refinery_foreman.foreman.management.commands.import_external_sample_keywords + import data_refinery_foreman.foreman.management.commands.import_ontology + import data_refinery_foreman.foreman.management.commands.mark_samples_unprocessable + import data_refinery_foreman.foreman.management.commands.organism_shepherd + import data_refinery_foreman.foreman.management.commands.remove_dead_computed_files + import data_refinery_foreman.foreman.management.commands.rerun_salmon_old_samples + import data_refinery_foreman.foreman.management.commands.retry_jobs + import data_refinery_foreman.foreman.management.commands.retry_samples + import data_refinery_foreman.foreman.management.commands.retry_timed_out_jobs + import data_refinery_foreman.foreman.management.commands.run_tximport + import data_refinery_foreman.foreman.management.commands.stop_jobs import data_refinery_foreman.foreman.management.commands.test_correct_affy_cdfs import data_refinery_foreman.foreman.management.commands.test_create_compendia import data_refinery_foreman.foreman.management.commands.test_create_missing_downloader_jobs @@ -17,6 +40,9 @@ def test_imports(self): import data_refinery_foreman.foreman.management.commands.test_retry_samples import data_refinery_foreman.foreman.management.commands.test_run_tximport import data_refinery_foreman.foreman.management.commands.test_update_experiment_metadata + import data_refinery_foreman.foreman.management.commands.update_downloadable_samples + import data_refinery_foreman.foreman.management.commands.update_experiment_metadata + import data_refinery_foreman.foreman.management.commands.update_sample_metadata import data_refinery_foreman.foreman.test_downloader_job_manager import data_refinery_foreman.foreman.test_end_to_end import data_refinery_foreman.foreman.test_job_control diff --git 
a/infrastructure/api-configuration/api-server-instance-user-data.tpl.sh b/infrastructure/api-configuration/api-server-instance-user-data.tpl.sh index b734341cf..3f475c98b 100644 --- a/infrastructure/api-configuration/api-server-instance-user-data.tpl.sh +++ b/infrastructure/api-configuration/api-server-instance-user-data.tpl.sh @@ -26,51 +26,55 @@ service nginx restart if [[ "${stage}" == "staging" || "${stage}" == "prod" ]]; then # Check here for the cert in S3, if present install, if not run certbot. if [[ $(aws s3 ls "${data_refinery_cert_bucket}" | wc -l) == "0" ]]; then - # Create and install SSL Certificate for the API. - # Only necessary on staging and prod. - # We cannot use ACM for this because *.bio is not a Top Level Domain that Route53 supports. - snap install core - snap refresh core - snap install --classic certbot - apt-get update - apt-get install -y python-certbot-nginx - + # Create and install SSL Certificate for the API. + # Only necessary on staging and prod. + # We cannot use ACM for this because *.bio is not a Top Level Domain that Route53 supports. + snap install core + snap refresh core + snap install --classic certbot + apt-get update + apt-get install -y python-certbot-nginx + + # Temporary to see what this outputs + curl 'http://api.refine.bio' # The certbot challenge cannot be completed until the aws_lb_target_group_attachment resources are created. - sleep 180 - - # g3w4k4t5n3s7p7v8@alexslemonade.slack.com is the email address we - # have configured to forward mail to the #teamcontact channel in - # slack. Certbot will use it for "important account - # notifications". - # In the future, if we ever hit the 5-deploy-a-week limit, changing one of these lines to: - # certbot --nginx -d api.staging.refine.bio -d api2.staging.refine.bio -n --agree-tos --redirect -m g3w4k4t5n3s7p7v8@alexslemonade.slack.com - # will circumvent certbot's limit because the 5-a-week limit only applies to the - # "same set of domains", so by changing that set we get to use the 20-a-week limit. - if [[ "${stage}" == "staging" ]]; then + sleep 300 + # Temporary to see what this outputs + curl 'http://api.refine.bio' + + # g3w4k4t5n3s7p7v8@alexslemonade.slack.com is the email address we + # have configured to forward mail to the #teamcontact channel in + # slack. Certbot will use it for "important account + # notifications". + # In the future, if we ever hit the 5-deploy-a-week limit, changing one of these lines to: + # certbot --nginx -d api.staging.refine.bio -d api2.staging.refine.bio -n --agree-tos --redirect -m g3w4k4t5n3s7p7v8@alexslemonade.slack.com + # will circumvent certbot's limit because the 5-a-week limit only applies to the + # "same set of domains", so by changing that set we get to use the 20-a-week limit. + if [[ "${stage}" == "staging" ]]; then certbot --nginx -d api.staging.refine.bio -n --agree-tos --redirect -m g3w4k4t5n3s7p7v8@alexslemonade.slack.com - elif [[ "${stage}" == "prod" ]]; then + elif [[ "${stage}" == "prod" ]]; then certbot --nginx -d api.refine.bio -n --agree-tos --redirect -m g3w4k4t5n3s7p7v8@alexslemonade.slack.com - fi + fi - # Add the nginx.conf file that certbot setup to the zip dir. - cp /etc/nginx/nginx.conf /etc/letsencrypt/ + # Add the nginx.conf file that certbot setup to the zip dir. + cp /etc/nginx/nginx.conf /etc/letsencrypt/ - cd /etc/letsencrypt/ || exit - sudo zip -r ../letsencryptdir.zip "../$(basename "$PWD")" + cd /etc/letsencrypt/ || exit + sudo zip -r ../letsencryptdir.zip "../$(basename "$PWD")" - # And then cleanup the extra copy. 
- rm /etc/letsencrypt/nginx.conf + # And then cleanup the extra copy. + rm /etc/letsencrypt/nginx.conf - cd - || exit - mv /etc/letsencryptdir.zip . - aws s3 cp letsencryptdir.zip "s3://${data_refinery_cert_bucket}/" - rm letsencryptdir.zip + cd - || exit + mv /etc/letsencryptdir.zip . + aws s3 cp letsencryptdir.zip "s3://${data_refinery_cert_bucket}/" + rm letsencryptdir.zip else - zip_filename=$(aws s3 ls "${data_refinery_cert_bucket}" | head -1 | awk '{print $4}') - aws s3 cp "s3://${data_refinery_cert_bucket}/$zip_filename" letsencryptdir.zip - unzip letsencryptdir.zip -d /etc/ - mv /etc/letsencrypt/nginx.conf /etc/nginx/ - service nginx restart + zip_filename=$(aws s3 ls "${data_refinery_cert_bucket}" | head -1 | awk '{print $4}') + aws s3 cp "s3://${data_refinery_cert_bucket}/$zip_filename" letsencryptdir.zip + unzip letsencryptdir.zip -d /etc/ + mv /etc/letsencrypt/nginx.conf /etc/nginx/ + service nginx restart fi fi diff --git a/infrastructure/batch/launch_templates.tf b/infrastructure/batch/launch_templates.tf index c4216b008..988c68ab7 100644 --- a/infrastructure/batch/launch_templates.tf +++ b/infrastructure/batch/launch_templates.tf @@ -23,6 +23,16 @@ resource "aws_launch_template" "data_refinery_worker" { } user_data = base64encode(var.data_refinery_worker_user_data) + + tag_specifications { + resource_type = "instance" + tags = var.default_tags + } + + tag_specifications { + resource_type = "volume" + tags = var.default_tags + } } resource "aws_launch_template" "data_refinery_compendia" { @@ -50,4 +60,14 @@ resource "aws_launch_template" "data_refinery_compendia" { } user_data = base64encode(var.data_refinery_worker_user_data) + + tag_specifications { + resource_type = "instance" + tags = var.default_tags + } + + tag_specifications { + resource_type = "volume" + tags = var.default_tags + } } diff --git a/infrastructure/deploy_box_instance_data.sh b/infrastructure/deploy_box_instance_data.sh index 8ee6db880..25f7bf5c7 100644 --- a/infrastructure/deploy_box_instance_data.sh +++ b/infrastructure/deploy_box_instance_data.sh @@ -33,9 +33,8 @@ # - Record the IPv4 Public IP of the instance you just created. -# Finally, go into the CirlceCI web application and select the refinebio project. -# Go to the project settings and navigate to the Environment Variables tab. -# Click 'Add Variable' and set the name to DEPLOY_IP_ADDRESS and the value to the +# Finally, go into the GitHub Actions secrets for the refinebio repo. +# Click 'Add Secret' and set the name to DEPLOY_IP_ADDRESS and the value to the # IP address of the EC2 instance you created. # Also, if you want to be notified on slack after the deploy finishes, you can add diff --git a/infrastructure/security.tf b/infrastructure/security.tf index 5522d2bab..517ecc2cc 100644 --- a/infrastructure/security.tf +++ b/infrastructure/security.tf @@ -5,7 +5,7 @@ # debugging. Long term we may want to remove this entirely.
resource "aws_key_pair" "data_refinery" { key_name = "data-refinery-key-${var.user}-${var.stage}" - public_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQDiLOhQIWHZ2cttRDlECfhg/99yCh8BzVGFqRNFRgmPKE7ewAeUAyenItQXRjcea19nVP0o3SoVurEQldJP89U26KgNpp8LVz6BZJCV9b5Bt0tDCi6WstiK/ImZ+gWTkcp4b3hj1UVyhsUxUmhv89DvmmsjZ/b9scUMTdB5DG/DPzQj/gv/AHnGynZ213xC+VXNyqfFXM9FcMtJxu6x55VxLaIuVxkEqTbTVtT0+Wdhh5tfVcVzLrTYTLBTtB1xBYBC+NK9L1tPBV66HIeZ7XasgARaZsIE0loQtvGP5cSGMFqGtQFQRzSZH6lJL1+MkRWVNU7roSp/hRJXBh/MHt8Qtmaf7XpCNurW65YVWQGXHbJejIL9C+zdDmbekD1gROfjpmvGniaBtkgCEFWf2YCNT2jwpHZIT8HmPLJUfP+bJSkBORXDljhxkqdFKjmaVER6f70r0hL5BFTekkWRVmv5EDANgV1CK9G5XsXPIK2SujemcEHmAkapHUMN6fblUN9OSBpmYNzhvqN1fNf98US1Cbf8UNDL9urskDGpCWk+4OWHu3zz0hFnsWB9AbGEdfJ9GhLXtF7Y06upQLIh+6eQiW17/ffMiNFWfRs+jTpZslItXK/MT+jJaGEApZ9vT8GdRGIBL1kR7eXbY1+GJlrOvZ4YZfMjq2HyAgDi7ltEDw==" + public_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQDCImXhaIS+uh0rJ6V88V1ZN6pH7pSXA22t7zYTnlRXXoHgKIUpc1iH07dDwrKIAYmwCO36H0dmlxymsEvnDJA2alPxic2GXe04ExGOEuFeemmdvmw9NUk+2vRNUPrKgRIsZZ2QSrXkvEyoFGgJQ1ywx4ZbkHxLaxgAr0wgzyv1bdOo3RiOPA3pSiN2hzATlxGx655fCPhjTTCsM9T7HzkF+UGY6tJ8z93aC3vDtAjBi9vpgrwEL/xv7l8lU0ECjfYKQFlHFncUrcMIs33MlkOk9e4VbD5lKzghCSaNV2+dBjxmWwCAIpmEag5/Y/07m7qg5NDhjJyB/BC9+f/2hu8lMR7IhZNDP7xhOs4X27R01huhJOdAV3dodb2BPWxRu2jlaaXHorfuH5ORV5csW6ubCMwl2HzF18bH3k87wvo37xvvh+YSeLQwrwaC/lPRU5f8j5AJynb3B/QeBR5RU/3b4TFdLvMakVs2dy5miQdwIuDRD49UYtRsJqaOtRbSPtgWLrsO1yT4yhQQCYXgTGwP8o2Jsnpm4+V1bFSzMNSwPpDa5jUVDGLLZ08ldfc6XiRnd2ATtcrs6Y/YqEbzyW6lv49lMQXqzwUx0OfnQ+vcWjaQmQRIfP6VmEYvBUrrBuxECLZD7w1IYB8+Dyi1JOSfm7n4uoW5kO/PsADA6UtAzQ==" } diff --git a/workers/data_refinery_workers/downloaders/utils.py b/workers/data_refinery_workers/downloaders/utils.py index aea0d91c3..0af248f47 100644 --- a/workers/data_refinery_workers/downloaders/utils.py +++ b/workers/data_refinery_workers/downloaders/utils.py @@ -108,6 +108,11 @@ def end_downloader_job(job: DownloaderJob, success: bool): failure_reason=job.failure_reason, ) + for original_file in job.original_files.all(): + for sample in original_file.samples.all(): + sample.last_downloader_job = job + sample.save() + job.success = success job.end_time = timezone.now() job.save() diff --git a/workers/data_refinery_workers/processors/salmon.py b/workers/data_refinery_workers/processors/salmon.py index 2ae107681..476deda1b 100644 --- a/workers/data_refinery_workers/processors/salmon.py +++ b/workers/data_refinery_workers/processors/salmon.py @@ -29,11 +29,7 @@ SampleComputedFileAssociation, SampleResultAssociation, ) -from data_refinery_common.rna_seq import ( - get_quant_files_for_results, - get_quant_results_for_experiment, - should_run_tximport, -) +from data_refinery_common.rna_seq import get_tximport_inputs_if_eligible from data_refinery_common.utils import get_env_variable from data_refinery_workers.processors import utils @@ -113,6 +109,10 @@ def _prepare_files(job_context: Dict) -> Dict: job_context["failure_reason"] = failure_reason logger.error(failure_reason, sample=sample, processor_job=job_context["job_id"]) + # We can't process it, we should mark it as such. + sample.is_unable_to_be_processed = True + sample.save() + # No need to retry and fail more than once for this reason. job_context["success"] = False job_context["job"].failure_reason = failure_reason @@ -315,6 +315,9 @@ def _determine_index_length(job_context: Dict) -> Dict: job_context["job"].failure_reason = "Unable to determine number_of_reads." 
job_context["job"].no_retry = True job_context["success"] = False + + job_context["sample"].is_unable_to_be_processed = True + job_context["sample"].save() return job_context index_length_raw = total_base_pairs / number_of_reads @@ -605,7 +608,7 @@ def _run_tximport_for_experiment( return job_context -def get_tximport_inputs(job_context: Dict) -> Dict: +def set_tximport_inputs(job_context: Dict) -> Dict: """Adds to the job_context a mapping from experiments to a list of their quant files. Checks all the experiments which contain a sample from the current @@ -623,17 +626,17 @@ def get_tximport_inputs(job_context: Dict) -> Dict: if not eligible_samples.exists(): continue - salmon_quant_results = get_quant_results_for_experiment(experiment) is_tximport_job = "is_tximport_only" in job_context and job_context["is_tximport_only"] + salmon_quant_files = get_tximport_inputs_if_eligible(experiment, is_tximport_job) - if is_tximport_job and len(salmon_quant_results): + if is_tximport_job and salmon_quant_files: # If the job is only running tximport, then index_length # hasn't been set on the job context because we don't have # a raw file to run it on. Therefore pull it from one of # the result annotations. # Can't just do salmon_quant_results[0] because it's a set. - index_length = next(iter(salmon_quant_results)).get_index_length() + index_length = salmon_quant_files[0].result.get_index_length() if index_length: job_context["index_length"] = index_length elif "index_length" not in job_context: @@ -646,8 +649,8 @@ def get_tximport_inputs(job_context: Dict) -> Dict: no_retry=True, ) - if should_run_tximport(experiment, salmon_quant_results, is_tximport_job): - quantified_experiments[experiment] = get_quant_files_for_results(salmon_quant_results) + if salmon_quant_files: + quantified_experiments[experiment] = salmon_quant_files job_context["tximport_inputs"] = quantified_experiments @@ -928,6 +931,8 @@ def _run_salmon(job_context: Dict) -> Dict: SampleResultAssociation.objects.get_or_create( sample=job_context["sample"], result=result ) + job_context["sample"].most_recent_quant_file = quant_file + job_context["sample"].save() salmon_quant_archive.result = result salmon_quant_archive.save() @@ -1109,7 +1114,7 @@ def salmon(job_id: int) -> None: _determine_index_length, _find_or_download_index, _run_salmon, - get_tximport_inputs, + set_tximport_inputs, tximport, _run_salmontools, utils.end_job, diff --git a/workers/data_refinery_workers/processors/test_salmon.py b/workers/data_refinery_workers/processors/test_salmon.py index 9aa9f420d..afe00ca03 100644 --- a/workers/data_refinery_workers/processors/test_salmon.py +++ b/workers/data_refinery_workers/processors/test_salmon.py @@ -288,7 +288,7 @@ def check_salmon_quant(self, job_context, sample_dir): job_context = salmon._find_or_download_index(job_context) job_context = salmon._run_salmon(job_context) - job_context = salmon.get_tximport_inputs(job_context) + job_context = salmon.set_tximport_inputs(job_context) job_context = salmon.tximport(job_context) output_quant_filename = os.path.join(job_context["output_directory"], "quant.sf") self.assertTrue(os.path.exists(output_quant_filename)) @@ -364,7 +364,7 @@ def test_salmon_quant_one_sample_double_reads(self): # Confirm that this experiment is not ready for tximport yet, # because `salmon quant` is not run on 'fake_sample'. 
- experiments_ready = salmon.get_tximport_inputs(job_context)["tximport_inputs"] + experiments_ready = salmon.set_tximport_inputs(job_context)["tximport_inputs"] self.assertEqual(len(experiments_ready), 0) @tag("salmon") @@ -435,7 +435,7 @@ def test_salmon_quant_two_samples_single_read(self): # Check quant.sf in `salmon quant` output dir of sample1 self.check_salmon_quant(job1_context, sample1_dir) # Confirm that this experiment is not ready for tximport yet. - experiments_ready = salmon.get_tximport_inputs(job1_context)["tximport_inputs"] + experiments_ready = salmon.set_tximport_inputs(job1_context)["tximport_inputs"] self.assertEqual(len(experiments_ready), 0) # This job should not have produced any tximport output # because the other sample isn't ready yet. @@ -495,13 +495,13 @@ def test_salmon_quant_two_samples_single_read(self): self.assertTrue(os.path.isfile(file.absolute_file_path)) @tag("salmon") - def test_get_tximport_inputs(self): + def test_set_tximport_inputs(self): """"Tests that tximport only considers RNA-Seq samples from GEO. """ # Create one experiment and two related samples, based on: # https://www.ncbi.nlm.nih.gov/sra/?term=SRP040623 # (We don't need any original files because - # get_tximport_inputs doesn't consider them.) + # set_tximport_inputs doesn't consider them.) experiment_accession = "PRJNA242809" experiment = Experiment.objects.create(accession_code=experiment_accession) @@ -552,7 +552,7 @@ def test_get_tximport_inputs(self): comp_file.s3_bucket = "bucket" comp_file.save() - quantified_experiments = salmon.get_tximport_inputs({"sample": sample1})["tximport_inputs"] + quantified_experiments = salmon.set_tximport_inputs({"sample": sample1})["tximport_inputs"] self.assertEqual({}, quantified_experiments) @@ -923,6 +923,8 @@ def create_tximport_job_context( quant_file.save() SampleResultAssociation.objects.get_or_create(sample=sample, result=quant_result) + sample.most_recent_quant_file = quant_file + sample.save() # Processor jobs need at least one original file associated with # them so they know what they're processing. @@ -968,7 +970,7 @@ def create_tximport_job_context( def run_tximport_for_job_context(job_context: Dict) -> Dict: job_context = salmon._find_or_download_index(job_context) - job_context = salmon.get_tximport_inputs(job_context) + job_context = salmon.set_tximport_inputs(job_context) job_context = salmon.tximport(job_context) job_context = utils.end_job(job_context) @@ -1051,7 +1053,7 @@ def test_early_tximport(self): rds_file = ComputedFile.objects.get(filename="txi_out.RDS") for accession_code in complete_accessions: - # Check to make sure that all the associations were madep + # Check to make sure that all the associations were made # correctly. These queries will fail if they weren't. 
tpm_file = ComputedFile.objects.get( filename=accession_code + "_output_gene_lengthScaledTPM.tsv" @@ -1270,11 +1272,14 @@ def test_missing_computed_file(self): job_context = create_tximport_job_context(complete_accessions, incomplete_accessions) - result = Sample.objects.filter(accession_code="SRR5125621").first().results.first() + sample = Sample.objects.filter(accession_code="SRR5125621").first() + result = sample.results.first() computed_file = ComputedFile.objects.get(result=result, filename="quant.sf") computed_file.s3_bucket = None computed_file.s3_key = None computed_file.save() + sample.most_recent_quant_file = computed_file + sample.save() job_context = run_tximport_for_job_context(job_context) diff --git a/workers/data_refinery_workers/processors/tximport.py b/workers/data_refinery_workers/processors/tximport.py index a28e4d77b..7395815c2 100644 --- a/workers/data_refinery_workers/processors/tximport.py +++ b/workers/data_refinery_workers/processors/tximport.py @@ -63,7 +63,7 @@ def tximport(job_id: int) -> None: utils.start_job, _set_job_prefix, _prepare_files, - salmon.get_tximport_inputs, + salmon.set_tximport_inputs, salmon._find_or_download_index, salmon.tximport, utils.end_job, diff --git a/workers/data_refinery_workers/processors/utils.py b/workers/data_refinery_workers/processors/utils.py index 215b1d7e1..05c860254 100644 --- a/workers/data_refinery_workers/processors/utils.py +++ b/workers/data_refinery_workers/processors/utils.py @@ -251,6 +251,13 @@ def end_job(job_context: Dict, abort=False): job.failure_reason = "Failed to upload computed file." break + # Both of these types of computed files should have a + # sample, but double check anyway. + if computed_file.is_smashable: + for sample in computed_file.samples.all(): + sample.most_recent_smashable_file = computed_file + sample.save() + if not success: for computed_file in job_context.get("computed_files", []): computed_file.delete_local_file() @@ -354,6 +361,11 @@ def end_job(job_context: Dict, abort=False): failure_reason=job.failure_reason, ) + if ProcessorPipeline(job.pipeline_applied) not in SMASHER_JOB_TYPES: + for sample in job_context.get("samples", []): + sample.last_processor_job = job + sample.save() + # Return Final Job context so testers can check it return job_context
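
A minimal sketch of how the new is_job_unprocessable helper in common/data_refinery_common/job_management.py is expected to classify failed jobs, assuming the project's usual Django environment is set up; FakeJob is a hypothetical stand-in that only carries the two attributes the helper reads.

from collections import namedtuple

from data_refinery_common.job_management import is_job_unprocessable

# Hypothetical stand-in for a ProcessorJob; only the attributes the helper reads.
FakeJob = namedtuple("FakeJob", ["success", "failure_reason"])

# A failed job whose failure_reason contains a known-unfixable error is unprocessable.
assert is_job_unprocessable(FakeJob(False, "Missing gene index for primeview"))

# A failed job with an unrelated failure_reason is left alone.
assert not is_job_unprocessable(FakeJob(False, "Ran out of disk space"))

# Successful or still-running jobs (success is True or None) are never unprocessable.
assert not is_job_unprocessable(FakeJob(True, None))
assert not is_job_unprocessable(FakeJob(None, "Missing gene index for primeview"))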
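
The revised check in get_tximport_inputs_if_eligible subtracts unprocessable samples from the denominator before comparing against the early-tximport thresholds. A small worked example with hypothetical sample counts; EARLY_TXIMPORT_MIN_SIZE is an assumed value for the sketch, while EARLY_TXIMPORT_MIN_PERCENT is 0.80 per the surrounding context.

# Hypothetical experiment: 100 eligible SRA RNA-seq samples, 10 of which are now
# flagged is_unable_to_be_processed and 75 of which have usable quant.sf files.
EARLY_TXIMPORT_MIN_SIZE = 25  # assumed value for this sketch
EARLY_TXIMPORT_MIN_PERCENT = 0.80

num_quantified = 75
num_eligible_samples = 100 - 10  # unprocessable samples no longer count against the experiment

percent_complete = num_quantified / num_eligible_samples  # ~0.83

# On a tximport-only job the experiment now clears both thresholds; with the old
# denominator of 100 it would have sat at 0.75 and been skipped.
assert num_eligible_samples >= EARLY_TXIMPORT_MIN_SIZE
assert percent_complete >= EARLY_TXIMPORT_MIN_PERCENT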
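
For exercising the backfill, the new management command can be driven the same way the other foreman commands are; a sketch using Django's call_command, assuming DJANGO_SETTINGS_MODULE points at the foreman settings.

import django

django.setup()  # assumes DJANGO_SETTINGS_MODULE is configured for the foreman

from django.core.management import call_command

# Populates last_processor_job, last_downloader_job, most_recent_smashable_file,
# most_recent_quant_file, and is_unable_to_be_processed on unprocessed samples.
call_command("mark_samples_unprocessable")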