Skip to content

Commit

Permalink
feat(prepro): add option to only error when all segments do not align but warn if only some do not
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-parker committed Jan 30, 2025
1 parent 4230b78 commit 74d5eb2
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 30 deletions.
3 changes: 1 addition & 2 deletions ingest/scripts/metadata_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Config:
def stream_filter_to_fasta(input, output, keep, config: Config):
for record in orjsonl.stream(input):
if (not config.segmented and record["id"] in keep) or (
config.segmented and "".join(record["id"].split("_")[:-1]) in keep
config.segmented and "_".join(record["id"].split("_")[:-1]) in keep
):
orjsonl.append(output, record)

Expand Down Expand Up @@ -62,7 +62,6 @@ def main(
count = 0
for record in orjsonl.stream(input_metadata):
row = record["metadata"]
logger.debug(f"Filtering metadata: {row}")
accession = record["id"]
count += 1
if all(row[key] == value for key, value in config.metadata_filter.items()):
Expand Down
5 changes: 5 additions & 0 deletions preprocessing/nextclade/src/loculus_preprocessing/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Config:
batch_size: int = 5
processing_spec: dict[str, dict[str, Any]] = dataclasses.field(default_factory=dict)
pipeline_version: int = 1
multi_segment = False
alignment_requirement: str = "ALL"


def load_config_from_yaml(config_file: str, config: Config | None = None) -> Config:
Expand Down Expand Up @@ -117,4 +119,7 @@ def get_config(config_file: str | None = None) -> Config:
if not config.backend_host: # Set here so we can use organism
config.backend_host = f"http://127.0.0.1:8079/{config.organism}"

if len(config.nucleotideSequences) > 1:
config.multi_segment = True

return config
89 changes: 61 additions & 28 deletions preprocessing/nextclade/src/loculus_preprocessing/prepro.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,13 @@ def enrich_with_nextclade( # noqa: C901, PLR0912, PLR0914, PLR0915
),
),
processedFields=(
(
AnnotationSource(
name="main",
type=AnnotationSourceType.NUCLEOTIDE_SEQUENCE,
),
)
),
(
AnnotationSource(
name="main",
type=AnnotationSourceType.NUCLEOTIDE_SEQUENCE,
),
)
),
message=(
"Found unknown segments in the input data - "
"check your segments are annotated correctly."
Expand Down Expand Up @@ -292,7 +292,8 @@ def enrich_with_nextclade( # noqa: C901, PLR0912, PLR0914, PLR0915
# TODO: Add warning to each sequence
logger.info(
f"Gene {gene} not found in Nextclade results expected at: {
translation_path}"
translation_path
}"
)

nextclade_metadata = parse_nextclade_json(
Expand Down Expand Up @@ -392,7 +393,9 @@ def add_input_metadata(
spec: ProcessingSpec,
unprocessed: UnprocessedAfterNextclade,
errors: list[ProcessingAnnotation],
warnings: list[ProcessingAnnotation],
input_path: str,
config: Config,
) -> str | None:
"""Returns value of input_path in unprocessed metadata"""
# If field starts with "nextclade.", take from nextclade metadata
Expand Down Expand Up @@ -431,24 +434,27 @@ def add_input_metadata(
if segment == "main"
else f"Nucleotide sequence for {segment} failed to align"
)
errors.append(
ProcessingAnnotation(
unprocessedFields=(
AnnotationSource(
name=segment,
type=AnnotationSourceType.NUCLEOTIDE_SEQUENCE,
),
annotation = ProcessingAnnotation(
unprocessedFields=(
AnnotationSource(
name=segment,
type=AnnotationSourceType.NUCLEOTIDE_SEQUENCE,
),
processedFields=(
AnnotationSource(
name=segment,
type=AnnotationSourceType.NUCLEOTIDE_SEQUENCE,
),
),
processedFields=(
AnnotationSource(
name=segment,
type=AnnotationSourceType.NUCLEOTIDE_SEQUENCE,
),
message=message,
)
),
message=message,
)
if config.multi_segment and config.alignment_requirement == "ANY":
warnings.append(annotation)
return None
errors.append(annotation)
return None
spec.args["segment_aligned"] = True
result: str | None = str(
dpath.get(
unprocessed.nextcladeMetadata[segment],
Expand Down Expand Up @@ -487,6 +493,7 @@ def get_metadata( # noqa: PLR0913, PLR0917
unprocessed: UnprocessedAfterNextclade | UnprocessedData,
errors: list[ProcessingAnnotation],
warnings: list[ProcessingAnnotation],
config: Config,
) -> ProcessingResult:
input_data: InputMetadata = {}
input_fields: list[str] = []
Expand All @@ -500,7 +507,9 @@ def get_metadata( # noqa: PLR0913, PLR0917
args["submitter"] = unprocessed.submitter
else:
for arg_name, input_path in spec.inputs.items():
input_data[arg_name] = add_input_metadata(spec, unprocessed, errors, input_path)
input_data[arg_name] = add_input_metadata(
spec, unprocessed, errors, warnings, input_path, config
)
input_fields.append(input_path)
args = spec.args
args["submitter"] = unprocessed.inputMetadata["submitter"]
Expand Down Expand Up @@ -599,11 +608,11 @@ def process_single( # noqa: C901
)
],
processedFields=(
AnnotationSource(
name="main",
type=AnnotationSourceType.NUCLEOTIDE_SEQUENCE,
),
AnnotationSource(
name="main",
type=AnnotationSourceType.NUCLEOTIDE_SEQUENCE,
),
),
message="No sequence data found - check segments are annotated correctly",
)
)
Expand Down Expand Up @@ -638,6 +647,7 @@ def process_single( # noqa: C901
if key in config.processing_spec:
output_metadata[key] = len(sequence) if sequence else 0

segment_aligned = False
for output_field, spec_dict in config.processing_spec.items():
length_fields = [
"length" if segment == "main" else "length_" + segment
Expand All @@ -652,15 +662,18 @@ def process_single( # noqa: C901
args=spec_dict.get("args", {}),
)
spec.args = {} if spec.args is None else spec.args
spec.args["segment_aligned"] = segment_aligned
processing_result = get_metadata(
id,
spec,
output_field,
unprocessed,
errors,
warnings,
config,
)
output_metadata[output_field] = processing_result.datum
segment_aligned = spec.args["segment_aligned"]
if (
null_per_backend(processing_result.datum)
and spec.required
Expand All @@ -672,7 +685,8 @@ def process_single( # noqa: C901
AnnotationSource(
name=input,
type=AnnotationSourceType.METADATA,
) for input in spec.inputs.values()
)
for input in spec.inputs.values()
],
processedFields=[
AnnotationSource(
Expand All @@ -690,6 +704,25 @@ def process_single( # noqa: C901
id, unprocessed, config, output_metadata, errors, warnings
)

if not segment_aligned:
errors.append(
ProcessingAnnotation(
unprocessedFields=[
AnnotationSource(
name=config.nucleotideSequences[0],
type=AnnotationSourceType.METADATA,
)
],
processedFields=[
AnnotationSource(
name=config.nucleotideSequences[0],
type=AnnotationSourceType.METADATA,
)
],
message=("No segment aligned."),
)
)

return ProcessedEntry(
accession=accession_from_str(id),
version=version_from_str(id),
Expand Down

0 comments on commit 74d5eb2

Please sign in to comment.