Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare assign_clades CLI to use functionality in CladeTime and Tree classes #42

Merged
merged 2 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 83 additions & 95 deletions src/cladetime/assign_clades.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,141 +3,114 @@
import datetime
import os
import subprocess
import tempfile
from pathlib import Path

import polars as pl
import rich_click as click
import structlog

from cladetime import CladeTime
from cladetime.util.config import Config
from cladetime.util.reference import get_nextclade_dataset
from cladetime.util.sequence import (
_unzip_sequence_package,
get_covid_genome_data,
parse_sequence_assignments,
)
from cladetime.util.sequence import _download_from_url, filter_covid_genome_metadata
from cladetime.util.session import _get_session
from cladetime.util.timing import time_function

logger = structlog.get_logger()
session = _get_session()


def setup_config(base_data_dir: str, sequence_released_date: datetime, tree_as_of_date: datetime) -> Config:
def _setup_config(base_data_dir: str) -> Config:
"""Return an initialized Config class for the pipeline run."""

config = Config(
data_path_root=base_data_dir,
sequence_released_date=sequence_released_date,
tree_as_of_date=tree_as_of_date,
)

return config


def get_sequences(config: Config):
"""Download SARS-CoV-2 sequences from Genbank."""
def _save_sequences(ct: CladeTime, tmpdir: Path) -> Path:
"""Download and save SAR-CoV-2 sequences from Nextstrain."""

logger.info("Downloading SARS-CoV-2 sequences from Nextstrain", url=ct.url_sequence)
full_sequence_file = _download_from_url(session=session, url=ct.url_sequence, data_path=Path(tmpdir))
return full_sequence_file


def _save_tree(tree: dict, tmpdir: Path) -> Path:
"""Save a reference tree to disk and return the path."""

sequence_package = config.data_path / config.ncbi_package_name
return Path.home()

# API requires a datetime string for the released_since parameter
sequence_released_date = datetime.datetime.strptime(config.sequence_released_since_date, "%Y-%m-%d")
sequence_released_datetime = sequence_released_date.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

get_covid_genome_data(sequence_released_datetime, base_url=config.ncbi_base_url, filename=sequence_package)
_unzip_sequence_package(sequence_package, config.data_path)
def get_sequence_metadata(metadata: pl.DataFrame, sequence_collection_date: datetime.date) -> pl.DataFrame:
"""Download SARS-CoV-2 sequence metadata from Nextstrain."""

logger.info("NCBI SARS-COV-2 genome package downloaded and unzipped", package_location=sequence_package)
# FIXME: the columns we want from the Nextrain metadata are those on on their "standard metata" list
# https://docs.nextstrain.org/projects/ncov/en/latest/reference/metadata-fields.html
cols = [
"clade_nextstrain",
"country",
"date",
"division",
"genbank_accession",
"genbank_accession_rev",
"host",
]

# clean and filter metadata (same process used to generate the weekly clade list)
filtered_metadata = filter_covid_genome_metadata(metadata, cols)

def get_sequence_metadata(config: Config):
"""Generate tabular representation of the downloaded genbank sequences."""
# add filters based on user input
filtered_metadata = filtered_metadata.filter(pl.col("date") >= sequence_collection_date)

fields = "accession,sourcedb,sra-accs,isolate-lineage,geo-region,geo-location,isolate-collection-date,release-date,update-date,virus-pangolin,length,host-name,isolate-lineage-source,biosample-acc,completeness,lab-host,submitter-names,submitter-affiliation,submitter-country"
return filtered_metadata

with open(config.ncbi_sequence_metadata_file, "w") as f:
subprocess.run(
[
"dataformat",
"tsv",
"virus-genome",
"--inputfile",
f"{config.data_path}/ncbi_dataset/data/data_report.jsonl",
"--fields",
f"{fields}",
],
stdout=f,
)

logger.info("extracted sequence metadata", metadata_file=config.ncbi_sequence_metadata_file)
def filter_sequences(filtered_metadata: pl.LazyFrame, full_sequence_file: Path, tmpdir: Path) -> Path:
"""Create input sequence file for clade assignment."""

# This is where we are going to use biopython to filter the sequence file to a smaller version
# that only includes sequences in the filtered metadata.
# Current thinking is that the sequence filtering itself will be a CladeTime method called here
return Path.home()

def assign_clades(config: Config, nextclade_dataset_path: str):

def assign_clades(tree_path: Path, sequence_path: Path):
"""Assign downloaded genbank sequences to a clade."""

# FIXME: restore the nextclade run invocation once we've refactored
# the code for creating its inputs.
# The code below that actually invokes the nextclade will likely move to a CladeTime method
subprocess.run(
[
"nextclade",
"run",
f"{config.ncbi_sequence_file}",
"--input-dataset",
nextclade_dataset_path,
"--output-csv",
f"{config.assignment_no_metadata_file}",
"--version",
]
)

logger.info("Assigned sequences to clades via Nextclade CLI", output_file=config.assignment_no_metadata_file)
logger.info("Assigned sequences to clades via Nextclade CLI", output_file="some path stuff")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A temporary placeholder so the logger doesn't error when referencing a config attribute that no longer exists.



def merge_metadata(config: Config) -> pl.DataFrame:
def merge_metadata() -> pl.DataFrame:
"""Merge sequence metadata with clade assignments."""

df_metadata = pl.read_csv(config.ncbi_sequence_metadata_file, separator="\t")

