
Commit

remove filter fasta headers option
anna-parker committed Jan 30, 2025
1 parent d5b8e1f commit 6e34ffd
Showing 6 changed files with 154 additions and 117 deletions.
101 changes: 60 additions & 41 deletions ingest/Snakefile
@@ -23,29 +23,32 @@ TAXON_ID = config["taxon_id"]
 SEGMENTED = config["segmented"]
 LOG_LEVEL = config.get("log_level", "INFO")
 NCBI_API_KEY = os.getenv("NCBI_API_KEY")
-FILTER_FASTA_HEADERS = config.get("filter_fasta_headers", None)
 APPROVE_TIMEOUT_MIN = config.get("approve_timeout_min") # time in minutes
 CHECK_ENA_DEPOSITION = config.get("check_ena_deposition", False)
 ALIGN = True
-GROUPS_OVERRIDE_JSON = None
+FILTER = config.get("metadata_filter", None)
+GROUPS_OVERRIDE_JSON = config.get("grouping_override", None) # JSON map from group to segments' insdcAccessionFull

 dataset_server_map = {}
 dataset_name_map = {}

 if SEGMENTED:
-    GROUPS_OVERRIDE_JSON = config.get("grouping_override", None) # JSON map from group to segments' insdcAccessionFull
-    if config.get("minimizer_index") and config.get("minimizer_parser"):
+    if config.get("nextclade_sort"):
         ALIGN = False
-    if ALIGN:
+    elif config.get("nextclade_align"):
+        ALIGN = True
+        nextclade_align_params = config["nextclade_align"]
         for segment in config["nucleotide_sequences"]:
-            if config.get("nextclade_dataset_server_map") and segment in config["nextclade_dataset_server_map"]:
-                dataset_server_map[segment] = config["nextclade_dataset_server_map"][segment]
+            if nextclade_align_params.get("nextclade_dataset_server_map") and segment in nextclade_align_params["nextclade_dataset_server_map"]:
+                dataset_server_map[segment] = nextclade_align_params["nextclade_dataset_server_map"][segment]
             else:
-                dataset_server_map[segment] = config.get("nextclade_dataset_server")
-            if config.get("nextclade_dataset_name_map") and segment in config["nextclade_dataset_name_map"]:
-                dataset_name_map[segment] = config["nextclade_dataset_name_map"][segment]
+                dataset_server_map[segment] = nextclade_align_params.get("nextclade_dataset_server")
+            if nextclade_align_params.get("nextclade_dataset_name_map") and segment in nextclade_align_params["nextclade_dataset_name_map"]:
+                dataset_name_map[segment] = nextclade_align_params["nextclade_dataset_name_map"][segment]
             else:
-                dataset_name_map[segment] = config.get("nextclade_dataset_name") + "/" + segment
+                dataset_name_map[segment] = nextclade_align_params.get("nextclade_dataset_name") + "/" + segment
+    else:
+        raise ValueError("No alignment or sorting specified for segmented sequences")

 if os.uname().sysname == "Darwin":
     # Don't use conda-forge unzip on macOS
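For orientation: this hunk moves the Nextclade options under nested `nextclade_align` (and, for sorting, `nextclade_sort`) sections instead of reading them from the top level of the config. A minimal Python sketch of the shape the Snakefile now expects; the key names are taken from the diff above, all values are invented:

```python
# Hypothetical config illustrating the nested layout read by the Snakefile.
# Only the key names come from the diff; every value is a placeholder.
config = {
    "segmented": True,
    "nucleotide_sequences": ["L", "M", "S"],
    "metadata_filter": {"host": "Homo sapiens"},  # enables the FILTER branch
    "nextclade_align": {
        "nextclade_dataset_server": "https://data.example.org/datasets",
        "nextclade_dataset_name": "some-virus",
        # optional per-segment overrides:
        "nextclade_dataset_server_map": {"L": "https://data.example.org/alt"},
        "nextclade_dataset_name_map": {"L": "some-virus/L-special"},
    },
    # mutually exclusive alternative to nextclade_align: sort instead of align
    # "nextclade_sort": {"minimizer_index": "...", "minimizer_parser": ["accession", "segment"]},
}
```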
@@ -157,13 +160,12 @@ rule extract_ncbi_dataset_sequences:
     output:
         ncbi_dataset_sequences="results/sequences.fasta",
     params:
-        header_arg="" if FILTER_FASTA_HEADERS else "-i", # Keep full header if segmented
         unzip=unzip,
     shell:
         """
         {params.unzip} -jp {input.dataset_package} \
         ncbi_dataset/data/genomic.fna \
-        | seqkit seq -w0 {params.header_arg} \
+        | seqkit seq -w0 -i \
         > {output.ncbi_dataset_sequences}
         """

@@ -184,36 +186,10 @@ rule calculate_sequence_hashes:
             --output-sequences {output.sequence_json}
         """

-
-if FILTER_FASTA_HEADERS:
-
-    rule filter_fasta_headers:
-        input:
-            sequences="results/sequences.fasta",
-            script="scripts/filter_sequences.py",
-            config="results/config.yaml",
-        output:
-            results="results/sequences_filtered.fasta",
-        params:
-            filter_fasta_headers=FILTER_FASTA_HEADERS,
-            log_level=LOG_LEVEL,
-        shell:
-            """
-            python {input.script} \
-                --input-seq {input.sequences} \
-                --output-seq {output.results} \
-                --log-level {params.log_level} \
-                --config-file {input.config} \
-            """
-
 if ALIGN:
     rule align:
         input:
-            sequences=(
-                "results/sequences_filtered.fasta"
-                if FILTER_FASTA_HEADERS
-                else "results/sequences.fasta"
-            ),
+            sequences= "results/sequences.fasta",
         output:
             results="results/nextclade_{segment}.tsv",
         params:
@@ -261,7 +237,7 @@ if not ALIGN:
         output:
             results="results/minimizer.json",
         params:
-            minimizer=config.get("minimizer_index")
+            minimizer=config.get("nextclade_sort").get("minimizer_index"),
         shell:
            """
            curl -L -o {output.results} {params.minimizer}
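Note that `config.get("nextclade_sort").get("minimizer_index")` relies on `nextclade_sort` being present: `dict.get` returns `None` for a missing key, and `None` has no `.get`. That is safe here because this rule is only reached when `ALIGN` is false, which requires `nextclade_sort` to be set. For contexts without that guarantee, a sketch of safer variants:

```python
config = {}  # imagine "nextclade_sort" is missing

# config.get("nextclade_sort").get("minimizer_index")  # AttributeError: 'NoneType' has no attribute 'get'

# Safe variants when presence is not guaranteed:
minimizer = config.get("nextclade_sort", {}).get("minimizer_index")   # -> None
minimizer = (config.get("nextclade_sort") or {}).get("minimizer_index")  # -> None
```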
@@ -397,6 +373,37 @@ rule heuristic_group_segments:
         fi
         """

+if FILTER:
+    rule metadata_filter:
+        input:
+            metadata=(
+                "results/metadata_post_group.ndjson"
+                if SEGMENTED
+                else "results/metadata_post_prepare.ndjson"
+            ),
+            sequences=(
+                "results/sequences_post_group.ndjson"
+                if SEGMENTED
+                else "results/sequences.ndjson"
+            ),
+            script="scripts/metadata_filter.py",
+            config="results/config.yaml",
+        output:
+            sequences="results/sequences_filtered.ndjson",
+            metadata="results/metadata_filtered.ndjson",
+        params:
+            log_level=LOG_LEVEL,
+        shell:
+            """
+            python {input.script} \
+                --input-seq {input.sequences} \
+                --output-seq {output.sequences} \
+                --input-metadata {input.metadata} \
+                --output-metadata {output.metadata} \
+                --log-level {params.log_level} \
+                --config-file {input.config} \
+            """
+

 rule get_previous_submissions:
     """Download metadata and sequence hashes of all previously submitted sequences
@@ -418,6 +425,9 @@ rule get_previous_submissions:
         # By delaying the start of the script
         script="scripts/call_loculus.py",
         prepped_metadata=(
+            "results/metadata_filtered.ndjson"
+            if FILTER
+            else
             "results/metadata_post_group.ndjson"
             if SEGMENTED
             else "results/metadata_post_prepare.ndjson"
@@ -445,6 +455,9 @@ rule compare_hashes:
         config="results/config.yaml",
         old_hashes="results/previous_submissions.json",
         metadata=(
+            "results/metadata_filtered.ndjson"
+            if FILTER
+            else
             "results/metadata_post_group.ndjson"
             if SEGMENTED
             else "results/metadata_post_prepare.ndjson"
@@ -481,11 +494,17 @@ rule prepare_files:
         script="scripts/prepare_files.py",
         config="results/config.yaml",
         metadata=(
+            "results/metadata_filtered.ndjson"
+            if FILTER
+            else
             "results/metadata_post_group.ndjson"
             if SEGMENTED
             else "results/metadata_post_prepare.ndjson"
         ),
         sequences=(
+            "results/sequences_filtered.ndjson"
+            if FILTER
+            else
             "results/sequences_post_group.ndjson"
             if SEGMENTED
             else "results/sequences.ndjson"
4 changes: 2 additions & 2 deletions ingest/scripts/call_loculus.py
@@ -532,7 +532,7 @@ def record_factory(*args, **kwargs):
     logger.info(f"Config: {config}")

     if mode in {"submit", "revise"}:
-        logging.info(f"Starting {mode}")
+        logger.info(f"Starting {mode}")
         try:
             group_id = get_or_create_group_and_return_group_id(
                 config, allow_creation=mode == "submit"
@@ -541,7 +541,7 @@ def record_factory(*args, **kwargs):
             logger.error(f"Aborting {mode} due to error: {e}")
             return
         response = submit_or_revise(metadata, sequences, config, group_id, mode=mode)
-        logging.info(f"Completed {mode}")
+        logger.info(f"Completed {mode}")

     if mode == "approve":
         while True:
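The two one-line changes above are more than cosmetic: `logging.info(...)` goes through the root logger, so it bypasses the level configured on the module's named logger and may be formatted differently or dropped entirely. A minimal illustration (the logger name is hypothetical):

```python
import logging

logger = logging.getLogger("ingest.call_loculus")
logger.setLevel(logging.INFO)

logging.info("via root logger")   # handled by the root logger's level and handlers
logger.info("via named logger")   # respects the level set on this module's logger
```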
68 changes: 0 additions & 68 deletions ingest/scripts/filter_sequences.py

This file was deleted.

78 changes: 78 additions & 0 deletions ingest/scripts/metadata_filter.py
@@ -0,0 +1,78 @@
"""filters sequences by metadata fields"""

import logging
from dataclasses import dataclass

import click
import orjsonl
import yaml


@dataclass
class Config:
metadata_filter: dict[str, str]
nucleotide_sequences: list[str]
segmented: bool


logger = logging.getLogger(__name__)
logging.basicConfig(
encoding="utf-8",
level=logging.DEBUG,
format="%(asctime)s %(levelname)8s (%(filename)20s:%(lineno)4d) - %(message)s ",
datefmt="%H:%M:%S",
)


def stream_filter_to_fasta(input, output, keep, config: Config):
for record in orjsonl.stream(input):
if (not config.segmented and record["id"] in keep) or (
config.segmented and "".join(record["id"].split("_")[:-1]) in keep
):
orjsonl.append(output, record)


@click.command(help="Parse metadata and filter sequences based on config.filter values")
@click.option("--config-file", required=True, type=click.Path(exists=True))
@click.option("--input-seq", required=True, type=click.Path(exists=True))
@click.option("--output-seq", required=True, type=click.Path())
@click.option("--input-metadata", required=True, type=click.Path(exists=True))
@click.option("--output-metadata", required=True, type=click.Path())
@click.option(
"--log-level",
default="INFO",
type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]),
)
def main(
config_file: str,
input_seq: str,
output_seq: str,
input_metadata: str,
output_metadata: str,
log_level: str,
) -> None:
logger.setLevel(log_level)
with open(config_file, encoding="utf-8") as file:
full_config = yaml.safe_load(file)
relevant_config = {key: full_config.get(key, []) for key in Config.__annotations__}
config = Config(**relevant_config)

logger.info(f"Filtering metadata with {config.metadata_filter}")
submission_ids = set()
count = 0
for record in orjsonl.stream(input_metadata):
row = record["metadata"]
logger.debug(f"Filtering metadata: {row}")
accession = record["id"]
count += 1
if all(row[key] == value for key, value in config.metadata_filter.items()):
orjsonl.append(output_metadata, record)
submission_ids.add(accession)

logger.info(f"Filtered out {count - len(submission_ids)} entries")
logger.info(f"Filtered metadata has {len(submission_ids)} entries")
stream_filter_to_fasta(input=input_seq, output=output_seq, keep=submission_ids, config=config)


if __name__ == "__main__":
main()
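To make the filter's behavior concrete, here is a small illustrative run of the same predicate on invented records (field names and values are made up; `metadata_filter` comes from the ingest config):

```python
metadata_filter = {"host": "Homo sapiens"}
records = [
    {"id": "ACC1", "metadata": {"host": "Homo sapiens", "country": "CH"}},
    {"id": "ACC2", "metadata": {"host": "Sus scrofa", "country": "DE"}},
]

# A record is kept only if every configured key/value pair matches exactly.
kept = {
    r["id"]
    for r in records
    if all(r["metadata"][k] == v for k, v in metadata_filter.items())
}
print(kept)  # {'ACC1'}

# For segmented sequences the script matches "<accession>_<segment>" IDs back
# to kept accessions by stripping the final "_<segment>" suffix.
```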
15 changes: 11 additions & 4 deletions ingest/scripts/parse_nextclade_sort_output.py
@@ -22,17 +22,22 @@


 @dataclass
-class Config:
+class NextcladeSortParams:
     minimizer_index: str
     minimizer_parser: list[str]
+
+
+@dataclass
+class Config:
+    nextclade_sort: NextcladeSortParams
     nucleotide_sequences: list[str]


 def parse(field: str, index: int) -> str:
     return field.split("_")[index] if len(field.split("_")) > index else ""


-def parse_file(config: Config, sort_results: str, output_file: str):
+def parse_file(config: NextcladeSortParams, sort_results: str, output_file: str):
     df = pd.read_csv(sort_results, sep="\t", dtype={"index": "Int64"})

     no_rows = df.shape[0]
@@ -71,12 +76,14 @@ def main(config_file: str, sort_results: str, output: str, log_level: str) -> None:
     with open(config_file, encoding="utf-8") as file:
         full_config = yaml.safe_load(file)
     relevant_config = {key: full_config.get(key, []) for key in Config.__annotations__}
+    relevant_config["nextclade_sort"] = NextcladeSortParams(**relevant_config["nextclade_sort"])
     config = Config(**relevant_config)

     logger.info(f"Config: {config}")
-    if "segment" not in config.minimizer_parser:
+    if "segment" not in config.nextclade_sort.minimizer_parser:
         error_msg = "minimizer_parser must include 'segment'"
         raise ValueError(error_msg)
-    parse_file(config, sort_results, output)
+    parse_file(config.nextclade_sort, sort_results, output)


 if __name__ == "__main__":
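The pattern in `main`, wrapping the nested `nextclade_sort` mapping in its own dataclass before constructing `Config`, generalizes to any nested config section. A self-contained sketch of the same idea with invented values:

```python
from dataclasses import dataclass


@dataclass
class NextcladeSortParams:
    minimizer_index: str
    minimizer_parser: list[str]


@dataclass
class Config:
    nextclade_sort: NextcladeSortParams
    nucleotide_sequences: list[str]


full_config = {  # stand-in for yaml.safe_load(...); values are made up
    "nextclade_sort": {"minimizer_index": "minimizer.json",
                       "minimizer_parser": ["accession", "segment"]},
    "nucleotide_sequences": ["L", "M", "S"],
}
# Pick out the top-level keys Config declares, then promote the nested dict.
relevant = {key: full_config.get(key, []) for key in Config.__annotations__}
relevant["nextclade_sort"] = NextcladeSortParams(**relevant["nextclade_sort"])
config = Config(**relevant)
print(config.nextclade_sort.minimizer_parser)  # ['accession', 'segment']
```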