Merge pull request #19 from bento-platform/refact/bento-lib-12
refact: use file streaming and DRS resolver from bento_lib v12
davidlougheed authored Aug 22, 2024
2 parents 6ec586c + edffcef commit a162435
Showing 12 changed files with 486 additions and 362 deletions.
2 changes: 2 additions & 0 deletions bento_reference_service/config.py
@@ -27,6 +27,8 @@ class Config(BentoFastAPIBaseConfig):

    feature_response_record_limit: int = 1000

+    drs_cache_ttl: float = 900.0
+

@lru_cache()
def get_config():
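For context: drs_cache_ttl sets how long (presumably in seconds; the default of 900.0 is 15 minutes) the DRS resolver may cache resolved DRS records before re-fetching them. A minimal sketch of reading and overriding it, assuming the Bento config class loads values from the environment like other pydantic-settings-based configs; the env var name and app module path here are illustrative guesses:

# Hypothetical override, assuming pydantic-settings-style env loading:
#   DRS_CACHE_TTL=300 uvicorn bento_reference_service.app:app
from bento_reference_service.config import get_config

config = get_config()
print(config.drs_cache_ttl)  # 900.0 by default; lower values mean DRS objects are re-resolved more often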
19 changes: 19 additions & 0 deletions bento_reference_service/drs.py
@@ -0,0 +1,19 @@
+from bento_lib.drs.resolver import DrsResolver
+from fastapi import Depends
+from functools import lru_cache
+from typing import Annotated
+
+from .config import ConfigDependency
+
+__all__ = [
+    "get_drs_resolver",
+    "DrsResolverDependency",
+]
+
+
+@lru_cache
+def get_drs_resolver(config: ConfigDependency):
+    return DrsResolver(cache_ttl=config.drs_cache_ttl)
+
+
+DrsResolverDependency = Annotated[DrsResolver, Depends(get_drs_resolver)]
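The new module mirrors the get_config pattern: a single lru_cache'd DrsResolver built from the config and exposed as an Annotated FastAPI dependency. A rough usage sketch with a hypothetical route (not part of this diff), just to show how the dependency is injected:

from fastapi import APIRouter

from bento_reference_service.drs import DrsResolverDependency

router = APIRouter()


@router.get("/example")  # hypothetical endpoint, for illustration only
async def example_endpoint(drs_resolver: DrsResolverDependency):
    # FastAPI resolves the Annotated chain (get_config() -> get_drs_resolver(config))
    # and injects the same cached DrsResolver instance on every request.
    return {"drs_resolver_injected": drs_resolver is not None}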
23 changes: 16 additions & 7 deletions bento_reference_service/features.py
@@ -3,6 +3,7 @@
import pysam
import traceback

+from bento_lib.drs.resolver import DrsResolver
from datetime import datetime
from pathlib import Path
from typing import Generator
@@ -266,10 +267,14 @@ async def ingest_features(
    return n_ingested


-async def download_uri_into_temporary_file(uri: str, tmp: Path, config: Config, logger: logging.Logger):
+async def download_uri_into_temporary_file(
+    uri: str, tmp: Path, config: Config, drs_resolver: DrsResolver, logger: logging.Logger
+):
    logger.debug(f"Saving data from URI {uri} into temporary file {tmp}")

-    _, _, stream_iter = await stream_from_uri(config, logger, uri, range_header=None, impose_response_limit=False)
+    _, _, stream_iter = await stream_from_uri(
+        config, drs_resolver, logger, uri, range_header=None, impose_response_limit=False
+    )

    # copy .gff3.gz to temporary directory for ingestion
    async with aiofiles.open(tmp, "wb") as fh:
@@ -279,7 +284,9 @@ async def download_uri_into_temporary_file(uri: str, tmp: Path, config: Config,
    logger.debug(f"Wrote downloaded data to {tmp}; size={tmp.stat().st_size}")


-async def download_feature_files(genome: m.GenomeWithURIs, config: Config, logger: logging.Logger):
+async def download_feature_files(
+    genome: m.GenomeWithURIs, config: Config, drs_resolver: DrsResolver, logger: logging.Logger
+):
    tmp_file_id = str(uuid4())

    if genome.gff3_gz is None:
@@ -288,15 +295,17 @@ async def download_feature_files(genome: m.GenomeWithURIs, config: Config, logge
        raise AnnotationIngestError(f"Genome {genome.id} is missing a GFF3 Tabix index")

    fn = config.file_ingest_tmp_dir / f"{tmp_file_id}.gff3.gz"
-    await download_uri_into_temporary_file(genome.gff3_gz, fn, config, logger)
+    await download_uri_into_temporary_file(genome.gff3_gz, fn, config, drs_resolver, logger)

    fn_tbi = config.file_ingest_tmp_dir / f"{tmp_file_id}.gff3.gz.tbi"
-    await download_uri_into_temporary_file(genome.gff3_gz_tbi, fn_tbi, config, logger)
+    await download_uri_into_temporary_file(genome.gff3_gz_tbi, fn_tbi, config, drs_resolver, logger)

    return fn, fn_tbi


-async def ingest_features_task(genome_id: str, task_id: int, config: Config, db: Database, logger: logging.Logger):
+async def ingest_features_task(
+    genome_id: str, task_id: int, config: Config, db: Database, drs_resolver: DrsResolver, logger: logging.Logger
+):
    # the ingest_features task moves from queued -> running -> (success | error)

    await db.update_task_status(task_id, "running")
Expand All @@ -311,7 +320,7 @@ async def ingest_features_task(genome_id: str, task_id: int, config: Config, db:
try:
# download GFF3 + GFF3 TBI file for this genome
logger.info(f"Downloading gene feature files for genome {genome_id}")
gff3_gz_path, gff3_gz_tbi_path = await download_feature_files(genome, config, logger)
gff3_gz_path, gff3_gz_tbi_path = await download_feature_files(genome, config, drs_resolver, logger)
except Exception as e:
err = (
f"task {task_id}: encountered exception while downloading feature files: {e}; traceback: "
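All of the download helpers above share one pattern: obtain an async byte-stream iterator for the source URI (now via bento_lib's stream_from_uri, which takes the DRS resolver so drs:// URIs can be dereferenced) and write it chunk by chunk into a temporary file. A stripped-down sketch of that pattern, using a generic chunk iterator rather than stream_from_uri's exact return value:

import aiofiles
from pathlib import Path
from typing import AsyncIterator


async def write_stream_to_file(chunks: AsyncIterator[bytes], dest: Path) -> int:
    # Write each chunk as it arrives so large .gff3.gz / FASTA files never need
    # to be held fully in memory.
    async with aiofiles.open(dest, "wb") as fh:
        async for chunk in chunks:
            await fh.write(chunk)
    return dest.stat().st_size  # handy for the size= debug log used above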
33 changes: 29 additions & 4 deletions bento_reference_service/routers/genomes.py
@@ -10,6 +10,7 @@
from ..authz import authz_middleware
from ..config import ConfigDependency
from ..db import Database, DatabaseDependency
+from ..drs import DrsResolverDependency
from ..logger import LoggerDependency
from ..streaming import generate_uri_streaming_response
from .constants import DEPENDENCY_DELETE_REFERENCE_MATERIAL, DEPENDENCY_INGEST_REFERENCE_MATERIAL
@@ -72,7 +73,12 @@ async def genomes_create(

@genome_router.get("/{genome_id}.fa", dependencies=[authz_middleware.dep_public_endpoint()])
async def genomes_detail_fasta(
-    genome_id: str, config: ConfigDependency, db: DatabaseDependency, logger: LoggerDependency, request: Request
+    genome_id: str,
+    config: ConfigDependency,
+    db: DatabaseDependency,
+    drs_resolver: DrsResolverDependency,
+    logger: LoggerDependency,
+    request: Request,
) -> StreamingResponse:
    # need internal FASTA URI:
    genome: m.Genome = await get_genome_or_raise_404(db, genome_id, external_resource_uris=False)
@@ -82,6 +88,7 @@ async def genomes_detail_fasta(
    range_header: str | None = request.headers.get("Range", None)
    return await generate_uri_streaming_response(
        config,
+        drs_resolver,
        logger,
        genome.fasta,
        range_header,
@@ -93,7 +100,12 @@

@genome_router.get("/{genome_id}.fa.fai", dependencies=[authz_middleware.dep_public_endpoint()])
async def genomes_detail_fasta_index(
-    genome_id: str, config: ConfigDependency, db: DatabaseDependency, logger: LoggerDependency, request: Request
+    genome_id: str,
+    config: ConfigDependency,
+    db: DatabaseDependency,
+    drs_resolver: DrsResolverDependency,
+    logger: LoggerDependency,
+    request: Request,
) -> StreamingResponse:
    # need internal FAI URI:
    genome: m.Genome = await get_genome_or_raise_404(db, genome_id, external_resource_uris=False)
@@ -102,6 +114,7 @@ async def genomes_detail_fasta_index(
    range_header: str | None = request.headers.get("Range", None)
    return await generate_uri_streaming_response(
        config,
+        drs_resolver,
        logger,
        genome.fai,
        range_header,
@@ -233,7 +246,12 @@ async def genomes_detail_igv_js_features(

@genome_router.get("/{genome_id}/features.gff3.gz", dependencies=[authz_middleware.dep_public_endpoint()])
async def genomes_detail_features_gff3(
-    config: ConfigDependency, db: DatabaseDependency, logger: LoggerDependency, request: Request, genome_id: str
+    config: ConfigDependency,
+    db: DatabaseDependency,
+    drs_resolver: DrsResolverDependency,
+    logger: LoggerDependency,
+    request: Request,
+    genome_id: str,
):
    # need internal GFF3.gz URI:
    genome: m.Genome = await get_genome_or_raise_404(db, genome_id=genome_id, external_resource_uris=False)
@@ -248,6 +266,7 @@ async def genomes_detail_features_gff3(
    range_header: str | None = request.headers.get("Range", None)
    return await generate_uri_streaming_response(
        config,
+        drs_resolver,
        logger,
        genome.gff3_gz,
        range_header,
@@ -259,7 +278,12 @@

@genome_router.get("/{genome_id}/features.gff3.gz.tbi", dependencies=[authz_middleware.dep_public_endpoint()])
async def genomes_detail_gene_features_gff3_index(
-    config: ConfigDependency, db: DatabaseDependency, logger: LoggerDependency, request: Request, genome_id: str
+    config: ConfigDependency,
+    db: DatabaseDependency,
+    drs_resolver: DrsResolverDependency,
+    logger: LoggerDependency,
+    request: Request,
+    genome_id: str,
):
    # need internal GFF3.gz URI:
    genome: m.Genome = await get_genome_or_raise_404(db, genome_id=genome_id, external_resource_uris=False)
@@ -275,6 +299,7 @@ async def genomes_detail_gene_features_gff3_index(
    range_header: str | None = request.headers.get("Range", None)
    return await generate_uri_streaming_response(
        config,
+        drs_resolver,
        logger,
        genome.gff3_gz_tbi,
        range_header,
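Each of the four endpoints above streams a file and forwards the client's Range header, so partial reads (e.g. the slices igv.js requests) still work through the DRS-backed path. A rough client-side sketch; the base URL, route prefix, and genome ID are assumptions, not taken from this repository:

import httpx

BASE_URL = "http://localhost:5000"  # hypothetical deployment URL
GENOME_ID = "hg38"                  # hypothetical genome ID

# Ask for only the first 64 KiB of the FASTA; the service translates this into a
# ranged read against the underlying file or DRS object.
resp = httpx.get(
    f"{BASE_URL}/genomes/{GENOME_ID}.fa",  # path prefix assumed
    headers={"Range": "bytes=0-65535"},
)
print(resp.status_code, len(resp.content))  # typically 206 Partial Content, 65536 bytes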
8 changes: 6 additions & 2 deletions bento_reference_service/routers/refget.py
@@ -12,6 +12,7 @@
from ..authz import authz_middleware
from ..config import ConfigDependency
from ..db import DatabaseDependency
+from ..drs import DrsResolverDependency
from ..fai import parse_fai
from ..logger import LoggerDependency
from ..models import Alias
@@ -76,6 +77,7 @@ async def refget_service_info(
@refget_router.get("/{sequence_checksum}", dependencies=[authz_middleware.dep_public_endpoint()])
async def refget_sequence(
    config: ConfigDependency,
+    drs_resolver: DrsResolverDependency,
    logger: LoggerDependency,
    db: DatabaseDependency,
    request: Request,
@@ -113,7 +115,9 @@ async def refget_sequence(

    # Fetch FAI so we can index into FASTA, properly translating the range header for the contig along the way.
    with io.BytesIO() as fb:
-        _, _, stream = await s.stream_from_uri(config, logger, genome.fai, None, impose_response_limit=False)
+        _, _, stream = await s.stream_from_uri(
+            config, drs_resolver, logger, genome.fai, None, impose_response_limit=False
+        )
        async for chunk in stream:
            fb.write(chunk)
        fb.seek(0)
@@ -189,7 +193,7 @@ async def refget_sequence(
    fasta_range_header = f"bytes={fasta_start_byte}-{fasta_end_byte}"

    _, _, fasta_stream = await s.stream_from_uri(
-        config, logger, genome.fasta, fasta_range_header, impose_response_limit=True
+        config, drs_resolver, logger, genome.fasta, fasta_range_header, impose_response_limit=True
    )

    async def _format_response():
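The refget handler loads the FAI so it can translate a contig-relative sequence range into a byte range within the FASTA (the fasta_start_byte / fasta_end_byte values above). The standard faidx arithmetic looks roughly like this, assuming 0-based positions within the contig and the usual FAI columns (sequence byte offset, bases per line, bytes per line including the newline):

def fasta_byte_for_position(offset: int, line_bases: int, line_width: int, pos: int) -> int:
    # Skip whole FASTA lines, then the remainder within the current line.
    return offset + (pos // line_bases) * line_width + (pos % line_bases)


# e.g. a contig whose sequence starts at byte 112, stored 60 bases per 61-byte line:
fasta_start_byte = fasta_byte_for_position(112, 60, 61, 0)       # first requested base
fasta_end_byte = fasta_byte_for_position(112, 60, 61, 150 - 1)   # last requested base, inclusive
fasta_range_header = f"bytes={fasta_start_byte}-{fasta_end_byte}"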
4 changes: 3 additions & 1 deletion bento_reference_service/routers/tasks.py
@@ -2,6 +2,7 @@

from ..config import ConfigDependency
from ..db import DatabaseDependency
+from ..drs import DrsResolverDependency
from ..features import ingest_features_task
from ..logger import LoggerDependency
from ..models import TaskParams, Task
@@ -24,6 +25,7 @@ async def tasks_create(
    background_tasks: BackgroundTasks,
    config: ConfigDependency,
    db: DatabaseDependency,
+    drs_resolver: DrsResolverDependency,
    logger: LoggerDependency,
) -> Task:
    genome_id = task.genome_id
@@ -41,7 +43,7 @@

    # currently, ingest_features is the only task type, so we don't need an if-statement to decide which task to
    # dispatch.
-    background_tasks.add_task(ingest_features_task, genome_id, task_id, config, db, logger)
+    background_tasks.add_task(ingest_features_task, genome_id, task_id, config, db, drs_resolver, logger)

    return task

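Since BackgroundTasks.add_task just forwards plain positional/keyword arguments (it does not run FastAPI dependency injection for the task function), the resolved drs_resolver has to be handed to ingest_features_task explicitly, alongside config, db, and logger. A minimal standalone sketch of that wiring pattern, with hypothetical names:

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


async def long_running_job(job_id: int, label: str):
    # Stand-in for something like ingest_features_task; it receives its collaborators
    # as plain arguments because it runs outside the request's dependency-injection scope.
    print(f"running job {job_id} ({label})")


@app.post("/jobs")  # hypothetical endpoint, for illustration only
async def create_job(background_tasks: BackgroundTasks):
    background_tasks.add_task(long_running_job, 1, label="demo")
    return {"queued": True}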

