Skip to content

Commit

Permalink
Use biobear to read ZSTD-compressed .fasta files (#89)
Browse files Browse the repository at this point in the history
* Use biobear to read .zst-compressed .fasta files

Biobear has a built-in method that returns the contents
of a .fasta file as an arrow batch reader object, which,
in turn, can be parsed into a polars dataframe so we can
filter the .fasta and write the filtered results as chunks
(instead of reading/filtering/writing line-by-line)

* Update dependencies and add biobear

* Use the biobear package when filtering .fasta sequences compressed with ZSTD

Biobear has the ability to read .fasta files as batches that can be
slurped into a Polars Dataframe. This method provides a significant
performance improvement over Cladetime's prior method of using biopython
to process a .fasta file line by line.

Related issue:
#82

* Make the GitHub workflow checkout action more secure

https://yossarian.net/til/post/actions-checkout-can-leak-github-credentials/

* Fix up some type errors reported by mypy

* Add a test for missing clade assignments in line report

This test verifies that the final line report returned by assign_clades
(i.e., the clade assignments merged with the sequence metadata) contains
valid clade assignments.

The test currently fails due to a bug in the biobear feature (that fix will be
the next commit)

* Add description field to fasta records written by SeqIO

The biobear fasta reader does not bring in a description when processing
sequence records, which resulted in an "unknown" description when writing
records back out using SeqIO. That "unknown" string eventually found its way
into the "strain" field of the nextclade cli output, which resulted in null
values for "clade" after the nextclade output was joined to the metadata.

* Add sequence and assigned sequence counts to assign_clade metadata

This changeset also adds a test for clade assignments when not all
sequences in the input list get an assignment

* Remove warning about assigning large volumes of clades

We get this warning all the time when using Cladetime in variant-nowcast-hub
because we're always assigning more than 30 days' worth of sequences. The 30
day trigger for the warning was always arbitrary, and it's not serving us well.

* Remove breakpoint 🙃

* Revert "Remove warning about assigning large volumes of clades"

This reverts commit c1f8ffc.

* Re-word the warning about many sequence assignments

* Update src/cladetime/cladetime.py

Fix typo

Co-authored-by: Evan Ray <[email protected]>

---------

Co-authored-by: Evan Ray <[email protected]>
  • Loading branch information
bsweger and elray1 authored Jan 24, 2025
1 parent c89b304 commit f8bbd50
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 32 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ jobs:
steps:
- name: Checkout 🛎️
uses: actions/checkout@v4
with:
persist-credentials: false

- name: Set up Python 🐍
uses: actions/setup-python@v5
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ readme = "README.md"

dependencies = [
"awscli>=1.32.92",
"biobear",
"biopython",
"boto3",
"cloudpathlib",
Expand Down
6 changes: 5 additions & 1 deletion requirements/requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# uv pip compile pyproject.toml --extra dev -o requirements/requirements-dev.txt
awscli==1.35.20
# via cladetime (pyproject.toml)
biobear==0.23.5
# via cladetime (pyproject.toml)
biopython==1.84
# via cladetime (pyproject.toml)
boto3==1.35.54
Expand Down Expand Up @@ -81,7 +83,9 @@ pluggy==1.5.0
polars==1.17.1
# via cladetime (pyproject.toml)
pyarrow==18.0.0
# via cladetime (pyproject.toml)
# via
# cladetime (pyproject.toml)
# biobear
pyasn1==0.6.1
# via rsa
pycparser==2.22
Expand Down
6 changes: 5 additions & 1 deletion requirements/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# uv pip compile pyproject.toml -o requirements/requirements.txt
awscli==1.35.20
# via cladetime (pyproject.toml)
biobear==0.23.5
# via cladetime (pyproject.toml)
biopython==1.84
# via cladetime (pyproject.toml)
boto3==1.35.54
Expand Down Expand Up @@ -44,7 +46,9 @@ pandas==2.2.3
polars==1.17.1
# via cladetime (pyproject.toml)
pyarrow==18.0.0
# via cladetime (pyproject.toml)
# via
# cladetime (pyproject.toml)
# biobear
pyasn1==0.6.1
# via rsa
pygments==2.18.0
Expand Down
33 changes: 22 additions & 11 deletions src/cladetime/cladetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def _get_config(self) -> Config:

return config

def assign_clades(self, sequence_metadata: pl.LazyFrame, output_file: str | None = None) -> Clade:
def assign_clades(self, sequence_metadata: pl.LazyFrame, output_file: Path | str | None = None) -> Clade:
"""Assign clades to a specified set of sequences.
For each sequence in a sequence file (.fasta), assign a Nextstrain
Expand Down Expand Up @@ -289,6 +289,7 @@ def assign_clades(self, sequence_metadata: pl.LazyFrame, output_file: str | None

# drop any clade-related columns from sequence_metadata (if any exists, it will be replaced
# by the results of the clade assignment)
logger.info("Removing current sequence assignments from metadata")
sequence_metadata = sequence_metadata.drop(
[
col
Expand Down Expand Up @@ -318,8 +319,10 @@ def assign_clades(self, sequence_metadata: pl.LazyFrame, output_file: str | None
# take a long time and require a lot of resources
if sequence_count > self._config.clade_assignment_warning_threshold:
msg = (
f"Sequence count is {sequence_count}: clade assignment will run longer than usual. "
"You may want to run clade assignments on smaller subsets of sequences."
f"About to assign clades to {sequence_count} sequences. \n"
"The assignment process is resource intensive. \n"
"Depending on the limitations of your machine, \n"
"you may want to use a smaller subset of sequences."
)
warnings.warn(
msg,
Expand All @@ -331,9 +334,9 @@ def assign_clades(self, sequence_metadata: pl.LazyFrame, output_file: str | None
with tempfile.TemporaryDirectory() as tmpdir:
filtered_sequences = sequence.filter(ids, self.url_sequence, Path(tmpdir))
nextclade_dataset = _get_nextclade_dataset(
tree.ncov_metadata.get("nextclade_version_num"),
tree.ncov_metadata.get("nextclade_dataset_name").lower(),
tree.ncov_metadata.get("nextclade_dataset_version"),
tree.ncov_metadata.get("nextclade_version_num", ""),
tree.ncov_metadata.get("nextclade_dataset_name", "").lower(),
tree.ncov_metadata.get("nextclade_dataset_version", ""),
Path(tmpdir),
)
logger.info(
Expand All @@ -342,26 +345,34 @@ def assign_clades(self, sequence_metadata: pl.LazyFrame, output_file: str | None
nextclade_dataset_version=tree.ncov_metadata.get("nextclade_dataset_version"),
)
assignments = _get_clade_assignments(
tree.ncov_metadata.get("nextclade_version_num"), filtered_sequences, nextclade_dataset, output_file
tree.ncov_metadata.get("nextclade_version_num", ""), filtered_sequences, nextclade_dataset, output_file
)
assigned_clades_df = pl.read_csv(assignments, separator="\t", infer_schema_length=100000)
# get a count of non-null clade_nextstrain values
# (this is the number of sequences that were assigned to a clade)
assigned_sequence_count = assigned_clades_df.select(pl.count("clade_nextstrain")).to_series().to_list()[0]

logger.info(
"Clade assignments done",
"Nextclade assignments done",
sequences_to_assign=sequence_count,
sequences_assigned=assigned_sequence_count,
assignment_file=assignments,
nextclade_dataset=tree.ncov_metadata.get("nextclade_dataset_version"),
)

assigned_clades = pl.read_csv(assignments, separator="\t", infer_schema_length=100000)

# join the assigned clades with the original sequence metadata, create a summarized LazyFrame
# of clade counts by location, date, and host, and return both (along with metadata) in a
# Clade object
assigned_clades = sequence_metadata.join(
assigned_clades.lazy(), left_on="strain", right_on="seqName", how="left"
assigned_clades_df.lazy(), left_on="strain", right_on="seqName", how="left"
)
summarized_clades = sequence.summarize_clades(
assigned_clades, group_by=["location", "date", "host", "clade_nextstrain", "country"]
)

metadata = {
"sequences_to_assign": sequence_count,
"sequences_assigned": assigned_sequence_count,
"sequence_as_of": self.sequence_as_of,
"tree_as_of": self.tree_as_of,
"nextclade_dataset_version": tree.ncov_metadata.get("nextclade_dataset_version"),
Expand Down
43 changes: 31 additions & 12 deletions src/cladetime/sequence.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Functions for retrieving and parsing SARS-CoV-2 virus genome data."""

import io
import lzma
import os
import re
Expand All @@ -9,13 +8,15 @@
from pathlib import Path
from urllib.parse import urlparse

import biobear as bb
import polars as pl
import requests
import structlog
import us
import zstandard as zstd
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqIO import FastaIO
from Bio.SeqRecord import SeqRecord
from requests import Session

from cladetime.types import StateFormat
Expand Down Expand Up @@ -435,7 +436,7 @@ def filter(sequence_ids: set, url_sequence: str, output_path: Path) -> Path:
------
ValueError
If url_sequence points to a file that doesn't have a
.zst or .xz extension.
.zst or .xz extension or if sequence_ids is empty
"""
session = _get_session()

Expand All @@ -447,6 +448,10 @@ def filter(sequence_ids: set, url_sequence: str, output_path: Path) -> Path:
raise ValueError(f"Unsupported compression format: {file_extension}")
filtered_sequence_file = output_path / "sequences_filtered.fasta"

# if there are no sequence_ids to filter on, return an empty file
if len(sequence_ids) == 0:
raise ValueError("Set of sequence ids to filter on is empty")

logger.info("Downloading sequence file", url=url_sequence)
sequence_file = _download_from_url(session, url_sequence, output_path)
logger.info("Sequence file saved", path=sequence_file)
Expand All @@ -465,15 +470,29 @@ def filter(sequence_ids: set, url_sequence: str, output_path: Path) -> Path:
sequence_match_count += 1
SeqIO.write(record, fasta_output, "fasta")
else:
with open(sequence_file, "rb") as handle:
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(handle) as reader:
text_stream = io.TextIOWrapper(reader, encoding="utf-8")
for record in FastaIO.FastaIterator(text_stream):
sequence_count += 1
if record.id in sequence_ids:
sequence_match_count += 1
SeqIO.write(record, fasta_output, "fasta")
# for .zst files, use biobear + polars to read the .fasta file
# and filter it in batches (instead of reading the fasta file
# line by line)
bb_session = bb.new_session()

# stream .fasta file in batches (biobear reader -> pyarrow RecordBatchReader)
sequence_batches = bb_session.read_fasta_file(str(sequence_file)).to_arrow_record_batch_reader()

for batch in sequence_batches:
# convert each batch of biobear sequence records to a polars DataFrame
batch_df = pl.DataFrame(batch)
sequence_count += len(batch_df)
# create two lists: one for sequence ids and one for sequences
id_list = batch_df.get_column("id").to_list()
sequence_list = batch_df.get_column("sequence").to_list()
# zip the above lists into a final list of biopython SeqRecord
# objects that match one of the sequence_ids passed to this function
seq_match_list = [
SeqRecord(Seq(sequence), id=id, description=id)
for id, sequence in zip(id_list, sequence_list)
if id in sequence_ids
]
sequence_match_count += SeqIO.write(seq_match_list, fasta_output, "fasta")

logger.info(
"Filtered sequence file saved",
Expand Down
12 changes: 11 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@ def moto_file_path() -> Path:

@pytest.fixture(scope="function")
def demo_mode(monkeypatch):
"Set demo mode to True for tests using the Nextstrain 100K sequence files."
"""
Set demo mode to True for testing.
This fixture activates CladeTime's demo mode, which uses the Nextstrain
100k dataset instead of the entire universe of SARS-CoV-2 sequences.
Use with caution: the 100K dataset is compressed using LZMA, which
follows a different code path than the full dataset normally used by
Cladetime (which is compressed using ZSTD and is read in batches
using biobear).
"""
demo_mode = "true"
monkeypatch.setenv("CLADETIME_DEMO", demo_mode)
yield demo_mode
Expand Down
2 changes: 1 addition & 1 deletion tests/data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This directory contains test files used by CladeTime's test suite.

* `moto_fixture` directory contains files used when recreating Nextstrain/Nextclade data in the moto mocked S3 bucket
* `test_metadata.tsv` was used to test `get_clade_list` before that functionality moved to variant-nowcast-hub
* `metadata.tsv.xz` and `metadata.tsv.xz` are used to test setting CladeTime's sequence_metadata property.
* `metadata.tsv.xz` and `metadata.tsv.zst` are used to test setting CladeTime's sequence_metadata property.
* `test_sequences.fasta` isn't used by tests directly, but is the human-readable version of test_sequences.fasta.[xz|zst] below
* `test_sequences.fasta.xz` and `test_sequences.fasta.zst` are used to test the sequence filter function
* `test_sequences.fasta`, `test_sequences_fake.fasta`, and `test_nexclade_dataset.zip` are used in Nextclade integration tests
Expand Down
59 changes: 59 additions & 0 deletions tests/integration/test_cladetime_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,65 @@ def test_assign_old_tree(test_file_path, tmp_path, test_sequences):
assert old_assigned_clades.meta.get("nextclade_version_num") == "3.8.2"
assert old_assigned_clades.meta.get("assignment_as_of") == "2024-11-01 00:00"

@pytest.mark.skipif(not docker_enabled, reason="Docker is not installed")
@pytest.mark.parametrize("sequence_file", ["test_sequences.fasta.xz", "test_sequences.fasta.zst"])
def test_assign_clade_detail(test_file_path, tmpdir, sequence_file):
    """Check the detailed clade assignment line file against known expected clades."""
    fasta_path = test_file_path / sequence_file

    # Sequences present in both the test fasta file AND the test metadata file,
    # mapped to the clade each one is expected to be assigned.
    expected_sequence_assignments = {"USA/WV064580/2020": "20G"}

    download_mock = MagicMock(return_value=fasta_path, name="_download_from_url_mock")
    with patch("cladetime.sequence._download_from_url", download_mock):
        ct = CladeTime()
        # used to determine extension when reading .fasta file
        ct.url_sequence = fasta_path.as_uri()
        test_metadata = (
            pl.read_csv(test_file_path / "metadata.tsv.zst", separator="\t", infer_schema_length=100000)
            .filter(pl.col("strain").is_in(expected_sequence_assignments.keys()))
            .lazy()
        )

        clades = ct.assign_clades(test_metadata, output_file=tmpdir / "assignments.tsv")
        detailed_df = clades.detail.collect()

        # assign_clades detail output should have the same number of records as the input metadata
        assert len(detailed_df) == len(expected_sequence_assignments)

        # every strain in the output should carry its expected clade assignment
        for row in detailed_df.select("strain", "clade_nextstrain").to_dicts():
            assert expected_sequence_assignments[row["strain"]] == row["clade_nextstrain"]

        # check assignment counts recorded in the returned metadata
        assert clades.meta.get("sequences_to_assign") == 1
        assert clades.meta.get("sequences_assigned") == 1


@pytest.mark.skipif(not docker_enabled, reason="Docker is not installed")
@pytest.mark.parametrize("sequence_file", ["test_sequences.fasta.xz", "test_sequences.fasta.zst"])
def test_assign_clade_detail_missing_assignments(test_file_path, tmpdir, sequence_file):
    """Check the detailed clade assignment line file when some sequences get no clade."""
    fasta_path = test_file_path / sequence_file

    download_mock = MagicMock(return_value=fasta_path, name="_download_from_url_mock")
    with patch("cladetime.sequence._download_from_url", download_mock):
        ct = CladeTime()
        # used to determine extension when reading .fasta file
        ct.url_sequence = fasta_path.as_uri()
        test_metadata = pl.read_csv(
            test_file_path / "metadata.tsv.zst", separator="\t", infer_schema_length=100000
        ).lazy()
        test_metadata_count = len(test_metadata.collect())

        clades = ct.assign_clades(test_metadata, output_file=tmpdir / "assignments.tsv")
        detailed_df = clades.detail.collect()

        # assign_clades detail output should have the same number of records as the input metadata
        assert len(detailed_df) == test_metadata_count

        # check metadata
        # only one sequence in the test metadata is in the test fasta
        assert clades.meta.get("sequences_to_assign") == test_metadata_count
        assert clades.meta.get("sequences_assigned") == 1


@pytest.mark.skipif(not docker_enabled, reason="Docker is not installed")
@pytest.mark.parametrize(
Expand Down
8 changes: 3 additions & 5 deletions tests/unit/test_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def test_filter(test_file_path, tmpdir, sequence_file):
actual_headers = []
with open(filtered_sequence_file, "r") as fasta_test:
for record in SeqIO.parse(fasta_test, "fasta"):
actual_headers.append(record.description)
actual_headers.append(record.id)
assert set(actual_headers) == test_sequence_set


Expand All @@ -271,10 +271,8 @@ def test_filter_no_sequences(test_file_path, tmpdir, sequence_file):
test_sequence_set = {}
mock_download = MagicMock(return_value=test_sequence_file, name="_download_from_url_mock")
with patch("cladetime.sequence._download_from_url", mock_download):
filtered_no_sequence = sequence.filter(test_sequence_set, f"http://thisismocked.com/{sequence_file}", tmpdir)

contents = filtered_no_sequence.read_text(encoding=None)
assert len(contents) == 0
with pytest.raises(ValueError):
sequence.filter(test_sequence_set, f"http://thisismocked.com/{sequence_file}", tmpdir)


def test_filter_empty_fasta_xz(tmpdir):
Expand Down

0 comments on commit f8bbd50

Please sign in to comment.