# we're expecting one row per sequence id (aka Accession)
# TODO: how do we want to handle the case where the metadata file has
# duplicate Accession values?
assert df_metadata["Accession"].n_unique() == df_metadata.shape[0]

df_assignments = pl.read_csv(config.assignment_no_metadata_file, separator=";", infer_schema_length=5000)
df_assignments = parse_sequence_assignments(df_assignments)

joined = df_metadata.join(df_assignments, left_on="Accession", right_on="seq", how="left")
joined = joined.with_columns(
sequence_released_since=pl.lit(config.sequence_released_since_date),
reference_tree_date=pl.lit(config.reference_tree_date),
sequence_retrieved_datetime=pl.lit(config.run_time),
)
num_sequences = joined.shape[0]

# ?? what is the difference between "clade" and "clade_nextstrain" ??
missing_clade_assignments = joined.filter(pl.col("clade_nextstrain").is_null())
num_missing_assignments = missing_clade_assignments.shape[0]

if num_missing_assignments == 0:
logger.info("Sequence metadata merged with clade assignments", num_sequences=num_sequences)
else:
logger.warning(
"Some sequences are missing clade assignments",
num_sequences=num_sequences,
missing_clade_assignments=missing_clade_assignments,
)

# TBD: include only the columns we need
# https://github.com/reichlab/cladetime/issues/11
if len(config.assignment_file_columns) > 0:
joined = joined.select(config.assignment_file_columns)

return joined
# FIXME: this will all be different now
# Seems like another candidate for CladeTime, to be invoked here
return pl.DataFrame()


@click.command()
@click.option(
"--sequence-released-since-date",
"--sequence-collection-date",
type=click.DateTime(formats=["%Y-%m-%d"]),
prompt="Include SARS CoV-2 genome data released on or after this date (YYYY-MM-DD)",
required=True,
help="Limit the downloaded SARS CoV-2 package to sequences released on or after this date (YYYY-MM-DD format)",
)
@click.option(
"--reference-tree-date",
"--tree-as-of",
type=click.DateTime(formats=["%Y-%m-%d"]),
prompt="The reference tree as of date (YYYY-MM-DD)",
required=True,
Expand All @@ -151,25 +124,40 @@ def merge_metadata(config: Config) -> pl.DataFrame:
default=None,
help="Directory where the clade assignment file will be saved. Default: [home dir]/covid_variant/",
)
def main(sequence_released_since_date: datetime.date, reference_tree_date: datetime.date, data_dir: str | None):
# TODO: do we need additional date validations (e.g., no future dates)?
@time_function
def main(sequence_collection_date: datetime.date, tree_as_of: datetime.date, data_dir: str | None):
# TODO: update CLI options as discussed, including an option to save the linefile

config = setup_config(data_dir, sequence_released_since_date, reference_tree_date)
logger.info("Starting pipeline", reference_tree_date=reference_tree_date, run_time=config.run_time)
config = _setup_config(data_dir)
logger.info("Starting pipeline", reference_tree_date=tree_as_of, run_time=config.run_time)

os.makedirs(config.data_path, exist_ok=True)
nextclade_dataset_path = get_nextclade_dataset(config.reference_tree_date, config.data_path)
get_sequences(config)
get_sequence_metadata(config)
assign_clades(config, nextclade_dataset_path)
merged_data = merge_metadata(config)
merged_data.write_csv(config.assignment_file)

with tempfile.TemporaryDirectory() as tmpdir:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike prior versions of assign_clade, this new thinking assumes that we can save the intermediate files required for clade assignment in a temporary directory and only worry about saving the final output(s) to disk.

### The lines in this context manager mock out a newer, non-NCBI-based
### approach to a cladetime CLI that allows custom clade assignments.
### Some of the steps are implemented, since the supporting code already
### exists in cladetime. Some of the steps are placeholders to be completed
### in future pull requests.

ct = CladeTime()
filtered_metadata: pl.DataFrame = get_sequence_metadata(ct.sequence_metadata, sequence_collection_date)
filtered_metadata.sink_parquet(Path(tmpdir) / "filtered_metadata.parquet", maintain_order=False)
full_sequence_file = _save_sequences(ct, tmpdir)
logger.info("Temp sequence file saved", sequence_file=full_sequence_file)
filtered_sequence_file = filter_sequences(filtered_metadata, full_sequence_file, Path(tmpdir))
# once tree PR is merged: ref_tree = Tree(ct, tree_as_of).tree
ref_tree = {}
tree_file = _save_tree(ref_tree, Path(tmpdir))
assign_clades(tree_path=tree_file, sequence_path=filtered_sequence_file)
merged_metadata = merge_metadata()
# counts = get_clade_counts(merged_metadata)
return merged_metadata

logger.info(
"Sequence clade assignments are ready",
assignment_file=config.assignment_file,
run_time=config.run_time,
reference_tree_date=config.reference_tree_date,
reference_tree_date=tree_as_of,
)


Expand Down
4 changes: 1 addition & 3 deletions src/cladetime/cladetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,7 @@ def __str__(self):

def _get_config(self) -> Config:
"""Return a config object."""
# dates passed to Config don't actually do anything in this case
# (config needs a refactor)
config = Config(datetime.now(), datetime.now())
config = Config()

return config

Expand Down
118 changes: 0 additions & 118 deletions src/cladetime/get_clade_list.py

This file was deleted.

Loading