Speed improvements #85

Merged · 21 commits · Dec 18, 2024

Changes from all commits
2 changes: 1 addition & 1 deletion bedboss/_version.py
@@ -1 +1 @@
__version__ = "0.5.0"
__version__ = "0.5.1"
45 changes: 44 additions & 1 deletion bedboss/bbuploader/cli.py
@@ -1,4 +1,5 @@
import typer

from bedboss._version import __version__

app_bbuploader = typer.Typer(
@@ -33,17 +34,33 @@ def upload_all(
None,
help="Reference genome [Default: None] (e.g. hg38) - if None, all genomes will be processed",
),
preload: bool = typer.Option(
True, help="Download bedfile before caching it. [Default: True]"
),
create_bedset: bool = typer.Option(
True, help="Create bedset from bed files. [Default: True]"
),
rerun: bool = typer.Option(True, help="Re-run all the samples. [Default: False]"),
overwrite: bool = typer.Option(
False, help="Overwrite existing bedfiles. [Default: False]"
),
overwrite_bedset: bool = typer.Option(
True, help="Overwrite existing bedset. [Default: True]"
),
rerun: bool = typer.Option(False, help="Re-run all the samples. [Default: False]"),
run_skipped: bool = typer.Option(
True, help="Run skipped projects. [Default: True]"
),
run_failed: bool = typer.Option(True, help="Run failed projects. [Default: True]"),
standardize_pep: bool = typer.Option(
False, help="Standardize PEP with BEDMS. [Default: False]"
),
use_skipper: bool = typer.Option(
True,
help="Use skipper to skip projects if they were already processed locally [Default: True]",
),
reinit_skipper: bool = typer.Option(
False, help="Reinitialize skipper. [Default: False]"
),
):
from .main import upload_all as upload_all_function

@@ -57,10 +74,15 @@ def upload_all(
download_limit=download_limit,
genome=genome,
create_bedset=create_bedset,
preload=preload,
rerun=rerun,
run_skipped=run_skipped,
run_failed=run_failed,
standardize_pep=standardize_pep,
use_skipper=use_skipper,
reinit_skipper=reinit_skipper,
overwrite=overwrite,
overwrite_bedset=overwrite_bedset,
)


@@ -78,14 +100,30 @@ def upload_gse(
None,
help="Reference genome to upload to the database. If None, all genomes will be processed",
),
preload: bool = typer.Option(
True, help="Download bedfile before caching it. [Default: True]"
),
rerun: bool = typer.Option(True, help="Re-run all the samples. [Default: True]"),
run_skipped: bool = typer.Option(
True, help="Run skipped projects. [Default: True]"
),
run_failed: bool = typer.Option(True, help="Run failed projects. [Default: True]"),
overwrite: bool = typer.Option(
False, help="Overwrite existing bedfiles. [Default: False]"
),
overwrite_bedset: bool = typer.Option(
True, help="Overwrite existing bedset. [Default: True]"
),
standardize_pep: bool = typer.Option(
False, help="Standardize PEP with BEDMS. [Default: False]"
),
use_skipper: bool = typer.Option(
True,
help="Use local skipper to skip projects if they were already processed locally [Default: True]",
),
reinit_skipper: bool = typer.Option(
False, help="Reinitialize skipper. [Default: False]"
),
):
from .main import upload_gse as upload_gse_function

@@ -95,10 +133,15 @@ def upload_gse(
gse=gse,
create_bedset=create_bedset,
genome=genome,
preload=preload,
rerun=rerun,
run_skipped=run_skipped,
run_failed=run_failed,
standardize_pep=standardize_pep,
use_skipper=use_skipper,
reinit_skipper=reinit_skipper,
overwrite=overwrite,
overwrite_bedset=overwrite_bedset,
)


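The options above map onto bedboss.bbuploader.main. For reference, a minimal sketch of driving the same flags programmatically; the GSE accession, config path, and output folder below are placeholders, not values from this PR:

from bedboss.bbuploader.main import upload_gse

upload_gse(
    gse="GSE123456",  # placeholder GEO series accession
    bedbase_config="bedbase_config.yaml",  # placeholder config path
    outfolder="/tmp/bbuploader",  # placeholder working directory
    genome="hg38",  # process only hg38 samples; None processes all genomes
    preload=True,  # download bed files locally before processing
    use_skipper=True,  # keep a local log of processed/failed samples
    overwrite=False,  # do not force re-upload of existing bed files
    overwrite_bedset=True,  # refresh the bedset record
)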
2 changes: 1 addition & 1 deletion bedboss/bbuploader/constants.py
@@ -1,6 +1,6 @@
PKG_NAME = "bbuploader"

FILE_FOLDER_NAME = "files"
FILE_FOLDER_NAME = "geo_files"

DEFAULT_GEO_TAG = "samples"

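The renamed folder constant is what the new preload step in main.py (below) joins into the local download path. A short sketch of the resulting layout; gsm_sub_name here is a hypothetical stand-in for create_gsm_sub_name from bedboss.bbuploader.utils, whose exact grouping scheme is not shown in this diff:

import os

FILE_FOLDER_NAME = "geo_files"

def gsm_sub_name(gsm: str) -> str:
    # Hypothetical stand-in for create_gsm_sub_name; used only to make the
    # path construction concrete.
    return gsm.lower()

outfolder = "/tmp/bbuploader"  # placeholder working directory
files_path = os.path.join(outfolder, FILE_FOLDER_NAME, gsm_sub_name("GSM1234567"))
os.makedirs(files_path, exist_ok=True)  # e.g. /tmp/bbuploader/geo_files/gsm1234567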
125 changes: 108 additions & 17 deletions bedboss/bbuploader/main.py
@@ -8,19 +8,28 @@
from pephubclient import PEPHubClient
from pephubclient.helpers import MessageHandler
from pephubclient.models import SearchReturnModel
from sqlalchemy import and_, select
from sqlalchemy.orm import Session

from bedboss.bbuploader.constants import DEFAULT_GEO_TAG, PKG_NAME, STATUS
from bedboss.bbuploader.constants import (
DEFAULT_GEO_TAG,
FILE_FOLDER_NAME,
PKG_NAME,
STATUS,
)
from bedboss.bbuploader.models import (
BedBossMetadata,
BedBossRequired,
ProjectProcessingStatus,
)
from bedboss.bbuploader.utils import create_gsm_sub_name
from bedboss.bedboss import run_all
from bedboss.bedbuncher.bedbuncher import run_bedbuncher
from bedboss.exceptions import BedBossException
from bedboss.utils import standardize_genome_name, standardize_pep as pep_standardizer
from bedboss.skipper import Skipper
from bedboss.utils import download_file, standardize_genome_name
from bedboss.utils import standardize_pep as pep_standardizer

_LOGGER = logging.getLogger(PKG_NAME)
_LOGGER.setLevel(logging.DEBUG)
@@ -36,10 +45,15 @@ def upload_all(
download_limit: int = 100,
genome: str = None,
create_bedset: bool = True,
preload=True,
rerun: bool = False,
run_skipped: bool = False,
run_failed: bool = True,
standardize_pep: bool = False,
use_skipper=True,
reinit_skipper=False,
overwrite=False,
overwrite_bedset=False,
):
"""
This is the main function responsible for processing BED files from PEPHub.
@@ -53,10 +67,14 @@ def upload_all(
:param download_limit: limit of GSE projects to be downloaded (used for testing purposes) [Default: 100]
:param genome: reference genome [Default: None] (e.g. hg38) - if None, all genomes will be processed
:param create_bedset: create bedset from bed files
:param rerun: rerun processing of the series
:param run_skipped: rerun files that were skipped
:param run_failed: rerun failed files
:param preload: pre-download files to the local folder (used for faster reproducibility)
:param rerun: rerun processing of the series. Used by the logging system; to re-upload a file, use overwrite
:param run_skipped: rerun files that were skipped. Used by the logging system; to re-upload a file, use overwrite
:param run_failed: rerun failed files. Used by the logging system; to re-upload a file, use overwrite
:param standardize_pep: standardize pep metadata using BEDMS
:param use_skipper: use skipper to skip files that were already processed and logged locally. Skipper keeps a local log of processed
and failed files.
:param reinit_skipper: reinitialize skipper; if set to True, all local log files will be cleaned
:param overwrite: overwrite existing bedfiles
:param overwrite_bedset: overwrite existing bedset
"""

phc = PEPHubClient()
@@ -130,7 +148,12 @@
sa_session=session,
gse_status_sa_model=gse_status,
standardize_pep=standardize_pep,
rerun=rerun,
# rerun=rerun,
use_skipper=use_skipper,
reinit_skipper=reinit_skipper,
preload=preload,
overwrite=overwrite,
overwrite_bedset=overwrite_bedset,
)
except Exception as err:
_LOGGER.error(
@@ -182,7 +205,7 @@ def process_pep_sample(
return BedBossRequired(
sample_name=bed_sample.sample_name,
file_path=bed_sample.file_url,
ref_genome=standardize_genome_name(bed_sample.ref_genome),
ref_genome=bed_sample.ref_genome,
type=file_type,
narrowpeak=is_narrowpeak,
pep=project_metadata,
@@ -251,10 +274,15 @@ def upload_gse(
outfolder: str = os.getcwd(),
create_bedset: bool = True,
genome: str = None,
preload: bool = True,
rerun: bool = False,
run_skipped: bool = False,
run_failed: bool = True,
standardize_pep: bool = False,
use_skipper=True,
reinit_skipper=False,
overwrite=False,
overwrite_bedset=False,
):
"""
Upload bed files from GEO series to BedBase
@@ -264,10 +292,16 @@ def upload_gse(
:param outfolder: working directory where files will be downloaded and processed, and where statistics will be saved
:param create_bedset: create bedset from bed files
:param genome: reference genome to upload to database. If None, all genomes will be processed
:param preload: pre-download files to the local folder (used for faster reproducibility)
:param rerun: rerun processing of the series
:param run_skipped: rerun files that were skipped
:param run_failed: rerun failed files
:param standardize_pep: standardize pep metadata using BEDMS
:param use_skipper: use local skipper to skip files that were already processed and logged locally. Skipper keeps a local log of processed
and failed files.
:param reinit_skipper: reinitialize skipper; if set to True, all local log files will be cleaned
:param overwrite: overwrite existing bedfiles
:param overwrite_bedset: overwrite existing bedset

:return: None
"""
@@ -306,14 +340,18 @@ def upload_gse(
try:
upload_result = _upload_gse(
gse=gse,
bedbase_config=bedbase_config,
bedbase_config=bbagent,
outfolder=outfolder,
create_bedset=create_bedset,
genome=genome,
sa_session=session,
gse_status_sa_model=gse_status,
standardize_pep=standardize_pep,
rerun=rerun,
preload=preload,
overwrite=overwrite,
overwrite_bedset=overwrite_bedset,
use_skipper=use_skipper,
reinit_skipper=reinit_skipper,
)
except Exception as e:
_LOGGER.error(f"Processing of '{gse}' failed with error: {e}")
@@ -360,7 +398,11 @@ def _upload_gse(
sa_session: Session = None,
gse_status_sa_model: GeoGseStatus = None,
standardize_pep: bool = False,
rerun: bool = False,
overwrite: bool = False,
overwrite_bedset: bool = False,
use_skipper: bool = True,
reinit_skipper: bool = False,
preload: bool = True,
) -> ProjectProcessingStatus:
"""
Upload bed files from GEO series to BedBase
@@ -373,8 +415,12 @@
:param sa_session: opened session to the database
:param gse_status_sa_model: sqlalchemy model for project status
:param standardize_pep: standardize pep metadata using BEDMS
:param rerun: force overwrite data in the database

:param overwrite: overwrite existing bedfiles
:param overwrite_bedset: overwrite existing bedset
:param use_skipper: use skipper to skip files that were already processed and logged locally. Skipper keeps a local log of processed
and failed files.
:param reinit_skipper: reinitialize skipper; if set to True, all local log files will be cleaned
:param preload: pre-download files to the local folder (used for faster reproducibility)
:return: None
"""
if isinstance(bedbase_config, str):
@@ -394,9 +440,34 @@
uploaded_files = []
gse_status_sa_model.number_of_files = len(project.samples)
sa_session.commit()
for project_sample in project.samples:

total_sample_number = len(project.samples)

if use_skipper:
skipper_obj = Skipper(output_path=outfolder, name=gse)
if reinit_skipper:
skipper_obj.reinitialize()
_LOGGER.info(f"Skipper initialized for: '{gse}'")
else:
skipper_obj = None

for counter, project_sample in enumerate(project.samples):
_LOGGER.info(f">> Processing {counter+1} / {total_sample_number}")
sample_gsm = project_sample.get("sample_geo_accession", "").lower()

# if int(project_sample.get("file_size") or 0) > 10000000:
# _LOGGER.info(f"Skipping: '{sample_gsm}' - file size is too big")
# project_status.number_of_skipped += 1
# skipper_obj.add_failed(sample_gsm, f"File size is too big. {int(project_sample.get('file_size'))/1000000} MB")
# continue

if skipper_obj:
is_processed = skipper_obj.is_processed(sample_gsm)
if is_processed:
_LOGGER.info(f"Skipping: '{sample_gsm}' - already processed")
uploaded_files.append(is_processed)
continue

required_metadata = process_pep_sample(
bed_sample=project_sample,
)
@@ -440,10 +511,25 @@
sample_status.status = STATUS.PROCESSING
sa_session.commit()

if preload:
gsm_folder = create_gsm_sub_name(sample_gsm)
files_path = os.path.join(outfolder, FILE_FOLDER_NAME, gsm_folder)
os.makedirs(files_path, exist_ok=True)
file_abs_path = os.path.abspath(
os.path.join(files_path, project_sample.file)
)
download_file(project_sample.file_url, file_abs_path, no_fail=True)
else:
file_abs_path = required_metadata.file_path

required_metadata.ref_genome = standardize_genome_name(
required_metadata.ref_genome, file_abs_path
)

try:
file_digest = run_all(
name=required_metadata.title,
input_file=required_metadata.file_path,
input_file=file_abs_path,
input_type=required_metadata.type,
outfolder=os.path.join(outfolder, "outputs"),
genome=required_metadata.ref_genome,
@@ -453,9 +539,11 @@
upload_pephub=True,
upload_s3=True,
upload_qdrant=True,
force_overwrite=rerun,
force_overwrite=overwrite,
)
uploaded_files.append(file_digest)
if skipper_obj:
skipper_obj.add_processed(sample_gsm, file_digest)
sample_status.status = STATUS.SUCCESS
project_status.number_of_processed += 1

Expand All @@ -464,6 +552,9 @@ def _upload_gse(
sample_status.error = str(exc)
project_status.number_of_failed += 1

if skipper_obj:
skipper_obj.add_failed(sample_gsm, f"Error: {str(exc)}")

sa_session.commit()

if create_bedset and uploaded_files:
@@ -475,11 +566,11 @@
output_folder=os.path.join(outfolder, "outputs"),
name=gse,
description=project.description,
heavy=True,
heavy=False,
upload_pephub=True,
upload_s3=True,
no_fail=True,
force_overwrite=False,
force_overwrite=overwrite_bedset,
)

else:
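A note on the Skipper object used throughout _upload_gse: the diff shows only its interface (is_processed, add_processed, add_failed, reinitialize) imported from bedboss.skipper. A minimal file-backed sketch of that pattern, purely illustrative and not the actual bedboss implementation:

import json
import os

class LocalSkipper:
    """Illustrative local log of processed/failed samples (assumed behavior)."""

    def __init__(self, output_path: str, name: str):
        self._path = os.path.join(output_path, f"{name}_skipper.json")
        self._log = {"processed": {}, "failed": {}}
        if os.path.exists(self._path):
            with open(self._path) as f:
                self._log = json.load(f)

    def is_processed(self, sample_id: str):
        # Return the stored file digest if the sample was already processed.
        return self._log["processed"].get(sample_id)

    def add_processed(self, sample_id: str, digest: str) -> None:
        self._log["processed"][sample_id] = digest
        self._save()

    def add_failed(self, sample_id: str, reason: str) -> None:
        self._log["failed"][sample_id] = reason
        self._save()

    def reinitialize(self) -> None:
        # Wipe all logs, mirroring the reinit_skipper flag.
        self._log = {"processed": {}, "failed": {}}
        self._save()

    def _save(self) -> None:
        with open(self._path, "w") as f:
            json.dump(self._log, f, indent=2)

The point of the pattern is that a crashed or re-run job can consult the local log and skip work that already succeeded, which is how the processing loop above short-circuits on is_processed.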