From 222e3e5be083f5f1df79d232fccad6a74329ca73 Mon Sep 17 00:00:00 2001 From: Bernt Popp Date: Sat, 11 Jan 2025 10:19:38 +0100 Subject: [PATCH 01/10] refactor: Enhance scoring functions with type hints and improve logic for frameshift identification --- vntyper/scripts/scoring.py | 150 +++++++++++++++++++++---------------- 1 file changed, 84 insertions(+), 66 deletions(-) diff --git a/vntyper/scripts/scoring.py b/vntyper/scripts/scoring.py index bdfcb39..861f038 100644 --- a/vntyper/scripts/scoring.py +++ b/vntyper/scripts/scoring.py @@ -24,19 +24,19 @@ import logging import pandas as pd +import numpy as np -def split_depth_and_calculate_frame_score(df): +def split_depth_and_calculate_frame_score(df: pd.DataFrame) -> pd.DataFrame: """ Splits the 'Sample' column to obtain alternate/depth coverage, - then calculates a 'Frame_Score' = (alt_len - ref_len)/3. + then calculates a 'Frame_Score' = (alt_len - ref_len) / 3. Steps: 1) df['Sample'].split(':') -> e.g., Del:AltDepth:ActiveDepth 2) Re-map to new columns (Estimated_Depth_AlternateVariant, etc.). - 3) Compute frame difference: (ALT length - REF length)/3. - Round to two decimals, converting e.g. "3.0" -> "3C" to denote - a multiple of 3. + 3) Compute frame difference: (ALT length - REF length) / 3. + Determine if it's a frameshift variant based on modulo operation. Args: df (pd.DataFrame): @@ -45,7 +45,7 @@ def split_depth_and_calculate_frame_score(df): Returns: pd.DataFrame: Adds 'Frame_Score' and keeps only frameshift - (non-"C") variants. The others are filtered out. + (non-multiple of 3) variants. The others are filtered out. 
""" logging.debug("Entering split_depth_and_calculate_frame_score") logging.debug(f"Initial row count: {len(df)}, columns: {df.columns.tolist()}") @@ -57,9 +57,13 @@ def split_depth_and_calculate_frame_score(df): # Step 1) Split the Sample column into 3 parts pre_split_rows = len(df) pre_split_cols = df.columns.tolist() - df[['Del', 'Estimated_Depth_AlternateVariant', 'Estimated_Depth_Variant_ActiveRegion']] = ( - df['Sample'].str.split(':', expand=True) - ) + split_columns = df['Sample'].str.split(':', expand=True) + split_columns.columns = [ + 'Del', + 'Estimated_Depth_AlternateVariant', + 'Estimated_Depth_Variant_ActiveRegion' + ] + df = pd.concat([df, split_columns], axis=1) logging.debug("After splitting 'Sample':") logging.debug(f"Changed from {pre_split_rows} rows, {pre_split_cols} columns") logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") @@ -67,46 +71,43 @@ def split_depth_and_calculate_frame_score(df): # Step 2) Keep only necessary columns in a new DataFrame pre_select_rows = len(df) pre_select_cols = df.columns.tolist() - df = df[ - [ - 'Motifs', - 'Variant', - 'POS', - 'REF', - 'ALT', - 'Motif_sequence', - 'Estimated_Depth_AlternateVariant', - 'Estimated_Depth_Variant_ActiveRegion', - ] - ].copy() + necessary_columns = [ + 'Motifs', + 'Variant', + 'POS', + 'REF', + 'ALT', + 'Motif_sequence', + 'Estimated_Depth_AlternateVariant', + 'Estimated_Depth_Variant_ActiveRegion', + ] + df = df[necessary_columns].copy() logging.debug("After selecting necessary columns:") logging.debug(f"Changed from {pre_select_rows} rows, {pre_select_cols} columns") logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - # Step 3) Compute lengths, then frame score + # Step 3) Compute lengths and frame score pre_frame_rows = len(df) pre_frame_cols = df.columns.tolist() df["ref_len"] = df["REF"].str.len() df["alt_len"] = df["ALT"].str.len() - df["Frame_Score"] = ( - (df["alt_len"] - df["ref_len"]) / 3 - ).round(2).astype(str).apply(lambda x: 
x.replace('.0', 'C')) + df["Frame_Score"] = (df["alt_len"] - df["ref_len"]) / 3 logging.debug("After computing 'Frame_Score':") logging.debug(f"Changed from {pre_frame_rows} rows, {pre_frame_cols} columns") logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - # Step 4) Mark non-frameshift + # Step 4) Mark frameshift variants pre_truefalse_rows = len(df) pre_truefalse_cols = df.columns.tolist() - df["TrueFalse"] = df['Frame_Score'].str.contains('C', regex=True) - logging.debug("After marking non-frameshift variants:") + df["is_frameshift"] = ((df["alt_len"] - df["ref_len"]) % 3 != 0) + logging.debug("After marking frameshift variants:") logging.debug(f"Changed from {pre_truefalse_rows} rows, {pre_truefalse_cols} columns") logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - # Keep only rows where Frame_Score does not contain 'C' + # Keep only frameshift variants pre_filter_rows = len(df) pre_filter_cols = df.columns.tolist() - df = df[df["TrueFalse"] == False].copy() + df = df[df["is_frameshift"]].copy() logging.debug("After filtering out non-frameshift variants:") logging.debug(f"Changed from {pre_filter_rows} rows, {pre_filter_cols} columns") logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") @@ -116,22 +117,23 @@ def split_depth_and_calculate_frame_score(df): return df -def split_frame_score(df): +def split_frame_score(df: pd.DataFrame) -> pd.DataFrame: """ - Splits 'Frame_Score' into 'left' and 'right' parts (e.g., "-1.33" → ["-1","33"]) + Splits 'Frame_Score' into 'direction' and 'frameshift_amount' for further logic in identifying insertion vs. deletion frameshifts. Steps: - 1) df['Frame_Score'].split('.', 2) - 2) If 'left' is '-0', replace with '-1' to standardize - 3) Drop intermediate columns (TrueFalse, ref_len, alt_len) + 1) Determine the direction based on the sign of (alt_len - ref_len). + 2) Calculate frameshift_amount as abs(alt_len - ref_len) % 3. + 3) Adjust direction if necessary. 
+ 4) Drop intermediate columns. Args: df (pd.DataFrame): - Should contain 'Frame_Score', 'TrueFalse', 'ref_len', 'alt_len'. + Should contain 'Frame_Score', 'is_frameshift', 'ref_len', 'alt_len'. Returns: - pd.DataFrame: Now has columns 'left' and 'right' for analyzing frames. + pd.DataFrame: Now has columns 'direction' and 'frameshift_amount' for analyzing frames. """ logging.debug("Entering split_frame_score") logging.debug(f"Initial row count: {len(df)}, columns: {df.columns.tolist()}") @@ -140,26 +142,36 @@ def split_frame_score(df): logging.debug("DataFrame is empty. Exiting split_frame_score.") return df - # Step 1) Split 'Frame_Score' - pre_split_rows = len(df) - pre_split_cols = df.columns.tolist() - df[['left', 'right']] = df['Frame_Score'].str.split('.', expand=True) - logging.debug("After splitting 'Frame_Score':") - logging.debug(f"Changed from {pre_split_rows} rows, {pre_split_cols} columns") + # Step 1) Determine the direction + pre_direction_rows = len(df) + pre_direction_cols = df.columns.tolist() + df["direction"] = np.sign(df["alt_len"] - df["ref_len"]) + logging.debug("After determining 'direction':") + logging.debug(f"Changed from {pre_direction_rows} rows, {pre_direction_cols} columns") logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - # Step 2) Replace '-0' with '-1' - pre_replace_rows = len(df) - pre_replace_cols = df.columns.tolist() - df['left'] = df['left'].replace('-0', '-1') - logging.debug("After replacing '-0' with '-1' in 'left':") - logging.debug(f"Changed from {pre_replace_rows} rows, {pre_replace_cols} columns") + # Step 2) Calculate frameshift_amount + pre_frameshift_rows = len(df) + pre_frameshift_cols = df.columns.tolist() + df["frameshift_amount"] = (df["alt_len"] - df["ref_len"]).abs() % 3 + logging.debug("After calculating 'frameshift_amount':") + logging.debug(f"Changed from {pre_frameshift_rows} rows, {pre_frameshift_cols} columns") logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - # 
Step 3) Drop intermediate columns + # Step 3) Adjust direction if necessary (optional based on original logic) + # In the original code, '-0' was replaced with '-1', but since (alt_len - ref_len) cannot be zero here + # because is_frameshift is True, this step might be redundant. Including for completeness. + pre_adjust_rows = len(df) + pre_adjust_cols = df.columns.tolist() + df["direction"] = df["direction"].replace(0, -1) + logging.debug("After adjusting 'direction':") + logging.debug(f"Changed from {pre_adjust_rows} rows, {pre_adjust_cols} columns") + logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") + + # Step 4) Drop intermediate columns pre_drop_rows = len(df) pre_drop_cols = df.columns.tolist() - df.drop(['TrueFalse', 'ref_len', 'alt_len'], axis=1, inplace=True) + df.drop(['is_frameshift', 'ref_len', 'alt_len'], axis=1, inplace=True) logging.debug("After dropping intermediate columns:") logging.debug(f"Changed from {pre_drop_rows} rows, {pre_drop_cols} columns") logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") @@ -169,19 +181,19 @@ def split_frame_score(df): return df -def extract_frameshifts(df): +def extract_frameshifts(df: pd.DataFrame) -> pd.DataFrame: """ Extracts frameshift variants that follow known insertion/deletion patterns. Steps: - - For insertion frameshifts: (left not negative) & right = "33" + - For insertion frameshifts: direction > 0 & frameshift_amount == 1 (3n+1 logic). - - For deletion frameshifts: (left negative) & right = "67" + - For deletion frameshifts: direction < 0 & frameshift_amount == 2 (3n+2 logic). Args: df (pd.DataFrame): - Must contain 'left' and 'right' columns. + Must contain 'direction' and 'frameshift_amount' columns. Returns: pd.DataFrame: @@ -194,30 +206,36 @@ def extract_frameshifts(df): logging.debug("DataFrame is empty. 
Exiting extract_frameshifts.") return df - # Identify insertion frameshifts + # Identify insertion frameshifts: direction > 0 and frameshift_amount == 1 pre_ins_rows = len(df) pre_ins_cols = df.columns.tolist() ins = df[ - df["left"].apply(lambda x: '-' not in x) & - df["right"].apply(lambda y: '33' in y) + (df["direction"] > 0) & + (df["frameshift_amount"] == 1) ] logging.debug("After identifying insertion frameshifts:") - logging.debug(f"Sliced from {pre_ins_rows} rows, {pre_ins_cols} columns to {len(ins)} rows.") + logging.debug( + f"Sliced from {pre_ins_rows} rows, {pre_ins_cols} columns to {len(ins)} rows." + ) - # Identify deletion frameshifts + # Identify deletion frameshifts: direction < 0 and frameshift_amount == 2 pre_del_rows = len(df) pre_del_cols = df.columns.tolist() del_ = df[ - df["left"].apply(lambda x: '-' in x) & - df["right"].apply(lambda y: '67' in y) + (df["direction"] < 0) & + (df["frameshift_amount"] == 2) ] logging.debug("After identifying deletion frameshifts:") - logging.debug(f"Sliced from {pre_del_rows} rows, {pre_del_cols} columns to {len(del_)} rows.") + logging.debug( + f"Sliced from {pre_del_rows} rows, {pre_del_cols} columns to {len(del_)} rows." 
+ ) - # Combine - combined = pd.concat([ins, del_], axis=0) + # Combine insertion and deletion frameshifts + combined = pd.concat([ins, del_], axis=0).reset_index(drop=True) logging.debug("After concatenating insertion and deletion frameshifts:") - logging.debug(f"Resulting row count: {len(combined)}, columns: {combined.columns.tolist()}") + logging.debug( + f"Resulting row count: {len(combined)}, columns: {combined.columns.tolist()}" + ) logging.debug("Exiting extract_frameshifts") return combined From 29cb68906411959a248518d4970c5cad8688ee0a Mon Sep 17 00:00:00 2001 From: Bernt Popp Date: Sat, 11 Jan 2025 11:35:24 +0100 Subject: [PATCH 02/10] fix: Uncomment summary_text in generate_summary_report for inclusion in context --- vntyper/scripts/generate_report.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vntyper/scripts/generate_report.py b/vntyper/scripts/generate_report.py index 46f7859..cc73a20 100644 --- a/vntyper/scripts/generate_report.py +++ b/vntyper/scripts/generate_report.py @@ -382,7 +382,7 @@ def warn_icon(value, cutoff, higher_better=True): 'passed_filter_icon': pf_icon, 'passed_filter_color': pf_color, 'sequencing_str': sequencing_str, - # 'summary_text': summary_text # Assuming it's included in the template + 'summary_text': summary_text # **Uncommented to include in context** } try: From 8b2ac43beb9e3457b18c6efb74d6347a6b0438ff Mon Sep 17 00:00:00 2001 From: Bernt Popp Date: Sat, 11 Jan 2025 12:04:38 +0100 Subject: [PATCH 03/10] refactor: Improve VCF reading and filtering functions with error handling and logging --- vntyper/scripts/confidence_assignment.py | 111 ++++++++--------------- vntyper/scripts/variant_parsing.py | 107 +++++++++++----------- 2 files changed, 91 insertions(+), 127 deletions(-) diff --git a/vntyper/scripts/confidence_assignment.py b/vntyper/scripts/confidence_assignment.py index 73b36ff..6483805 100644 --- a/vntyper/scripts/confidence_assignment.py +++ b/vntyper/scripts/confidence_assignment.py @@ -25,17 
+25,14 @@ import logging import pandas as pd +import numpy as np # Import NumPy directly -def calculate_depth_score_and_assign_confidence(df, kestrel_config): +def calculate_depth_score_and_assign_confidence(df: pd.DataFrame, kestrel_config: dict) -> pd.DataFrame: """ Calculates Depth_Score for each variant and assigns a confidence label based on coverage thresholds defined in kestrel_config['confidence_assignment']. - This step is crucial for distinguishing between likely real variants - vs. potential false positives. Variants below a certain coverage ratio - or having fewer than a certain alt-depth are labeled "Low_Precision". - Args: df (pd.DataFrame): DataFrame with columns: @@ -55,85 +52,51 @@ def calculate_depth_score_and_assign_confidence(df, kestrel_config): - 'Confidence' (str: 'Low_Precision', 'High_Precision', etc.) """ logging.debug("Entering calculate_depth_score_and_assign_confidence") - logging.debug(f"Initial row count: {len(df)}, columns: {df.columns.tolist()}") + logging.debug(f"Initial DataFrame shape: {df.shape}") if df.empty: - logging.debug("DataFrame is empty. Exiting calculate_depth_score_and_assign_confidence.") + logging.debug("DataFrame is empty. 
Exiting function.") return df - # Step 1: Convert depth columns from string to integer - pre_convert_rows = len(df) - pre_convert_cols = df.columns.tolist() - df['Estimated_Depth_AlternateVariant'] = df['Estimated_Depth_AlternateVariant'].astype(int) - df['Estimated_Depth_Variant_ActiveRegion'] = df['Estimated_Depth_Variant_ActiveRegion'].astype(int) - logging.debug("After converting depth columns to integer:") - logging.debug(f"Changed from {pre_convert_rows} rows, {pre_convert_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") + # Step 1: Convert depth columns to integers + depth_cols = ['Estimated_Depth_AlternateVariant', 'Estimated_Depth_Variant_ActiveRegion'] + df[depth_cols] = df[depth_cols].astype(int) + logging.debug("Converted depth columns to integers.") # Step 2: Calculate Depth_Score - pre_calculate_rows = len(df) - pre_calculate_cols = df.columns.tolist() - df['Depth_Score'] = ( - df['Estimated_Depth_AlternateVariant'] / - df['Estimated_Depth_Variant_ActiveRegion'] - ) - logging.debug("After calculating 'Depth_Score':") - logging.debug(f"Changed from {pre_calculate_rows} rows, {pre_calculate_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") + df['Depth_Score'] = df['Estimated_Depth_AlternateVariant'] / df['Estimated_Depth_Variant_ActiveRegion'] + logging.debug("Calculated 'Depth_Score'.") # Step 3: Assign confidence conf_assign = kestrel_config['confidence_assignment'] - depth_score_thresholds = conf_assign['depth_score_thresholds'] - alt_depth_thresholds = conf_assign['alt_depth_thresholds'] - var_active_region_threshold = conf_assign['var_active_region_threshold'] + thresholds = conf_assign['depth_score_thresholds'] + alt_thresholds = conf_assign['alt_depth_thresholds'] + var_region_threshold = conf_assign['var_active_region_threshold'] confidence_levels = conf_assign['confidence_levels'] - def assign_confidence(row): - depth_score = row['Depth_Score'] - alt_depth = 
row['Estimated_Depth_AlternateVariant'] - var_region = row['Estimated_Depth_Variant_ActiveRegion'] - - # Compare depth_score, alt_depth, and var_region to thresholds - if ( - depth_score <= depth_score_thresholds['low'] - or var_region <= var_active_region_threshold - ): - return confidence_levels['low_precision'] - elif ( - alt_depth_thresholds['mid_low'] - <= alt_depth - <= alt_depth_thresholds['mid_high'] - and depth_score_thresholds['low'] - <= depth_score - <= depth_score_thresholds['high'] - ): - return confidence_levels['low_precision'] - elif alt_depth > alt_depth_thresholds['mid_high']: - return confidence_levels['high_precision'] - elif alt_depth <= alt_depth_thresholds['low']: - return confidence_levels['low_precision'] - elif ( - alt_depth_thresholds['mid_low'] - <= alt_depth - < alt_depth_thresholds['mid_high'] - and depth_score >= depth_score_thresholds['high'] - ): - return confidence_levels['high_precision'] - elif ( - alt_depth >= alt_depth_thresholds['mid_high'] - and depth_score >= depth_score_thresholds['high'] - ): - return confidence_levels['high_precision_star'] - else: - return confidence_levels['low_precision'] - - pre_conf_rows = len(df) - pre_conf_cols = df.columns.tolist() - df['Confidence'] = df.apply(assign_confidence, axis=1) - logging.debug("After assigning 'Confidence':") - logging.debug(f"Changed from {pre_conf_rows} rows, {pre_conf_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - + conditions = [ + (df['Depth_Score'] <= thresholds['low']) | (df['Estimated_Depth_Variant_ActiveRegion'] <= var_region_threshold), + ((df['Estimated_Depth_AlternateVariant'] > alt_thresholds['mid_high']) & + (df['Depth_Score'] >= thresholds['high'])), + ((df['Estimated_Depth_AlternateVariant'].between(alt_thresholds['mid_low'], alt_thresholds['mid_high'])) & + (df['Depth_Score'].between(thresholds['low'], thresholds['high']))), + (df['Estimated_Depth_AlternateVariant'] <= alt_thresholds['low']), + 
((df['Estimated_Depth_AlternateVariant'].between(alt_thresholds['mid_low'], alt_thresholds['mid_high'])) & + (df['Depth_Score'] >= thresholds['high'])) + ] + + choices = [ + confidence_levels['low_precision'], + confidence_levels['high_precision'], + confidence_levels['low_precision'], + confidence_levels['low_precision'], + confidence_levels['high_precision'] + ] + + # Replace pd.np.select with np.select + df['Confidence'] = np.select(conditions, choices, default=confidence_levels.get('low_precision', 'Low_Precision')) + logging.debug("Assigned 'Confidence' labels based on conditions.") + + logging.debug(f"Final DataFrame shape: {df.shape}") logging.debug("Exiting calculate_depth_score_and_assign_confidence") - logging.debug(f"Final row count: {len(df)}, columns: {df.columns.tolist()}") return df diff --git a/vntyper/scripts/variant_parsing.py b/vntyper/scripts/variant_parsing.py index 6893196..f2bc2ce 100644 --- a/vntyper/scripts/variant_parsing.py +++ b/vntyper/scripts/variant_parsing.py @@ -25,11 +25,14 @@ - Saei et al., iScience 26, 107171 (2023) """ +import gzip import logging +from typing import Optional + import pandas as pd -def read_vcf_without_comments(vcf_file): +def read_vcf_without_comments(vcf_file: str) -> pd.DataFrame: """ Reads a VCF file (possibly gzipped) ignoring lines starting with '##'. The line starting with '#CHROM' is used to define DataFrame columns. @@ -42,27 +45,32 @@ def read_vcf_without_comments(vcf_file): pd.DataFrame: Contains the main variant records. May be empty if the file has no variants. 
""" - import gzip - open_func = gzip.open if vcf_file.endswith(".gz") else open data = [] - header_line = None - - with open_func(vcf_file, 'rt') as f: - for line in f: - # The "##" lines are meta lines to be skipped - if line.startswith("#CHROM"): - header_line = line.strip().split('\t') - elif not line.startswith("##"): - data.append(line.strip().split('\t')) - - if data: - return pd.DataFrame(data, columns=header_line) - else: + header: Optional[list] = None + + try: + with open_func(vcf_file, 'rt') as f: + for line in f: + if line.startswith("#CHROM"): + header = line.strip().split('\t') + elif not line.startswith("##") and header: + data.append(line.strip().split('\t')) + except FileNotFoundError: + logging.error(f"VCF file not found: {vcf_file}") return pd.DataFrame() + except Exception as e: + logging.error(f"Error reading VCF file {vcf_file}: {e}") + return pd.DataFrame() + + if data and header: + logging.debug(f"VCF read successfully with {len(data)} records.") + return pd.DataFrame(data, columns=header) + logging.debug("No variant records found in VCF.") + return pd.DataFrame() -def filter_by_alt_values_and_finalize(df, kestrel_config): +def filter_by_alt_values_and_finalize(df: pd.DataFrame, kestrel_config: dict) -> pd.DataFrame: """ Applies final filtering rules based on ALT values, e.g., removing certain ALTs or requiring a minimal Depth_Score if ALT='GG'. @@ -87,50 +95,43 @@ def filter_by_alt_values_and_finalize(df, kestrel_config): pd.DataFrame: Filtered, finalized DataFrame ready for final steps. """ logging.debug("Entering filter_by_alt_values_and_finalize") - logging.debug(f"Initial row count: {len(df)}, columns: {df.columns.tolist()}") + logging.debug(f"Initial DataFrame shape: {df.shape}") if df.empty: - logging.debug("DataFrame is empty. Exiting filter_by_alt_values_and_finalize.") + logging.debug("DataFrame is empty. 
Exiting function.") return df - alt_filter = kestrel_config['alt_filtering'] - gg_alt_value = alt_filter['gg_alt_value'] - gg_depth_score_threshold = alt_filter['gg_depth_score_threshold'] - exclude_alts = alt_filter['exclude_alts'] - - # Step 1) If 'GG' in ALT, keep rows only if Depth_Score >= threshold - pre_gg_rows = len(df) - pre_gg_cols = df.columns.tolist() - if df['ALT'].str.contains(r'\b' + gg_alt_value + r'\b').any(): - gg_condition = df['ALT'] == gg_alt_value - df = pd.concat( - [ - df[~gg_condition], - df[gg_condition & (df['Depth_Score'] >= gg_depth_score_threshold)], - ] - ) - logging.debug("After applying 'GG' depth_score filter:") - logging.debug(f"Changed from {pre_gg_rows} rows, {pre_gg_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - - # Step 2) Exclude specified ALTs - pre_exclude_rows = len(df) - pre_exclude_cols = df.columns.tolist() + # Validate required columns + required_columns = {'ALT', 'Depth_Score'} + missing_columns = required_columns - set(df.columns) + if missing_columns: + logging.error(f"Missing required columns: {missing_columns}") + raise KeyError(f"Missing required columns: {missing_columns}") + + alt_filter = kestrel_config.get('alt_filtering', {}) + gg_alt_value = alt_filter.get('gg_alt_value', 'GG') + gg_depth_threshold = alt_filter.get('gg_depth_score_threshold', 0.0) + exclude_alts = alt_filter.get('exclude_alts', []) + + # Step 1: Filter 'GG' ALT with Depth_Score threshold + gg_mask = df['ALT'] == gg_alt_value + if gg_mask.any(): + non_gg = df[~gg_mask] + gg_filtered = df[gg_mask & (df['Depth_Score'].astype(float) >= gg_depth_threshold)] + df = pd.concat([non_gg, gg_filtered], ignore_index=True) + logging.debug(f"Applied 'GG' depth score filter: {df.shape}") + + # Step 2: Exclude specified ALTs + initial_count = len(df) df = df[~df['ALT'].isin(exclude_alts)] - logging.debug("After excluding specified ALTs:") - logging.debug(f"Changed from {pre_exclude_rows} rows, {pre_exclude_cols} 
columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") + logging.debug(f"Excluded specified ALTs: {initial_count} -> {df.shape[0]} records") - # Step 3) Drop intermediate columns from frame splitting + # Step 3: Drop intermediate columns if they exist drop_cols = [col for col in ['left', 'right'] if col in df.columns] if drop_cols: - pre_drop_rows = len(df) - pre_drop_cols = df.columns.tolist() - df.drop(drop_cols, axis=1, inplace=True) - logging.debug("After dropping columns for frame splitting:") - logging.debug(f"Changed from {pre_drop_rows} rows, {pre_drop_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") + df = df.drop(columns=drop_cols) + logging.debug(f"Dropped intermediate columns: {drop_cols}") + logging.debug(f"Final DataFrame shape: {df.shape}") logging.debug("Exiting filter_by_alt_values_and_finalize") - logging.debug(f"Final row count: {len(df)}, columns: {df.columns.tolist()}") return df From 2f3a74b953dfd1d80df10421a459fb570f9a55e0 Mon Sep 17 00:00:00 2001 From: Bernt Popp Date: Sun, 12 Jan 2025 13:53:31 +0100 Subject: [PATCH 04/10] fix: Update confidence assignment logic to include high precision star condition and improve error handling --- vntyper/scripts/confidence_assignment.py | 29 +++++++++++++++++------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/vntyper/scripts/confidence_assignment.py b/vntyper/scripts/confidence_assignment.py index 6483805..628d412 100644 --- a/vntyper/scripts/confidence_assignment.py +++ b/vntyper/scripts/confidence_assignment.py @@ -75,25 +75,38 @@ def calculate_depth_score_and_assign_confidence(df: pd.DataFrame, kestrel_config confidence_levels = conf_assign['confidence_levels'] conditions = [ + # Condition 1: Low Precision (df['Depth_Score'] <= thresholds['low']) | (df['Estimated_Depth_Variant_ActiveRegion'] <= var_region_threshold), - ((df['Estimated_Depth_AlternateVariant'] > alt_thresholds['mid_high']) & - (df['Depth_Score'] >= 
thresholds['high'])), - ((df['Estimated_Depth_AlternateVariant'].between(alt_thresholds['mid_low'], alt_thresholds['mid_high'])) & - (df['Depth_Score'].between(thresholds['low'], thresholds['high']))), + + # Condition 2: High Precision Star (Updated to include alt_depth >= mid_high) + (df['Estimated_Depth_AlternateVariant'] >= alt_thresholds['mid_high']) & + (df['Depth_Score'] >= thresholds['high']), + + # Condition 3: Low Precision + (df['Estimated_Depth_AlternateVariant'].between(alt_thresholds['mid_low'], alt_thresholds['mid_high'])) & + (df['Depth_Score'].between(thresholds['low'], thresholds['high'])), + + # Condition 4: Low Precision (df['Estimated_Depth_AlternateVariant'] <= alt_thresholds['low']), - ((df['Estimated_Depth_AlternateVariant'].between(alt_thresholds['mid_low'], alt_thresholds['mid_high'])) & - (df['Depth_Score'] >= thresholds['high'])) + + # Condition 5: High Precision + (df['Estimated_Depth_AlternateVariant'].between(alt_thresholds['mid_low'], alt_thresholds['mid_high'], inclusive='left')) & + (df['Depth_Score'] >= thresholds['high']) ] choices = [ confidence_levels['low_precision'], - confidence_levels['high_precision'], + confidence_levels['high_precision_star'], # Updated to assign 'High_Precision*' confidence_levels['low_precision'], confidence_levels['low_precision'], confidence_levels['high_precision'] ] - # Replace pd.np.select with np.select + # Ensure 'high_precision_star' exists in confidence_levels + if 'high_precision_star' not in confidence_levels: + logging.error("'high_precision_star' not found in confidence_levels.") + raise KeyError("'high_precision_star' not found in confidence_levels.") + df['Confidence'] = np.select(conditions, choices, default=confidence_levels.get('low_precision', 'Low_Precision')) logging.debug("Assigned 'Confidence' labels based on conditions.") From 498c9f9a25d037817a26d4ed5923f9cd85390760 Mon Sep 17 00:00:00 2001 From: Bernt Popp Date: Sun, 12 Jan 2025 15:16:57 +0100 Subject: [PATCH 05/10] test: Add 
unit tests for scoring functions including frame-score calculations and frameshift extraction --- tests/unit/test_scoring.py | 143 +++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 tests/unit/test_scoring.py diff --git a/tests/unit/test_scoring.py b/tests/unit/test_scoring.py new file mode 100644 index 0000000..64aee20 --- /dev/null +++ b/tests/unit/test_scoring.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python3 +# tests/unit/test_scoring.py + +""" +Unit tests for the scoring functionality in vntyper/scripts/scoring.py. +Validates frame-score calculations, depth splitting, and frameshift extraction. +""" + +import pytest +import pandas as pd +import numpy as np + +from vntyper.scripts.scoring import ( + split_depth_and_calculate_frame_score, + split_frame_score, + extract_frameshifts, +) + + +@pytest.mark.parametrize("df_input,expected_len", [ + (pd.DataFrame(), 0), +]) +def test_split_depth_and_calculate_frame_score_empty_df(df_input, expected_len): + """ + Verify that an empty input DataFrame remains empty. + """ + out = split_depth_and_calculate_frame_score(df_input) + assert len(out) == expected_len, ( + "Empty input should yield empty output after split_depth_and_calculate_frame_score." + ) + + +def test_split_depth_and_calculate_frame_score_no_frameshift(): + """ + If the difference (ALT length - REF length) is a multiple of 3, + the variant should not be retained (non-frameshift). 
+ """ + df = pd.DataFrame({ + "Sample": ["Del:10:100"], # Only the first 'Del' part is not used, but we keep format for test + "REF": ["ATG"], # length 3 + "ALT": ["ATGATG"], # length 6 -> difference = 3 -> multiple of 3 + "Motifs": ["mock_motif"], + "Variant": ["mock_variant"], + "POS": [123], + "Motif_sequence": ["mock_sequence"] + }) + out = split_depth_and_calculate_frame_score(df) + # Because it's a multiple of 3 difference, is_frameshift == False => filtered out + assert out.empty, ( + "Variants with multiple-of-3 difference should be filtered out as non-frameshift." + ) + + +def test_split_depth_and_calculate_frame_score_frameshift(): + """ + If the difference (ALT length - REF length) is not a multiple of 3, + the variant should be retained and a 'Frame_Score' should be added. + """ + df = pd.DataFrame({ + "Sample": ["Del:50:500"], + "REF": ["ATG"], # length 3 + "ALT": ["ATGA"], # length 4 -> difference = 1 -> frameshift + "Motifs": ["mock_motif"], + "Variant": ["mock_variant"], + "POS": [456], + "Motif_sequence": ["mock_sequence"] + }) + out = split_depth_and_calculate_frame_score(df) + assert not out.empty, "Expected to retain a frameshift variant (difference not multiple of 3)." + assert "Frame_Score" in out.columns, "Output should have a 'Frame_Score' column." + # Check that is_frameshift was True + assert "is_frameshift" in out.columns, "Output should have 'is_frameshift' marking frameshift or not." + assert all(out["is_frameshift"]), "All retained rows should be frameshift variants." + + +def test_split_frame_score_empty_df(): + """ + Verify that an empty input DataFrame remains empty when split_frame_score is called. + """ + df = pd.DataFrame() + out = split_frame_score(df) + assert out.empty, "Empty input should yield empty output after split_frame_score." + + +def test_split_frame_score_basic(): + """ + Test basic splitting of frame score into 'direction' and 'frameshift_amount'. 
+ """ + df = pd.DataFrame({ + "Frame_Score": [1.0, -2.0], # not directly used, but indicates frameshift + "ref_len": [3, 6], + "alt_len": [4, 4], # alt_len - ref_len => [1, -2] + "is_frameshift": [True, True] # frameshift is assumed True from previous step + }) + out = split_frame_score(df) + + # We drop 'is_frameshift', 'ref_len', 'alt_len' + # We keep 'direction', 'frameshift_amount', 'Frame_Score', etc. + expected_columns = {"Frame_Score", "direction", "frameshift_amount"} + assert expected_columns.issubset(set(out.columns)), ( + f"Output must contain at least: {expected_columns}" + ) + + # direction = sign(alt_len - ref_len) + # frameshift_amount = abs(alt_len - ref_len) % 3 + # For row0: alt_len - ref_len = 1 => direction=1, frameshift_amount=1 + # For row1: alt_len - ref_len = -2 => direction < 0 => -1, frameshift_amount=2 + assert out.loc[0, "direction"] == 1, "Expected direction=1 for alt_len-ref_len=1." + assert out.loc[0, "frameshift_amount"] == 1, "Expected frameshift_amount=1 for difference=1." + + assert out.loc[1, "direction"] == -1, "Expected direction=-1 for alt_len-ref_len=-2." + assert out.loc[1, "frameshift_amount"] == 2, "Expected frameshift_amount=2 for difference=-2." + + +def test_extract_frameshifts_empty_df(): + """ + Verify that an empty input DataFrame remains empty in extract_frameshifts. + """ + df = pd.DataFrame() + out = extract_frameshifts(df) + assert out.empty, "Empty input should yield empty output after extract_frameshifts." + + +def test_extract_frameshifts_mixed(): + """ + Test that only frameshift rows meeting the 3n+1 insertion (direction>0) or + 3n+2 deletion (direction<0) are retained. 
+ """ + df = pd.DataFrame({ + "direction": [1, 1, -1, -1, 1], + "frameshift_amount": [1, 2, 2, 1, 1], + "Frame_Score": [0.33, 0.66, -0.66, -0.33, 0.33], + "Variant": ["ins_ok", "ins_wrong", "del_ok", "del_wrong", "ins_ok2"] + }) + # insertion frameshift => direction>0 and frameshift_amount=1 + # deletion frameshift => direction<0 and frameshift_amount=2 + out = extract_frameshifts(df) + # Expect to retain rows: 0 (ins_ok), 2 (del_ok), 4 (ins_ok2) + # Indices 1 (ins_wrong) and 3 (del_wrong) should be dropped + assert len(out) == 3, "Expected to keep 3 rows that match frameshift patterns." + assert sorted(out["Variant"].tolist()) == sorted(["ins_ok", "del_ok", "ins_ok2"]), ( + "Wrong set of frameshift variants retained." + ) From 28754f64c395149df894da5e4a354e0693deba5b Mon Sep 17 00:00:00 2001 From: Bernt Popp Date: Sun, 12 Jan 2025 15:29:56 +0100 Subject: [PATCH 06/10] test: Add unit tests for ALT filtering in variant_parsing.py --- tests/unit/test_variant_parsing.py | 115 +++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 tests/unit/test_variant_parsing.py diff --git a/tests/unit/test_variant_parsing.py b/tests/unit/test_variant_parsing.py new file mode 100644 index 0000000..15ed8ab --- /dev/null +++ b/tests/unit/test_variant_parsing.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +# tests/unit/test_variant_parsing.py + +""" +Unit tests for variant_parsing.py, focusing on: + filter_by_alt_values_and_finalize() + +Ensures that ALT-based filtering rules are correctly applied: + - 'GG' alt requires a minimum Depth_Score. + - exclude_alts removed from final DataFrame. + - left/right columns dropped at the end. +""" + +import pytest +import pandas as pd +from vntyper.scripts.variant_parsing import filter_by_alt_values_and_finalize + + +@pytest.fixture +def kestrel_config_mock(): + """ + Provide a mock kestrel_config dict specifically for ALT filtering tests. + Real config may contain more fields, but we only need 'alt_filtering' here. 
+ """ + return { + "alt_filtering": { + "gg_alt_value": "GG", + "gg_depth_score_threshold": 0.02, + "exclude_alts": ["BAD_ALT", "ZZZ"] + } + } + + +def test_filter_by_alt_values_empty_df(kestrel_config_mock): + """ + Test that an empty DataFrame simply returns empty without error. + """ + df = pd.DataFrame() + out = filter_by_alt_values_and_finalize(df, kestrel_config_mock) + assert out.empty, "Empty input should yield empty output." + + +def test_filter_by_alt_values_missing_columns(kestrel_config_mock): + """ + Test that missing 'ALT' or 'Depth_Score' columns raises KeyError. + """ + df = pd.DataFrame({ + "ALT": ["GG", "ABC"], + # 'Depth_Score' is missing here + }) + + with pytest.raises(KeyError) as exc_info: + filter_by_alt_values_and_finalize(df, kestrel_config_mock) + + assert "Missing required columns" in str(exc_info.value), ( + "Expected KeyError due to missing required 'Depth_Score' column." + ) + + +def test_filter_by_alt_values_gg_filter_below_threshold(kestrel_config_mock): + """ + If ALT='GG' but Depth_Score < threshold, that row should be removed. + """ + df = pd.DataFrame({ + "ALT": ["GG", "GG", "XYZ"], + "Depth_Score": [0.019, 0.02, 0.5] # 0.019 < threshold => remove, 0.02 >= threshold => keep + }) + + out = filter_by_alt_values_and_finalize(df, kestrel_config_mock) + # The first row has Depth_Score=0.019 => < 0.02 => removed + # The second row has Depth_Score=0.02 => OK => keep + # The third row has ALT=XYZ => unaffected by the GG filter => keep + assert len(out) == 2, ( + "Expected only 2 rows to remain after removing GG with insufficient Depth_Score." + ) + # Check that 'GG' row with Depth_Score=0.019 was removed + assert (out["Depth_Score"] < 0.02).sum() == 0, "No row should have Depth_Score < 0.02 for 'GG' alt." + + +def test_filter_by_alt_values_exclude_alts(kestrel_config_mock): + """ + Test that ALTs in 'exclude_alts' are removed from the DataFrame. 
+ """ + df = pd.DataFrame({ + "ALT": ["GG", "BAD_ALT", "OK_ALT", "ZZZ", "ANOTHER"], + "Depth_Score": [0.5, 0.1, 0.3, 0.2, 0.6] # just some placeholder scores + }) + + out = filter_by_alt_values_and_finalize(df, kestrel_config_mock) + # Excluded: "BAD_ALT" and "ZZZ" + # Keep: "GG", "OK_ALT", "ANOTHER" + kept_alts = out["ALT"].tolist() + assert len(out) == 3, "Expected 3 ALTs after excluding 'BAD_ALT' and 'ZZZ'." + assert "BAD_ALT" not in kept_alts, "'BAD_ALT' should be removed." + assert "ZZZ" not in kept_alts, "'ZZZ' should be removed." + + +def test_filter_by_alt_values_drop_left_right(kestrel_config_mock): + """ + Test that 'left' and 'right' columns (if present) are dropped. + """ + df = pd.DataFrame({ + "ALT": ["GG", "ABC"], + "Depth_Score": [0.05, 0.02], + "left": ["some_left_data", "some_left_data"], + "right": ["some_right_data", "some_right_data"] + }) + out = filter_by_alt_values_and_finalize(df, kestrel_config_mock) + + # We keep both rows because 'GG' with Depth_Score=0.05 is fine, + # and "ABC" is not in exclude_alts => all good. + assert len(out) == 2, "Expected 2 rows total." + assert "left" not in out.columns and "right" not in out.columns, ( + "Expected the 'left' and 'right' columns to be dropped." 
+ ) From 3646e4a288ff17222d6892abba0dd854de055c3a Mon Sep 17 00:00:00 2001 From: Bernt Popp Date: Sun, 12 Jan 2025 16:37:39 +0100 Subject: [PATCH 07/10] fix: Update confidence values in test data --- tests/test_data_config.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_data_config.json b/tests/test_data_config.json index 982fcf2..f10b9f8 100644 --- a/tests/test_data_config.json +++ b/tests/test_data_config.json @@ -208,7 +208,7 @@ "value": 0.05869074492099323, "tolerance_percentage": 5 }, - "Confidence": "High_Precision" + "Confidence": "High_Precision*" }, "check_igv_report": true }, @@ -248,7 +248,7 @@ "value": 0.02168057579370336, "tolerance_percentage": 5 }, - "Confidence": "High_Precision" + "Confidence": "High_Precision*" }, "check_igv_report": true }, @@ -288,7 +288,7 @@ "value": 0.010855245823892079, "tolerance_percentage": 5 }, - "Confidence": "High_Precision" + "Confidence": "High_Precision*" }, "check_igv_report": false } From 4df6efc06eb73daf72e37ca5a22634252741d404 Mon Sep 17 00:00:00 2001 From: Bernt Popp Date: Sun, 12 Jan 2025 16:37:53 +0100 Subject: [PATCH 08/10] refactor: Enhance motif processing and variant filtering logic with early exits and improved comments --- vntyper/scripts/motif_processing.py | 32 ++++++++++++++++++----------- vntyper/scripts/variant_parsing.py | 23 ++++++++++----------- 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/vntyper/scripts/motif_processing.py b/vntyper/scripts/motif_processing.py index d2c0434..434afe6 100644 --- a/vntyper/scripts/motif_processing.py +++ b/vntyper/scripts/motif_processing.py @@ -174,19 +174,20 @@ def motif_correction_and_annotation(df, merged_motifs, kestrel_config): logging.debug("Entering motif_correction_and_annotation") logging.debug(f"Initial row count: {len(df)}, columns: {df.columns.tolist()}") + # Exit early if empty if df.empty: logging.debug("DataFrame is empty. 
Exiting motif_correction_and_annotation.") return df mf = kestrel_config['motif_filtering'] - position_threshold = mf.get('position_threshold', 1000) + position_threshold = mf.get('position_threshold', 60) exclude_motifs_right = mf.get('exclude_motifs_right', []) alt_for_motif_right_gg = mf.get('alt_for_motif_right_gg', 'GG') motifs_for_alt_gg = mf.get('motifs_for_alt_gg', []) exclude_alts_combined = mf.get('exclude_alts_combined', []) exclude_motifs_combined = mf.get('exclude_motifs_combined', []) - # Step 1) Possible rename to 'Motif_fasta' + splitting 'Motifs' + # Step 1) Rename to 'Motif_fasta' + split 'Motifs' pre_split_rows = len(df) pre_split_cols = df.columns.tolist() if 'Motifs' in df.columns: @@ -197,11 +198,12 @@ def motif_correction_and_annotation(df, merged_motifs, kestrel_config): else: logging.error("Unexpected format in 'Motifs' column: cannot split as 'left-right'.") return pd.DataFrame() + logging.debug("After splitting 'Motifs' into left/right:") logging.debug(f"Changed from {pre_split_rows} rows, {pre_split_cols} columns") logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - # Step 2) Filter by 'POS' into motif_left, motif_right + # Step 2) Separate into motif_left, motif_right by POS threshold df['POS'] = df['POS'].astype(int) pre_filter_rows = len(df) pre_filter_cols = df.columns.tolist() @@ -211,12 +213,12 @@ def motif_correction_and_annotation(df, merged_motifs, kestrel_config): logging.debug(f"Original had {pre_filter_rows} rows, columns: {pre_filter_cols}") logging.debug(f"motif_left has {len(motif_left)} rows, motif_right has {len(motif_right)} rows") - # Step 3) Merge with additional motifs on left side + # Step 3) Merge + filter left side if not motif_left.empty: pre_left_merge_rows = len(motif_left) pre_left_merge_cols = motif_left.columns.tolist() motif_left.rename(columns={'Motif_right': 'Motif'}, inplace=True) - motif_left.drop(['Motif_sequence'], axis=1, inplace=True) + motif_left.drop(['Motif_sequence'], axis=1, 
inplace=True) # NOTE(review): raises KeyError if 'Motif_sequence' is absent — use errors='ignore' if it is optional motif_left = motif_left.merge(merged_motifs, on='Motif', how='left') logging.debug("After merging left motif with 'merged_motifs':") logging.debug(f"Changed from {pre_left_merge_rows} rows, {pre_left_merge_cols} columns") @@ -237,21 +239,26 @@ def motif_correction_and_annotation(df, merged_motifs, kestrel_config): ] motif_left = motif_left[keep_cols] pre_left_filter_rows = len(motif_left) - motif_left = ( - motif_left.sort_values('Depth_Score', ascending=False) - .drop_duplicates('ALT', keep='first') - .sort_values('POS', ascending=False) - .tail(1) + + # Sort by depth, drop duplicate ALTs but DO NOT do a final .tail(1) + motif_left = motif_left.sort_values( + ["Depth_Score", "POS"], ascending=[False, False] ) + + # Drop duplicates on ALT, keep first in this order: + motif_left = motif_left.drop_duplicates("ALT", keep="first") + + + logging.debug("After final filtering on left motif variants:") logging.debug(f"Changed from {pre_left_filter_rows} rows to {len(motif_left)} rows") - # Step 4) Merge with additional motifs on right side + # Step 4) Merge + filter right side if not motif_right.empty: pre_right_merge_rows = len(motif_right) pre_right_merge_cols = motif_right.columns.tolist() motif_right.rename(columns={'Motif_left': 'Motif'}, inplace=True) - motif_right.drop(['Motif_sequence'], axis=1, inplace=True) + motif_right.drop(['Motif_sequence'], axis=1, inplace=True) # NOTE(review): raises KeyError if 'Motif_sequence' is absent — use errors='ignore' if it is optional motif_right = motif_right.merge(merged_motifs, on='Motif', how='left') logging.debug("After merging right motif with 'merged_motifs':") logging.debug(f"Changed from {pre_right_merge_rows} rows, {pre_right_merge_cols} columns") @@ -288,6 +295,7 @@ def motif_correction_and_annotation(df, merged_motifs, kestrel_config): motif_right.sort_values('Depth_Score', ascending=False) .drop_duplicates('ALT', keep='first') ) + + logging.debug("After handling 'GG' alt on right motif:") logging.debug(f"Changed from {pre_right_gg_rows} rows to {len(motif_right)} rows") diff 
--git a/vntyper/scripts/variant_parsing.py b/vntyper/scripts/variant_parsing.py index f2bc2ce..b7327ed 100644 --- a/vntyper/scripts/variant_parsing.py +++ b/vntyper/scripts/variant_parsing.py @@ -97,6 +97,7 @@ def filter_by_alt_values_and_finalize(df: pd.DataFrame, kestrel_config: dict) -> logging.debug("Entering filter_by_alt_values_and_finalize") logging.debug(f"Initial DataFrame shape: {df.shape}") + # Early exit if empty if df.empty: logging.debug("DataFrame is empty. Exiting function.") return df @@ -108,26 +109,24 @@ def filter_by_alt_values_and_finalize(df: pd.DataFrame, kestrel_config: dict) -> logging.error(f"Missing required columns: {missing_columns}") raise KeyError(f"Missing required columns: {missing_columns}") + # Fetch filtering parameters from config alt_filter = kestrel_config.get('alt_filtering', {}) gg_alt_value = alt_filter.get('gg_alt_value', 'GG') gg_depth_threshold = alt_filter.get('gg_depth_score_threshold', 0.0) exclude_alts = alt_filter.get('exclude_alts', []) - # Step 1: Filter 'GG' ALT with Depth_Score threshold - gg_mask = df['ALT'] == gg_alt_value - if gg_mask.any(): - non_gg = df[~gg_mask] - gg_filtered = df[gg_mask & (df['Depth_Score'].astype(float) >= gg_depth_threshold)] - df = pd.concat([non_gg, gg_filtered], ignore_index=True) - logging.debug(f"Applied 'GG' depth score filter: {df.shape}") + # Convert Depth_Score to float to ensure numeric comparison + df['Depth_Score'] = df['Depth_Score'].astype(float) - # Step 2: Exclude specified ALTs + # STEP 1 & 2) Filter rows with ALT=GG by depth threshold and exclude any ALTs in exclude_alts initial_count = len(df) - df = df[~df['ALT'].isin(exclude_alts)] - logging.debug(f"Excluded specified ALTs: {initial_count} -> {df.shape[0]} records") + is_gg = df['ALT'] == gg_alt_value + meets_gg_threshold = df['Depth_Score'] >= gg_depth_threshold + df = df[(~is_gg | meets_gg_threshold) & (~df['ALT'].isin(exclude_alts))] + logging.debug(f"Filtered 'GG' by depth and excluded ALTs: {initial_count} -> 
{len(df)} records") - # Step 3: Drop intermediate columns if they exist - drop_cols = [col for col in ['left', 'right'] if col in df.columns] + # STEP 3) Drop intermediate columns 'left' and 'right' if they exist + drop_cols = [col for col in ('left', 'right') if col in df.columns] if drop_cols: df = df.drop(columns=drop_cols) logging.debug(f"Dropped intermediate columns: {drop_cols}") From 957d2b4062255a48ce5010975afac3a0fba979a4 Mon Sep 17 00:00:00 2001 From: Bernt Popp Date: Mon, 13 Jan 2025 13:36:46 +0100 Subject: [PATCH 09/10] refactor: Update kestrel filter logic to output the complete df always fixes problems previously introduced with logging introduces High_star class which was previously missing Update report generation to handle new class Closes: #73 Closes: #74 Bump version to 2.0.0-alpha.55 --- vntyper/cli.py | 76 +++--- vntyper/config.json | 3 +- vntyper/scripts/cohort_summary.py | 7 +- vntyper/scripts/confidence_assignment.py | 156 +++++++----- vntyper/scripts/generate_report.py | 110 +++++++-- vntyper/scripts/kestrel_genotyping.py | 89 +++++-- vntyper/scripts/motif_processing.py | 289 +++++++++++------------ vntyper/scripts/pipeline.py | 4 +- vntyper/scripts/scoring.py | 150 +++--------- vntyper/scripts/variant_parsing.py | 44 ++-- vntyper/version.py | 2 +- 11 files changed, 474 insertions(+), 456 deletions(-) diff --git a/vntyper/cli.py b/vntyper/cli.py index ff108a0..fb08def 100644 --- a/vntyper/cli.py +++ b/vntyper/cli.py @@ -58,19 +58,6 @@ def main(): the subcommand. """ - # Load an initial config for CLI defaults. Use try/except in case loading fails. 
- try: - initial_config = load_config(None) - logging.debug("Initial configuration loaded successfully.") - except Exception: - initial_config = {} - logging.debug("Failed to load initial_config; using empty dictionary.") - - # Fallback lookups for CLI defaults - default_cli = initial_config.get("cli_defaults", {}) - default_log_level = default_cli.get("log_level", "INFO") - default_log_file = default_cli.get("log_file", None) - # Parent parser for global arguments parent_parser = argparse.ArgumentParser(add_help=False, conflict_handler='resolve') parent_parser.add_argument( @@ -364,7 +351,7 @@ def main(): # Parse all arguments first args = parser.parse_args() - # Display help if no command is provided + # If no command is provided, display help and exit if args.command is None: parser.print_help() sys.exit(0) @@ -372,7 +359,7 @@ def main(): # Load the main configuration based on the provided config path try: # IMPORTANT: Use `initial_config` as fallback when `--config-path` is not provided - config = load_config(args.config_path) if args.config_path else initial_config + config = load_config(args.config_path) if args.config_path else load_config(None) logging.debug("Configuration loaded successfully.") except Exception as exc: logging.critical(f"Failed to load configuration: {exc}") @@ -381,24 +368,33 @@ def main(): def get_conf(key, fallback): return config.get("default_values", {}).get(key, fallback) - # Setup base logging now (only once) - # Command-line arguments take precedence over config file + # Determine log level if args.log_level: log_level_value = getattr(logging, args.log_level.upper(), logging.INFO) else: - log_level_value = getattr(logging, get_conf("log_level", "INFO").upper(), logging.INFO) + log_level_value = getattr(logging, config.get("cli_defaults", {}).get("log_level", "INFO").upper(), logging.INFO) + # Determine log file if args.log_file: log_file_value = args.log_file else: - log_file_value = get_conf("log_file", None) + # If the command is 
'pipeline' and output_dir is provided, set log_file accordingly + if args.command == "pipeline" and args.output_dir: + log_file_value = Path(args.output_dir) / "pipeline.log" + else: + log_file_value = config.get("cli_defaults", {}).get("log_file", None) - # Log the determined log level and log file - logging.debug(f"log_level_value is set to {log_level_value}") - logging.debug(f"log_file_value is set to {log_file_value}") + # Ensure that the log file directory exists + if log_file_value: + log_file_path = Path(log_file_value) + log_file_path.parent.mkdir(parents=True, exist_ok=True) + log_file_str = str(log_file_path) + else: + log_file_str = None - setup_logging(log_level=log_level_value, log_file=str(log_file_value) if log_file_value else None) - logging.debug(f"Logging has been set up with level {log_level_value} and log_file {log_file_value}") + # Setup logging now (only once) with the determined log level and log file + setup_logging(log_level=log_level_value, log_file=log_file_str) + logging.debug(f"Logging has been set up with level {log_level_value} and log_file {log_file_str}") # Log current logging handlers and their levels for handler in logging.getLogger().handlers: @@ -416,32 +412,17 @@ def get_conf(key, fallback): ) sys.exit(0) - # No need to reload config; it's already loaded correctly - # # ------------------------------------------------------------------------- # pipeline command # ------------------------------------------------------------------------- + # if args.command == "pipeline": - if not args.log_file: - if args.output_dir: - log_file = Path(args.output_dir) / "pipeline.log" - log_file.parent.mkdir(parents=True, exist_ok=True) - logging.debug(f"Setting log file to {log_file}") - # Update log_file in logging handlers - for handler in logging.getLogger().handlers: - if isinstance(handler, logging.FileHandler): - handler.baseFilename = str(log_file) - logging.debug(f"Updated handler baseFilename to {handler.baseFilename}") - else: - # 
Fallback to current dir if no output_dir - log_file = Path("pipeline.log") - logging.debug(f"Setting log file to {log_file}") - # Update log_file in logging handlers - for handler in logging.getLogger().handlers: - if isinstance(handler, logging.FileHandler): - handler.baseFilename = str(log_file) - logging.debug(f"Updated handler baseFilename to {handler.baseFilename}") + # If log_file was not explicitly provided and output_dir is set, ensure log_file is correctly set + if not args.log_file and args.output_dir: + log_file = Path(args.output_dir) / "pipeline.log" + log_file.parent.mkdir(parents=True, exist_ok=True) + logging.debug(f"Setting log file to {log_file}") if args.output_dir is None: args.output_dir = get_conf("output_dir", "out") @@ -472,13 +453,13 @@ def get_conf(key, fallback): 1 if (args.fastq1 or args.fastq2) else 0 ]) if input_types > 1: - parser_pipeline.error("Provide either BAM, CRAM, or FASTQ files (not multiples).") + parser.error("Provide either BAM, CRAM, or FASTQ files (not multiples).") logging.debug("Multiple input types detected.") sys.exit(1) if (not args.bam and not args.cram and (args.fastq1 is None or args.fastq2 is None)): - parser_pipeline.error( + parser.error( "When not providing BAM/CRAM, both --fastq1 and --fastq2 must " "be specified for paired-end sequencing." 
) @@ -550,6 +531,7 @@ def get_conf(key, fallback): bed_file=args.bed_file, log_level=log_level_value, # Pass log_level to run_pipeline sample_name=sample_name_val, + log_file=log_file_str # Pass the correctly determined log_file ) # diff --git a/vntyper/config.json b/vntyper/config.json index a0524b0..159be96 100644 --- a/vntyper/config.json +++ b/vntyper/config.json @@ -5,14 +5,13 @@ "output_name": "processed", "archive_format": "zip", "report_file": "summary_report.html", - "log_file": null, "flanking": 50, "summary_file": "cohort_summary.html", "reference_assembly": "hg19" }, "cli_defaults": { "log_level": "INFO", - "log_file": null + "log_file": "pipeline.log" }, "api": { "base_url": "https://vntyper.org/api" diff --git a/vntyper/scripts/cohort_summary.py b/vntyper/scripts/cohort_summary.py index 9c6722f..2072a2b 100644 --- a/vntyper/scripts/cohort_summary.py +++ b/vntyper/scripts/cohort_summary.py @@ -79,7 +79,7 @@ def load_kestrel_results(kestrel_result_file): f'{x}' if x == 'Low_Precision' else f'{x}' - if x == 'High_Precision' + if x in ['High_Precision', 'High_Precision*'] # Updated to include 'High_Precision*' else f'{x}' if x == 'Negative' else x @@ -394,8 +394,9 @@ def generate_cohort_summary_report(output_dir, kestrel_df, advntr_df, summary_fi if 'Confidence' in kestrel_df.columns: try: kestrel_df_conf = kestrel_df['Confidence'].fillna('') - kestrel_positive = len(kestrel_df[kestrel_df_conf.str.contains('Low_Precision|High_Precision', na=False)]) - kestrel_negative = len(kestrel_df[~kestrel_df_conf.str.contains('Low_Precision|High_Precision', na=False)]) + # Updated to include 'High_Precision*' in positive counts + kestrel_positive = len(kestrel_df[kestrel_df_conf.str.contains('Low_Precision|High_Precision\\*?', na=False)]) + kestrel_negative = len(kestrel_df[~kestrel_df_conf.str.contains('Low_Precision|High_Precision\\*?', na=False)]) except Exception as e: logging.error(f"Error processing 'Confidence' values: {e}") kestrel_positive = 0 diff --git 
a/vntyper/scripts/confidence_assignment.py b/vntyper/scripts/confidence_assignment.py index 628d412..bbfbacb 100644 --- a/vntyper/scripts/confidence_assignment.py +++ b/vntyper/scripts/confidence_assignment.py @@ -32,84 +32,116 @@ def calculate_depth_score_and_assign_confidence(df: pd.DataFrame, kestrel_config """ Calculates Depth_Score for each variant and assigns a confidence label based on coverage thresholds defined in kestrel_config['confidence_assignment']. + + (Refactored) + - All rows remain in the DataFrame. + - A new boolean column 'depth_confidence_pass' is True if the row's + final Confidence is not 'Negative'. Args: df (pd.DataFrame): - DataFrame with columns: - - 'Estimated_Depth_AlternateVariant' (alt depth) - - 'Estimated_Depth_Variant_ActiveRegion' (region depth) + Must have numeric columns: + - 'Estimated_Depth_AlternateVariant' + - 'Estimated_Depth_Variant_ActiveRegion' kestrel_config (dict): - Contains subdict 'confidence_assignment', which has: - - 'depth_score_thresholds' - - 'alt_depth_thresholds' - - 'var_active_region_threshold' - - 'confidence_levels' + Must contain a subdict 'confidence_assignment' with keys: + - 'depth_score_thresholds' (e.g. {'low': 0.2, 'high': 0.4}) + - 'alt_depth_thresholds' (e.g. {'low': 5, 'mid_low': 10, 'mid_high': 20}) + - 'var_active_region_threshold' (e.g. 30) + - 'confidence_levels' (dict with keys like + 'low_precision', 'high_precision', 'high_precision_star') Returns: pd.DataFrame: - Same DataFrame with two new columns: + Same shape as input, with: - 'Depth_Score' (float) - - 'Confidence' (str: 'Low_Precision', 'High_Precision', etc.) + - 'Confidence' (str, e.g., 'Low_Precision', 'High_Precision', etc.) + - 'depth_confidence_pass' (bool; True if Confidence != 'Negative') """ logging.debug("Entering calculate_depth_score_and_assign_confidence") logging.debug(f"Initial DataFrame shape: {df.shape}") if df.empty: - logging.debug("DataFrame is empty. Exiting function.") + logging.debug("DataFrame is empty. 
Exiting function without changes.") return df - # Step 1: Convert depth columns to integers + # Extract relevant config subdict + conf_assign = kestrel_config.get('confidence_assignment', {}) + thresholds = conf_assign.get('depth_score_thresholds', {}) + alt_thresholds = conf_assign.get('alt_depth_thresholds', {}) + var_region_threshold = conf_assign.get('var_active_region_threshold', 0) + confidence_levels = conf_assign.get('confidence_levels', {}) + + # Fallback for confidence levels + low_prec_label = confidence_levels.get('low_precision', 'Low_Precision') + high_prec_label = confidence_levels.get('high_precision', 'High_Precision') + high_prec_star_label = confidence_levels.get('high_precision_star', 'High_Precision*') + + # Default to 0.2 for 'low' and 0.4 for 'high' if not specified + low_threshold = thresholds.get('low', 0.2) + high_threshold = thresholds.get('high', 0.4) + + # Default alt-depth thresholds (in case they are missing) + alt_low = alt_thresholds.get('low', 5) + alt_mid_low = alt_thresholds.get('mid_low', 10) + alt_mid_high = alt_thresholds.get('mid_high', 20) + + # Convert depth columns to integer (or float) for arithmetic depth_cols = ['Estimated_Depth_AlternateVariant', 'Estimated_Depth_Variant_ActiveRegion'] - df[depth_cols] = df[depth_cols].astype(int) - logging.debug("Converted depth columns to integers.") - - # Step 2: Calculate Depth_Score - df['Depth_Score'] = df['Estimated_Depth_AlternateVariant'] / df['Estimated_Depth_Variant_ActiveRegion'] - logging.debug("Calculated 'Depth_Score'.") - - # Step 3: Assign confidence - conf_assign = kestrel_config['confidence_assignment'] - thresholds = conf_assign['depth_score_thresholds'] - alt_thresholds = conf_assign['alt_depth_thresholds'] - var_region_threshold = conf_assign['var_active_region_threshold'] - confidence_levels = conf_assign['confidence_levels'] - - conditions = [ - # Condition 1: Low Precision - (df['Depth_Score'] <= thresholds['low']) | (df['Estimated_Depth_Variant_ActiveRegion'] 
<= var_region_threshold), - - # Condition 2: High Precision Star (Updated to include alt_depth >= mid_high) - (df['Estimated_Depth_AlternateVariant'] >= alt_thresholds['mid_high']) & - (df['Depth_Score'] >= thresholds['high']), - - # Condition 3: Low Precision - (df['Estimated_Depth_AlternateVariant'].between(alt_thresholds['mid_low'], alt_thresholds['mid_high'])) & - (df['Depth_Score'].between(thresholds['low'], thresholds['high'])), - - # Condition 4: Low Precision - (df['Estimated_Depth_AlternateVariant'] <= alt_thresholds['low']), - - # Condition 5: High Precision - (df['Estimated_Depth_AlternateVariant'].between(alt_thresholds['mid_low'], alt_thresholds['mid_high'], inclusive='left')) & - (df['Depth_Score'] >= thresholds['high']) - ] - - choices = [ - confidence_levels['low_precision'], - confidence_levels['high_precision_star'], # Updated to assign 'High_Precision*' - confidence_levels['low_precision'], - confidence_levels['low_precision'], - confidence_levels['high_precision'] - ] - - # Ensure 'high_precision_star' exists in confidence_levels - if 'high_precision_star' not in confidence_levels: - logging.error("'high_precision_star' not found in confidence_levels.") - raise KeyError("'high_precision_star' not found in confidence_levels.") - - df['Confidence'] = np.select(conditions, choices, default=confidence_levels.get('low_precision', 'Low_Precision')) - logging.debug("Assigned 'Confidence' labels based on conditions.") + for col in depth_cols: + df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0) + + # Step 2: Calculate Depth_Score (avoid division by zero -> np.nan) + with pd.option_context('mode.use_inf_as_na', True): + df['Depth_Score'] = df['Estimated_Depth_AlternateVariant'] / df['Estimated_Depth_Variant_ActiveRegion'] + + # Step 3: Assign Confidence + # We set a default of 'Negative' for all rows, then update them using np.select or logical conditions + df['Confidence'] = 'Negative' + + # Condition-based assignment logic + # Condition 1: Low 
Precision if Depth_Score <= low_threshold + # OR region depth <= var_region_threshold + cond1 = ( + (df['Depth_Score'] <= low_threshold) | + (df['Estimated_Depth_Variant_ActiveRegion'] <= var_region_threshold) + ) + + # Condition 2: High Precision STAR if alt depth >= alt_mid_high AND Depth_Score >= high_threshold + cond2 = ( + (df['Estimated_Depth_AlternateVariant'] >= alt_mid_high) & + (df['Depth_Score'] >= high_threshold) + ) + + # Condition 3: Low Precision if alt_depth is between mid_low and mid_high, + # and Depth_Score between low_threshold and high_threshold + cond3 = ( + df['Estimated_Depth_AlternateVariant'].between(alt_mid_low, alt_mid_high) & + df['Depth_Score'].between(low_threshold, high_threshold) + ) + + # Condition 4: Low Precision if alt depth <= alt_low + cond4 = (df['Estimated_Depth_AlternateVariant'] <= alt_low) + + # Condition 5: High Precision if alt_depth is between mid_low and mid_high + # and Depth_Score >= high_threshold + cond5 = ( + df['Estimated_Depth_AlternateVariant'].between(alt_mid_low, alt_mid_high, inclusive='left') & + (df['Depth_Score'] >= high_threshold) + ) + + # Apply the conditions in order of precedence + # to handle potential overlap (later conditions can overwrite earlier ones). 
+ df.loc[cond1, 'Confidence'] = low_prec_label + df.loc[cond2, 'Confidence'] = high_prec_star_label + df.loc[cond3, 'Confidence'] = low_prec_label + df.loc[cond4, 'Confidence'] = low_prec_label + df.loc[cond5, 'Confidence'] = high_prec_label + + # Step 4: Mark pass/fail for coverage-based logic + # "Passing" means the Confidence is anything except 'Negative' + df['depth_confidence_pass'] = df['Confidence'] != 'Negative' - logging.debug(f"Final DataFrame shape: {df.shape}") logging.debug("Exiting calculate_depth_score_and_assign_confidence") + logging.debug(f"Final DataFrame shape: {df.shape}") return df diff --git a/vntyper/scripts/generate_report.py b/vntyper/scripts/generate_report.py index cc73a20..bb359ed 100644 --- a/vntyper/scripts/generate_report.py +++ b/vntyper/scripts/generate_report.py @@ -13,6 +13,11 @@ def load_kestrel_results(kestrel_result_file): + """ + Loads Kestrel results (kestrel_result.tsv) and adjusts the 'Confidence' column + to color-code values in HTML. Now also handles 'High_Precision*' by applying + the same styling as 'High_Precision'. + """ logging.info(f"Loading Kestrel results from {kestrel_result_file}") if not os.path.exists(kestrel_result_file): logging.warning(f"Kestrel result file not found: {kestrel_result_file}") @@ -34,12 +39,14 @@ def load_kestrel_results(kestrel_result_file): } df = df[list(columns_to_display.keys())] df = df.rename(columns=columns_to_display) + + # Handle Low_Precision in orange, High_Precision & High_Precision* in red df['Confidence'] = df['Confidence'].apply( lambda x: ( f'{x}' if x == 'Low_Precision' else f'{x}' - if x == 'High_Precision' + if x in ['High_Precision', 'High_Precision*'] else x ) ) @@ -53,6 +60,10 @@ def load_kestrel_results(kestrel_result_file): def load_advntr_results(advntr_result_file): + """ + Loads adVNTR results (output_adVNTR_result.tsv) if present. Returns + an empty DataFrame and a False flag if not found or parsing fails. 
+ """ logging.info(f"Loading adVNTR results from {advntr_result_file}") if not os.path.exists(advntr_result_file): logging.warning(f"adVNTR result file not found: {advntr_result_file}") @@ -69,6 +80,30 @@ def load_advntr_results(advntr_result_file): return pd.DataFrame(), False +def load_pipeline_log(log_file): + """ + Loads the pipeline log content from the specified log_file. + Returns a placeholder string if not found or on error. + """ + logging.info(f"Loading pipeline log from {log_file}") + # If log_file is None or empty, just return a placeholder. + if not log_file: + logging.warning("No pipeline log file provided; skipping log loading.") + return "No pipeline log file was provided." + + # If log_file is not None, check if it exists: + if not os.path.exists(log_file): + logging.warning(f"Pipeline log file not found: {log_file}") + return "Pipeline log file not found." + + try: + with open(log_file, 'r') as f: + return f.read() + except Exception as e: + logging.error(f"Failed to read pipeline log file: {e}") + return "Failed to load pipeline log." + + def run_igv_report(bed_file, bam_file, fasta_file, output_html, flanking=50, vcf_file=None, config=None): """ Wrapper around `create_report` IGV command. If config is provided and flanking @@ -92,33 +127,25 @@ def run_igv_report(bed_file, bam_file, fasta_file, output_html, flanking=50, vcf fasta_file = str(fasta_file) if fasta_file else None output_html = str(output_html) if output_html else None - # We'll build the IGV command piece by piece, skipping None tracks igv_report_cmd = [ 'create_report', - bed_file, # The region/bed + bed_file, '--flanking', str(flanking), '--fasta', fasta_file, '--tracks' ] - # Collect tracks in a list, skipping any that are None tracks = [] if vcf_file: tracks.append(str(vcf_file)) if bam_file: tracks.append(str(bam_file)) - - # If you want to enforce at least one track, you can check here: if not tracks: logging.warning("No valid tracks (VCF or BAM) provided to IGV. 
The IGV report may be empty.") igv_report_cmd.extend(tracks) + igv_report_cmd.extend(['--output', output_html]) - igv_report_cmd.extend([ - '--output', output_html - ]) - - # Now run the command try: logging.info(f"Running IGV report: {' '.join([str(x) for x in igv_report_cmd if x])}") subprocess.run(igv_report_cmd, check=True) @@ -132,6 +159,11 @@ def run_igv_report(bed_file, bam_file, fasta_file, output_html, flanking=50, vcf def extract_igv_content(igv_report_html): + """ + Reads the generated IGV HTML report and extracts the IGV content, + the tableJson variable, and the sessionDictionary variable from the script. + Returns empty strings if not found or on error. + """ logging.debug(f"extract_igv_content called with igv_report_html={igv_report_html}") try: with open(igv_report_html, 'r') as f: @@ -165,6 +197,10 @@ def extract_igv_content(igv_report_html): def load_fastp_output(fastp_file): + """ + Loads fastp JSON output (e.g., output.json) for summary metrics if available. + Returns an empty dict if file not found or if parsing fails. + """ logging.debug(f"load_fastp_output called with fastp_file={fastp_file}") if not os.path.exists(fastp_file): logging.warning(f"fastp output file not found: {fastp_file}") @@ -183,6 +219,7 @@ def generate_summary_report( output_dir, template_dir, report_file, + log_file, bed_file=None, bam_file=None, fasta_file=None, @@ -197,17 +234,18 @@ def generate_summary_report( Generates a summary report. Args: - output_dir (Path): Output directory for the report. + output_dir (str): Output directory for the report. template_dir (str): Directory containing the report template. report_file (str): Name of the report file. - bed_file (Path, optional): Path to the BED file for IGV reports. - bam_file (Path, optional): Path to the BAM file for IGV reports. - fasta_file (Path, optional): Path to the reference FASTA file for IGV reports. + log_file (str): Path to the pipeline log file. 
+ bed_file (str, optional): Path to the BED file for IGV reports. + bam_file (str, optional): Path to the BAM file for IGV reports. + fasta_file (str, optional): Path to the reference FASTA file for IGV reports. flanking (int, optional): Size of the flanking region for IGV reports. input_files (dict, optional): Dictionary of input filenames. pipeline_version (str, optional): The version of the VNtyper pipeline. mean_vntr_coverage (float, optional): Mean coverage over the VNTR region. - vcf_file (Path, optional): Path to the sorted and indexed VCF file. + vcf_file (str, optional): Path to the sorted and indexed VCF file. config (dict, optional): Configuration dictionary. Raises: @@ -216,7 +254,7 @@ def generate_summary_report( logging.debug("---- DEBUG: Entered generate_summary_report ----") logging.debug(f"Called with output_dir={output_dir}, template_dir={template_dir}, report_file={report_file}") logging.debug(f"bed_file={bed_file}, bam_file={bam_file}, fasta_file={fasta_file}, flanking={flanking}") - logging.debug(f"vcf_file={vcf_file}, mean_vntr_coverage={mean_vntr_coverage}") + logging.debug(f"log_file={log_file}, vcf_file={vcf_file}, mean_vntr_coverage={mean_vntr_coverage}") if config is None: raise ValueError("Config dictionary must be provided to generate_summary_report") @@ -227,6 +265,12 @@ def generate_summary_report( logging.debug(f"Absolute bed_file => {abs_bed_file}") logging.debug(f"Exists? => {os.path.exists(abs_bed_file)}") + # Debug checks for log_file existence + if log_file: + abs_log_file = os.path.abspath(log_file) + logging.debug(f"Absolute log_file => {abs_log_file}") + logging.debug(f"Exists? 
=> {os.path.exists(abs_log_file)}") + if flanking == 50 and config is not None: flanking = config.get("default_values", {}).get("flanking", 50) @@ -266,17 +310,18 @@ def generate_summary_report( kestrel_df = load_kestrel_results(kestrel_result_file) advntr_df, advntr_available = load_advntr_results(advntr_result_file) + log_content = load_pipeline_log(log_file) - # Removed log_content since log_file is no longer handled here - igv_content, table_json, session_dictionary = ("", "", "") - + # If we did produce an IGV report, extract the content if igv_report_file and igv_report_file.exists(): igv_content, table_json, session_dictionary = extract_igv_content(igv_report_file) else: logging.warning("IGV report file not found. Skipping IGV content.") + igv_content, table_json, session_dictionary = "", "", "" fastp_data = load_fastp_output(fastp_file) + # Coverage status if mean_vntr_coverage is not None and mean_vntr_coverage < mean_vntr_cov_threshold: coverage_icon = '' coverage_color = 'red' @@ -311,6 +356,10 @@ def generate_summary_report( sequencing_str = summary.get("sequencing", "") def warn_icon(value, cutoff, higher_better=True): + """ + Returns an HTML icon/string and a color ('red' or 'green'), depending + on whether 'value' passes 'cutoff' under the 'higher_better' logic. 
+ """ if value is None: return "", "" if higher_better: @@ -324,11 +373,13 @@ def warn_icon(value, cutoff, higher_better=True): else ('', 'green') ) + # Evaluate duplication, Q20, Q30, and passed filter rates dup_icon, dup_color = warn_icon(duplication_rate, dup_rate_cutoff, higher_better=False) q20_icon, q20_color = warn_icon(q20_rate, q20_rate_cutoff, higher_better=True) q30_icon, q30_color = warn_icon(q30_rate, q30_rate_cutoff, higher_better=True) pf_icon, pf_color = warn_icon(passed_filter_rate, passed_filter_rate_cutoff, higher_better=True) + # Convert Kestrel & adVNTR data to HTML kestrel_html = kestrel_df.to_html( classes='table table-bordered table-striped hover compact order-column table-sm', index=False, @@ -342,6 +393,7 @@ def warn_icon(value, cutoff, higher_better=True): else: advntr_html = None + # Prepare Jinja2 template env = Environment(loader=FileSystemLoader(template_dir)) try: template = env.get_template('report_template.html') @@ -349,14 +401,17 @@ def warn_icon(value, cutoff, higher_better=True): logging.error(f"Failed to load Jinja2 template: {e}") raise + # Summarize the results summary_text = build_screening_summary( kestrel_df, advntr_df, advntr_available, mean_vntr_coverage, mean_vntr_cov_threshold ) + + # Build the final context for rendering context = { 'kestrel_highlight': kestrel_html, 'advntr_highlight': advntr_html, 'advntr_available': advntr_available, - # 'log_content': log_content, # Removed + 'log_content': log_content, # Re-added pipeline log content 'igv_content': igv_content, 'table_json': table_json, 'session_dictionary': session_dictionary, @@ -382,15 +437,17 @@ def warn_icon(value, cutoff, higher_better=True): 'passed_filter_icon': pf_icon, 'passed_filter_color': pf_color, 'sequencing_str': sequencing_str, - 'summary_text': summary_text # **Uncommented to include in context** + 'summary_text': summary_text } + # Render the template try: rendered_html = template.render(context) except Exception as e: logging.error(f"Failed 
to render the report template: {e}") raise + # Write out the final HTML report report_file_path = Path(output_dir) / report_file try: with open(report_file_path, 'w') as f: @@ -422,10 +479,12 @@ def build_screening_summary(kestrel_df, advntr_df, advntr_available, mean_vntr_c def strip_html_tags(confidence_value): return re.sub(r"<[^>]*>", "", confidence_value or "") - # Check if there's any recognized Confidence + # We'll handle 'High_Precision*' same as 'High_Precision' or 'Low_Precision' if not kestrel_df.empty and "Confidence" in kestrel_df.columns: kestrel_confidences = kestrel_df["Confidence"].apply(strip_html_tags).dropna().unique().tolist() - kestrel_valid = any(conf in ("High_Precision", "Low_Precision") for conf in kestrel_confidences) + # Now includes 'High_Precision*' in recognized set + kestrel_valid = any(conf in ("High_Precision", "High_Precision*", "Low_Precision") + for conf in kestrel_confidences) else: kestrel_valid = False @@ -437,13 +496,12 @@ def strip_html_tags(confidence_value): logging.debug(f"adVNTR columns => {list(advntr_df.columns)}") logging.debug(f"adVNTR first row => {advntr_df.head(1).to_dict(orient='records')}") - # Check coverage status coverage_status = ( "above" if (mean_vntr_coverage is not None and mean_vntr_coverage >= mean_vntr_cov_threshold) else "below" ) - # Check if adVNTR p-value is available + # Possibly find p-value in adVNTR output if advntr_has_data: lower_cols = [c.lower() for c in advntr_df.columns] logging.debug(f"Lowercased adVNTR columns => {lower_cols}") diff --git a/vntyper/scripts/kestrel_genotyping.py b/vntyper/scripts/kestrel_genotyping.py index 2653275..a78e004 100644 --- a/vntyper/scripts/kestrel_genotyping.py +++ b/vntyper/scripts/kestrel_genotyping.py @@ -412,13 +412,6 @@ def process_kestrel_output( output_empty_result(output_dir, header) return None - # Write an intermediate TSV for debugging - pre_result_path = os.path.join(output_dir, "kestrel_pre_result.tsv") - with open(pre_result_path, 'w') as f: 
- f.write("\n".join(header) + "\n") - combined_df.to_csv(f, sep='\t', index=False) - logging.info(f"Intermediate results saved to {pre_result_path}") - # Write the final processed results final_output_path = os.path.join(output_dir, "kestrel_result.tsv") with open(final_output_path, 'w') as f: @@ -466,13 +459,13 @@ def process_kmer_results(combined_df, merged_motifs, output_dir, kestrel_config) """ Applies the main postprocessing heuristics: 1) Split the depth from the 'Sample' column and compute frame score - 2) Split frame score into numeric parts, filter out non-frameshifts - 3) Extract frameshift variants that meet the 3n+1 / 3n+2 logic - 4) Compute a Depth_Score = alt_depth / region_depth - - Then assign confidence: Low_Precision, High_Precision, etc. - 5) Filter by certain ALT values if needed (like removing "GG" below threshold) - 6) Motif correction & annotation (positions, edges, etc.) - 7) Optionally generate a BED file for coverage visualization + 2) Split frame score into numeric parts, mark frameshifts vs. 
non-frameshifts + 3) Extract frameshift variants (3n+1 / 3n+2) + 4) Compute Depth_Score, assign confidence + 5) ALT-based filtering logic (e.g., 'GG' threshold) + 6) Motif correction & annotation + 7) Optionally generate a BED file for coverage + 8) Finally, filter out rows that fail any relevant boolean filter columns References: - Saei et al., iScience 26, 107171 (2023) for empirical cutoffs @@ -499,7 +492,7 @@ def process_kmer_results(combined_df, merged_motifs, output_dir, kestrel_config) if df.empty: return df - # (2) Split frame score into numeric 'left'/'right', remove non-frameshifts + # (2) Split frame score into numeric 'direction'/'frameshift_amount' df = split_frame_score(df) if df.empty: return df @@ -524,7 +517,13 @@ def process_kmer_results(combined_df, merged_motifs, output_dir, kestrel_config) if df.empty: return df - # (7) Generate an optional BED file for IGV coverage (not used downstream yet) + # (7) Final Filter + df = filter_final_dataframe(df, output_dir) + if df.empty: + logging.info("All rows failed one or more filter criteria. Returning empty.") + return df + + # (8) Now generate the BED file from the fully filtered result bed_file_path = generate_bed_file(df, output_dir) if bed_file_path: logging.info(f"BED file created at: {bed_file_path}") @@ -568,3 +567,61 @@ def generate_bed_file(df, output_dir): logging.info(f"BED file generated at: {bed_file_path}") return bed_file_path + + +def filter_final_dataframe(df: pd.DataFrame, output_dir: str) -> pd.DataFrame: + """ + Final step: filter the DataFrame based on the boolean columns introduced + by earlier steps (e.g., 'is_frameshift', 'is_valid_frameshift', + 'depth_confidence_pass', 'alt_filter_pass', 'motif_filter_pass'). + + We keep rows where *all existing* filter columns are True. + If a filter column does not exist in the DataFrame, we ignore it. 
+ + Additionally, this function logs how many rows pass or fail each filter + and writes the unfiltered DataFrame to 'kestrel_pre_result.tsv' directly + in the 'output_dir'. The final filtered DataFrame is returned in memory. + + Args: + df (pd.DataFrame): The postprocessed DataFrame, with various + boolean filter columns. + output_dir (str): Path to the main output directory. + + Returns: + pd.DataFrame: A copy of `df` containing only rows that pass + all available filter columns. + """ + logging.info("Starting final filter of DataFrame with %d rows...", len(df)) + + # Write the unfiltered DataFrame to 'kestrel_pre_result.tsv' in output_dir + pre_result_path = os.path.join(output_dir, "kestrel_pre_result.tsv") + df.to_csv(pre_result_path, sep="\t", index=False) + logging.info("Wrote pre-filter DataFrame to %s", pre_result_path) + + # Columns we will require to be True if they exist + filter_cols = [ + 'is_frameshift', + 'is_valid_frameshift', + 'depth_confidence_pass', + 'alt_filter_pass', + 'motif_filter_pass', + ] + + # Build a mask requiring all existing boolean filters == True + final_mask = pd.Series(True, index=df.index) + for col in filter_cols: + if col in df.columns: + before_count = final_mask.sum() + final_mask &= df[col] + after_count = final_mask.sum() + logging.info( + "Filter column '%s' exists; %d -> %d rows remain after requiring True.", + col, before_count, after_count + ) + else: + logging.info("Filter column '%s' not found; skipping.", col) + + filtered_df = df[final_mask].copy() + logging.info("Final DataFrame has %d rows after all filters.", len(filtered_df)) + + return filtered_df diff --git a/vntyper/scripts/motif_processing.py b/vntyper/scripts/motif_processing.py index 434afe6..03e7e54 100644 --- a/vntyper/scripts/motif_processing.py +++ b/vntyper/scripts/motif_processing.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 # vntyper/scripts/motif_processing.py + """ motif_processing.py @@ -7,7 +8,7 @@ --------------- Handles all logic for working 
with MUC1 VNTR motifs: - Loading references from FASTA - - Preprocessing insertion/deletion data by merging them with motif info + - Preprocessing insertion/deletion variants by merging them with motif info - Performing final motif correction & annotation based on the Kestrel config thresholds (position_threshold, exclude lists). @@ -149,36 +150,28 @@ def motif_correction_and_annotation(df, merged_motifs, kestrel_config): """ Final step of motif annotation: correct positions for left/right motifs, drop duplicates, handle special cases (e.g., 'GG' alt on the right motif). - - Steps: - 1) Split 'Motifs' column into 'Motif_left'/'Motif_right' if it has a dash. - 2) Filter by 'POS' to decide which side is left vs. right, referencing - the 'position_threshold' from kestrel_config. - 3) Merge with the additional motifs for annotation. - 4) Apply exclude lists for certain motifs or ALTs (e.g., 'exclude_motifs_right'). - 5) Adjust 'POS' for the right side so it's zero-based from the threshold. - - References: - - Saei et al., iScience 26, 107171 (2023) - - Args: - df (pd.DataFrame): DataFrame with columns (REF, ALT, Motifs, etc.). - merged_motifs (pd.DataFrame): Additional motif data (from load_additional_motifs). - kestrel_config (dict): Contains 'motif_filtering' subdict with thresholds. - - Returns: - pd.DataFrame: - Updated DataFrame with final annotated/filtered results. May be empty - if all variants are excluded. + + (Refactored): + - Returns the same shape as `df`, + - Adds 'motif_filter_pass' boolean, + - Ensures 'Motif_fasta', 'POS_fasta', and 'Motif' columns + exist in the final output, even for failing rows. """ logging.debug("Entering motif_correction_and_annotation") logging.debug(f"Initial row count: {len(df)}, columns: {df.columns.tolist()}") - # Exit early if empty if df.empty: logging.debug("DataFrame is empty. 
Exiting motif_correction_and_annotation.") + df['motif_filter_pass'] = False + df['Motif_fasta'] = pd.NA + df['POS_fasta'] = pd.NA + df['Motif'] = pd.NA return df + # Keep a copy to attach pass/fail + original_df = df.copy(deep=True) + original_df['original_index'] = original_df.index + mf = kestrel_config['motif_filtering'] position_threshold = mf.get('position_threshold', 60) exclude_motifs_right = mf.get('exclude_motifs_right', []) @@ -187,135 +180,97 @@ def motif_correction_and_annotation(df, merged_motifs, kestrel_config): exclude_alts_combined = mf.get('exclude_alts_combined', []) exclude_motifs_combined = mf.get('exclude_motifs_combined', []) - # Step 1) Rename to 'Motif_fasta' + split 'Motifs' - pre_split_rows = len(df) - pre_split_cols = df.columns.tolist() - if 'Motifs' in df.columns: - df['Motif_fasta'] = df['Motifs'] + # =============== Original Logic =============== + working_df = original_df.copy(deep=True) - if df['Motifs'].str.count('-').max() == 1: - df[['Motif_left', 'Motif_right']] = df['Motifs'].str.split('-', expand=True) + # Step 1) Ensure 'Motif_fasta'; check for dash + if 'Motifs' in working_df.columns: + working_df['Motif_fasta'] = working_df['Motifs'] else: - logging.error("Unexpected format in 'Motifs' column: cannot split as 'left-right'.") - return pd.DataFrame() - - logging.debug("After splitting 'Motifs' into left/right:") - logging.debug(f"Changed from {pre_split_rows} rows, {pre_split_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - - # Step 2) Separate into motif_left, motif_right by POS threshold - df['POS'] = df['POS'].astype(int) - pre_filter_rows = len(df) - pre_filter_cols = df.columns.tolist() - motif_left = df[df['POS'] < position_threshold].copy() - motif_right = df[df['POS'] >= position_threshold].copy() - logging.debug("After splitting into left/right sub-DataFrames by POS threshold:") - logging.debug(f"Original had {pre_filter_rows} rows, columns: {pre_filter_cols}") - 
logging.debug(f"motif_left has {len(motif_left)} rows, motif_right has {len(motif_right)} rows") - - # Step 3) Merge + filter left side - if not motif_left.empty: - pre_left_merge_rows = len(motif_left) - pre_left_merge_cols = motif_left.columns.tolist() - motif_left.rename(columns={'Motif_right': 'Motif'}, inplace=True) - motif_left.drop(['Motif_sequence'], axis=1, inplace=True) # if it exists - motif_left = motif_left.merge(merged_motifs, on='Motif', how='left') - logging.debug("After merging left motif with 'merged_motifs':") - logging.debug(f"Changed from {pre_left_merge_rows} rows, {pre_left_merge_cols} columns") - logging.debug(f"To {len(motif_left)} rows, {motif_left.columns.tolist()} columns") - - keep_cols = [ - 'Motif', - 'Motif_fasta', - 'Variant', - 'POS', - 'REF', - 'ALT', - 'Motif_sequence', - 'Estimated_Depth_AlternateVariant', - 'Estimated_Depth_Variant_ActiveRegion', - 'Depth_Score', - 'Confidence', - ] - motif_left = motif_left[keep_cols] - pre_left_filter_rows = len(motif_left) - - # Sort by depth, drop duplicate ALTs but DO NOT do a final .tail(1) - motif_left = motif_left.sort_values( - ["Depth_Score", "POS"], ascending=[False, False] - ) + logging.error("Missing 'Motifs' column. 
Old code returns empty.") + combined_df = pd.DataFrame(columns=working_df.columns) + pass - # Drop duplicates on ALT, keep first in this order: - motif_left = motif_left.drop_duplicates("ALT", keep="first") - - - - logging.debug("After final filtering on left motif variants:") - logging.debug(f"Changed from {pre_left_filter_rows} rows to {len(motif_left)} rows") - - # Step 4) Merge + filter right side - if not motif_right.empty: - pre_right_merge_rows = len(motif_right) - pre_right_merge_cols = motif_right.columns.tolist() - motif_right.rename(columns={'Motif_left': 'Motif'}, inplace=True) - motif_right.drop(['Motif_sequence'], axis=1, inplace=True) # if it exists - motif_right = motif_right.merge(merged_motifs, on='Motif', how='left') - logging.debug("After merging right motif with 'merged_motifs':") - logging.debug(f"Changed from {pre_right_merge_rows} rows, {pre_right_merge_cols} columns") - logging.debug(f"To {len(motif_right)} rows, {motif_right.columns.tolist()} columns") - - keep_cols = [ - 'Motif', - 'Motif_fasta', - 'Variant', - 'POS', - 'REF', - 'ALT', - 'Motif_sequence', - 'Estimated_Depth_AlternateVariant', - 'Estimated_Depth_Variant_ActiveRegion', - 'Depth_Score', - 'Confidence', - ] - motif_right = motif_right[keep_cols] - - # Special handling for 'GG' on the right - pre_right_gg_rows = len(motif_right) - if motif_right['ALT'].str.contains(r'\b' + alt_for_motif_right_gg + r'\b').any(): - motif_right = motif_right.loc[~motif_right['Motif'].isin(exclude_motifs_right)] - motif_right = motif_right.loc[motif_right['ALT'] == alt_for_motif_right_gg] - motif_right = ( - motif_right.sort_values('Depth_Score', ascending=False) - .drop_duplicates('ALT', keep='first') - ) - if motif_right['Motif'].isin(motifs_for_alt_gg).any(): - motif_right = motif_right[motif_right['Motif'].isin(motifs_for_alt_gg)] - else: - motif_right = ( - motif_right.sort_values('Depth_Score', ascending=False) - .drop_duplicates('ALT', keep='first') - ) - - logging.debug("After handling 
'GG' alt on right motif:") - logging.debug(f"Changed from {pre_right_gg_rows} rows to {len(motif_right)} rows") - - pre_right_dedup_rows = len(motif_right) - motif_right.drop_duplicates(subset=['REF', 'ALT'], inplace=True) - logging.debug("After dropping duplicates on right motif:") - logging.debug(f"Changed from {pre_right_dedup_rows} rows to {len(motif_right)} rows") - - # Step 5) Combine left/right, exclude certain ALTs/motifs - pre_combine_rows = (0 if motif_left.empty else len(motif_left)) + (0 if motif_right.empty else len(motif_right)) - combined_df = pd.concat([motif_right, motif_left], axis=0) - combined_df = combined_df[~combined_df['ALT'].isin(exclude_alts_combined)] - combined_df = combined_df[~combined_df['Motif'].isin(exclude_motifs_combined)] - logging.debug("After combining left/right and excluding ALTs/Motifs:") - logging.debug(f"Combined row count: {pre_combine_rows}, now {len(combined_df)} rows") - - # Step 6) Adjust POS for the right side - pre_pos_adjust_rows = len(combined_df) - combined_df['POS'] = combined_df['POS'].astype(int) - if 'POS' in combined_df.columns: + if 'Motifs' not in working_df.columns or working_df['Motifs'].str.count('-').max() != 1: + logging.error("Cannot split 'Motifs' into left-right. Old code returns empty.") + combined_df = pd.DataFrame(columns=working_df.columns) + else: + working_df[['Motif_left', 'Motif_right']] = working_df['Motifs'].str.split('-', expand=True) + working_df['POS'] = pd.to_numeric(working_df['POS'], errors='coerce').fillna(-1).astype(int) + + # Left vs. 
Right + motif_left = working_df[working_df['POS'] < position_threshold].copy() + motif_right = working_df[working_df['POS'] >= position_threshold].copy() + + # Merge + filter left + if not motif_left.empty: + motif_left.rename(columns={'Motif_right': 'Motif'}, inplace=True) + if 'Motif_sequence' in motif_left.columns: + motif_left.drop(columns=['Motif_sequence'], inplace=True, errors='ignore') + motif_left = motif_left.merge(merged_motifs, on='Motif', how='left') + + keep_cols = [ + 'Motif', + 'Motif_fasta', + 'Variant', + 'POS', + 'REF', + 'ALT', + 'Motif_sequence', + 'Estimated_Depth_AlternateVariant', + 'Estimated_Depth_Variant_ActiveRegion', + 'Depth_Score', + 'Confidence', + 'original_index', + ] + motif_left = motif_left[keep_cols] + motif_left.sort_values(['Depth_Score', 'POS'], ascending=[False, False], inplace=True) + motif_left.drop_duplicates('ALT', keep='first', inplace=True) + + # Merge + filter right + if not motif_right.empty: + motif_right.rename(columns={'Motif_left': 'Motif'}, inplace=True) + if 'Motif_sequence' in motif_right.columns: + motif_right.drop(columns=['Motif_sequence'], inplace=True, errors='ignore') + motif_right = motif_right.merge(merged_motifs, on='Motif', how='left') + + keep_cols = [ + 'Motif', + 'Motif_fasta', + 'Variant', + 'POS', + 'REF', + 'ALT', + 'Motif_sequence', + 'Estimated_Depth_AlternateVariant', + 'Estimated_Depth_Variant_ActiveRegion', + 'Depth_Score', + 'Confidence', + 'original_index', + ] + motif_right = motif_right[keep_cols] + + # 'GG' logic + if motif_right['ALT'].str.contains(r'\b' + alt_for_motif_right_gg + r'\b').any(): + motif_right = motif_right[~motif_right['Motif'].isin(exclude_motifs_right)] + motif_right = motif_right[motif_right['ALT'] == alt_for_motif_right_gg] + motif_right.sort_values('Depth_Score', ascending=False, inplace=True) + motif_right.drop_duplicates('ALT', keep='first', inplace=True) + if motif_right['Motif'].isin(motifs_for_alt_gg).any(): + motif_right = 
motif_right[motif_right['Motif'].isin(motifs_for_alt_gg)] + else: + motif_right.sort_values('Depth_Score', ascending=False, inplace=True) + motif_right.drop_duplicates('ALT', keep='first', inplace=True) + + motif_right.drop_duplicates(subset=['REF', 'ALT'], inplace=True) + + # Combine + combined_df = pd.concat([motif_right, motif_left], axis=0, ignore_index=True) + combined_df = combined_df[~combined_df['ALT'].isin(exclude_alts_combined)] + combined_df = combined_df[~combined_df['Motif'].isin(exclude_motifs_combined)] + + # Adjust POS => create POS_fasta + combined_df['POS'] = pd.to_numeric(combined_df['POS'], errors='coerce').fillna(-1).astype(int) combined_df['POS_fasta'] = combined_df['POS'] combined_df.update( combined_df['POS'].mask( @@ -323,10 +278,36 @@ def motif_correction_and_annotation(df, merged_motifs, kestrel_config): lambda x: x - position_threshold ) ) - logging.debug("After adjusting 'POS' for right side variants:") - logging.debug(f"Changed from {pre_pos_adjust_rows} rows to {len(combined_df)} rows") - logging.debug(f"Columns are now: {combined_df.columns.tolist()}") + # =============== End Original Logic =============== + + # Mark pass/fail based on original_index + pass_mask = original_df['original_index'].isin(combined_df.get('original_index', [])) + original_df['motif_filter_pass'] = pass_mask + + # Ensure final columns exist in the main DF even for failing rows + # (They remain NaN if the row didn't pass.) 
+ if 'Motif_fasta' not in original_df.columns: + original_df['Motif_fasta'] = pd.NA + if 'POS_fasta' not in original_df.columns: + original_df['POS_fasta'] = pd.NA + if 'Motif' not in original_df.columns: + original_df['Motif'] = pd.NA + + # For rows that "passed", copy over the final POS_fasta, Motif_fasta, and Motif + combined_df = combined_df.set_index('original_index', drop=False) + for idx in combined_df.index: + if idx in original_df.index: + # Copy Motif_fasta / POS_fasta + original_df.at[idx, 'Motif_fasta'] = combined_df.at[idx, 'Motif_fasta'] + if 'POS_fasta' in combined_df.columns: + original_df.at[idx, 'POS_fasta'] = combined_df.at[idx, 'POS_fasta'] + # Also copy 'Motif' + if 'Motif' in combined_df.columns: + original_df.at[idx, 'Motif'] = combined_df.at[idx, 'Motif'] + + # Drop the temporary original_index column + original_df.drop(columns=['original_index'], inplace=True, errors='ignore') logging.debug("Exiting motif_correction_and_annotation") - logging.debug(f"Final row count: {len(combined_df)}, columns: {combined_df.columns.tolist()}") - return combined_df + logging.debug(f"Final row count: {len(original_df)}, columns: {original_df.columns.tolist()}") + return original_df diff --git a/vntyper/scripts/pipeline.py b/vntyper/scripts/pipeline.py index 8eac756..17ff847 100644 --- a/vntyper/scripts/pipeline.py +++ b/vntyper/scripts/pipeline.py @@ -70,7 +70,8 @@ def run_pipeline( custom_regions=None, bed_file=None, log_level=logging.INFO, - sample_name=None + sample_name=None, + log_file=None ): """ Main pipeline function that orchestrates the genotyping process. @@ -475,6 +476,7 @@ def run_pipeline( output_dir, template_dir, report_file, + log_file, bed_file=bed_out, bam_file=bam_out, fasta_file=fasta_reference, diff --git a/vntyper/scripts/scoring.py b/vntyper/scripts/scoring.py index 861f038..25dd93e 100644 --- a/vntyper/scripts/scoring.py +++ b/vntyper/scripts/scoring.py @@ -12,10 +12,13 @@ 1. 
Split the 'Sample' column (colon-delimited) into relevant depths. 2. Calculate the frame score = (alt_len - ref_len) / 3, used to detect frameshift vs. non-frameshift. -3. Filter out non-frameshift variants. +3. (Refactored) Instead of filtering out non-frameshift variants, + we now add a boolean column indicating whether it is frameshift. 4. Further split the frame score to identify insertion/deletion patterns - (like 3n+1 vs 3n+2). -5. Return the filtered DataFrame that should then move on to confidence assignment. + (like 3n+1 vs 3n+2). Rather than dropping columns or filtering, + we add boolean columns that mark relevant conditions. +5. Return the DataFrame with added columns. Final filtering + is deferred to a later step. References: ----------- @@ -44,8 +47,8 @@ def split_depth_and_calculate_frame_score(df: pd.DataFrame) -> pd.DataFrame: Returns: pd.DataFrame: - Adds 'Frame_Score' and keeps only frameshift - (non-multiple of 3) variants. The others are filtered out. + Adds 'Frame_Score' and a boolean column 'is_frameshift'. + Does NOT filter out rows. """ logging.debug("Entering split_depth_and_calculate_frame_score") logging.debug(f"Initial row count: {len(df)}, columns: {df.columns.tolist()}") @@ -54,9 +57,7 @@ def split_depth_and_calculate_frame_score(df: pd.DataFrame) -> pd.DataFrame: logging.debug("DataFrame is empty. 
Exiting split_depth_and_calculate_frame_score.") return df - # Step 1) Split the Sample column into 3 parts - pre_split_rows = len(df) - pre_split_cols = df.columns.tolist() + # Step 1) Split 'Sample' into 3 parts split_columns = df['Sample'].str.split(':', expand=True) split_columns.columns = [ 'Del', @@ -64,53 +65,14 @@ def split_depth_and_calculate_frame_score(df: pd.DataFrame) -> pd.DataFrame: 'Estimated_Depth_Variant_ActiveRegion' ] df = pd.concat([df, split_columns], axis=1) - logging.debug("After splitting 'Sample':") - logging.debug(f"Changed from {pre_split_rows} rows, {pre_split_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - - # Step 2) Keep only necessary columns in a new DataFrame - pre_select_rows = len(df) - pre_select_cols = df.columns.tolist() - necessary_columns = [ - 'Motifs', - 'Variant', - 'POS', - 'REF', - 'ALT', - 'Motif_sequence', - 'Estimated_Depth_AlternateVariant', - 'Estimated_Depth_Variant_ActiveRegion', - ] - df = df[necessary_columns].copy() - logging.debug("After selecting necessary columns:") - logging.debug(f"Changed from {pre_select_rows} rows, {pre_select_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - - # Step 3) Compute lengths and frame score - pre_frame_rows = len(df) - pre_frame_cols = df.columns.tolist() + + # Step 2) Compute lengths and frame score df["ref_len"] = df["REF"].str.len() df["alt_len"] = df["ALT"].str.len() df["Frame_Score"] = (df["alt_len"] - df["ref_len"]) / 3 - logging.debug("After computing 'Frame_Score':") - logging.debug(f"Changed from {pre_frame_rows} rows, {pre_frame_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - # Step 4) Mark frameshift variants - pre_truefalse_rows = len(df) - pre_truefalse_cols = df.columns.tolist() + # Step 3) Mark frameshift in a new boolean column df["is_frameshift"] = ((df["alt_len"] - df["ref_len"]) % 3 != 0) - logging.debug("After marking frameshift variants:") 
- logging.debug(f"Changed from {pre_truefalse_rows} rows, {pre_truefalse_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - - # Keep only frameshift variants - pre_filter_rows = len(df) - pre_filter_cols = df.columns.tolist() - df = df[df["is_frameshift"]].copy() - logging.debug("After filtering out non-frameshift variants:") - logging.debug(f"Changed from {pre_filter_rows} rows, {pre_filter_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") logging.debug("Exiting split_depth_and_calculate_frame_score") logging.debug(f"Final row count: {len(df)}, columns: {df.columns.tolist()}") @@ -120,20 +82,21 @@ def split_depth_and_calculate_frame_score(df: pd.DataFrame) -> pd.DataFrame: def split_frame_score(df: pd.DataFrame) -> pd.DataFrame: """ Splits 'Frame_Score' into 'direction' and 'frameshift_amount' - for further logic in identifying insertion vs. deletion frameshifts. + for further logic (3n+1 vs. 3n+2, etc.). Steps: 1) Determine the direction based on the sign of (alt_len - ref_len). - 2) Calculate frameshift_amount as abs(alt_len - ref_len) % 3. - 3) Adjust direction if necessary. - 4) Drop intermediate columns. + 2) Calculate frameshift_amount as abs((alt_len - ref_len) % 3). + 3) (Refactored) We do not drop columns used for intermediate steps. Args: df (pd.DataFrame): Should contain 'Frame_Score', 'is_frameshift', 'ref_len', 'alt_len'. Returns: - pd.DataFrame: Now has columns 'direction' and 'frameshift_amount' for analyzing frames. + pd.DataFrame: + Adds 'direction' and 'frameshift_amount'. + Retains all rows and intermediate columns for debugging. """ logging.debug("Entering split_frame_score") logging.debug(f"Initial row count: {len(df)}, columns: {df.columns.tolist()}") @@ -142,39 +105,11 @@ def split_frame_score(df: pd.DataFrame) -> pd.DataFrame: logging.debug("DataFrame is empty. 
Exiting split_frame_score.") return df - # Step 1) Determine the direction - pre_direction_rows = len(df) - pre_direction_cols = df.columns.tolist() + # Step 1) Determine direction df["direction"] = np.sign(df["alt_len"] - df["ref_len"]) - logging.debug("After determining 'direction':") - logging.debug(f"Changed from {pre_direction_rows} rows, {pre_direction_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") # Step 2) Calculate frameshift_amount - pre_frameshift_rows = len(df) - pre_frameshift_cols = df.columns.tolist() df["frameshift_amount"] = (df["alt_len"] - df["ref_len"]).abs() % 3 - logging.debug("After calculating 'frameshift_amount':") - logging.debug(f"Changed from {pre_frameshift_rows} rows, {pre_frameshift_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - - # Step 3) Adjust direction if necessary (optional based on original logic) - # In the original code, '-0' was replaced with '-1', but since (alt_len - ref_len) cannot be zero here - # because is_frameshift is True, this step might be redundant. Including for completeness. 
- pre_adjust_rows = len(df) - pre_adjust_cols = df.columns.tolist() - df["direction"] = df["direction"].replace(0, -1) - logging.debug("After adjusting 'direction':") - logging.debug(f"Changed from {pre_adjust_rows} rows, {pre_adjust_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") - - # Step 4) Drop intermediate columns - pre_drop_rows = len(df) - pre_drop_cols = df.columns.tolist() - df.drop(['is_frameshift', 'ref_len', 'alt_len'], axis=1, inplace=True) - logging.debug("After dropping intermediate columns:") - logging.debug(f"Changed from {pre_drop_rows} rows, {pre_drop_cols} columns") - logging.debug(f"To {len(df)} rows, {df.columns.tolist()} columns") logging.debug("Exiting split_frame_score") logging.debug(f"Final row count: {len(df)}, columns: {df.columns.tolist()}") @@ -187,9 +122,11 @@ def extract_frameshifts(df: pd.DataFrame) -> pd.DataFrame: Steps: - For insertion frameshifts: direction > 0 & frameshift_amount == 1 - (3n+1 logic). - For deletion frameshifts: direction < 0 & frameshift_amount == 2 - (3n+2 logic). + + (Refactored) We do NOT remove non-matching rows here. Instead, we add a + boolean column 'is_valid_frameshift' to indicate whether a row meets one + of the frameshift patterns. Args: df (pd.DataFrame): @@ -197,7 +134,8 @@ def extract_frameshifts(df: pd.DataFrame) -> pd.DataFrame: Returns: pd.DataFrame: - Subset of frameshift variants meeting insertion or deletion patterns. + Adds 'is_valid_frameshift' (bool). + Keeps all rows. """ logging.debug("Entering extract_frameshifts") logging.debug(f"Initial row count: {len(df)}, columns: {df.columns.tolist()}") @@ -206,36 +144,12 @@ def extract_frameshifts(df: pd.DataFrame) -> pd.DataFrame: logging.debug("DataFrame is empty. 
Exiting extract_frameshifts.") return df - # Identify insertion frameshifts: direction > 0 and frameshift_amount == 1 - pre_ins_rows = len(df) - pre_ins_cols = df.columns.tolist() - ins = df[ - (df["direction"] > 0) & - (df["frameshift_amount"] == 1) - ] - logging.debug("After identifying insertion frameshifts:") - logging.debug( - f"Sliced from {pre_ins_rows} rows, {pre_ins_cols} columns to {len(ins)} rows." - ) - - # Identify deletion frameshifts: direction < 0 and frameshift_amount == 2 - pre_del_rows = len(df) - pre_del_cols = df.columns.tolist() - del_ = df[ - (df["direction"] < 0) & - (df["frameshift_amount"] == 2) - ] - logging.debug("After identifying deletion frameshifts:") - logging.debug( - f"Sliced from {pre_del_rows} rows, {pre_del_cols} columns to {len(del_)} rows." - ) - - # Combine insertion and deletion frameshifts - combined = pd.concat([ins, del_], axis=0).reset_index(drop=True) - logging.debug("After concatenating insertion and deletion frameshifts:") - logging.debug( - f"Resulting row count: {len(combined)}, columns: {combined.columns.tolist()}" - ) + # Identify insertion vs deletion frameshifts + condition_insertion = (df["direction"] > 0) & (df["frameshift_amount"] == 1) + condition_deletion = (df["direction"] < 0) & (df["frameshift_amount"] == 2) + + df["is_valid_frameshift"] = condition_insertion | condition_deletion logging.debug("Exiting extract_frameshifts") - return combined + logging.debug(f"Final row count: {len(df)}, columns: {df.columns.tolist()}") + return df diff --git a/vntyper/scripts/variant_parsing.py b/vntyper/scripts/variant_parsing.py index b7327ed..232a75c 100644 --- a/vntyper/scripts/variant_parsing.py +++ b/vntyper/scripts/variant_parsing.py @@ -75,12 +75,9 @@ def filter_by_alt_values_and_finalize(df: pd.DataFrame, kestrel_config: dict) -> Applies final filtering rules based on ALT values, e.g., removing certain ALTs or requiring a minimal Depth_Score if ALT='GG'. 
- Steps: - 1) If 'GG' is present in the ALT column, only keep those rows if - Depth_Score >= 'gg_depth_score_threshold'. - 2) Exclude any ALTs in 'exclude_alts'. - 3) Drop 'left' and 'right' columns from earlier frame splitting - to finalize the output. + (Refactored) + - Instead of removing rows, add a boolean column 'alt_filter_pass' + that marks whether each row passes these checks. Args: df (pd.DataFrame): @@ -92,45 +89,40 @@ def filter_by_alt_values_and_finalize(df: pd.DataFrame, kestrel_config: dict) -> - 'exclude_alts' Returns: - pd.DataFrame: Filtered, finalized DataFrame ready for final steps. + pd.DataFrame: + Same shape as input, with a new 'alt_filter_pass' column (bool). + Intermediate columns like 'left', 'right' are no longer dropped; + they remain for debugging. """ logging.debug("Entering filter_by_alt_values_and_finalize") logging.debug(f"Initial DataFrame shape: {df.shape}") - # Early exit if empty if df.empty: - logging.debug("DataFrame is empty. Exiting function.") + logging.debug("DataFrame is empty. 
Exiting filter_by_alt_values_and_finalize.") return df - # Validate required columns required_columns = {'ALT', 'Depth_Score'} missing_columns = required_columns - set(df.columns) if missing_columns: logging.error(f"Missing required columns: {missing_columns}") raise KeyError(f"Missing required columns: {missing_columns}") - # Fetch filtering parameters from config alt_filter = kestrel_config.get('alt_filtering', {}) gg_alt_value = alt_filter.get('gg_alt_value', 'GG') gg_depth_threshold = alt_filter.get('gg_depth_score_threshold', 0.0) exclude_alts = alt_filter.get('exclude_alts', []) - # Convert Depth_Score to float to ensure numeric comparison - df['Depth_Score'] = df['Depth_Score'].astype(float) + # Ensure Depth_Score is float + df['Depth_Score'] = pd.to_numeric(df['Depth_Score'], errors='coerce') - # STEP 1 & 2) Filter rows with ALT=GG by depth threshold and exclude any ALTs in exclude_alts - initial_count = len(df) - is_gg = df['ALT'] == gg_alt_value - meets_gg_threshold = df['Depth_Score'] >= gg_depth_threshold - df = df[(~is_gg | meets_gg_threshold) & (~df['ALT'].isin(exclude_alts))] - logging.debug(f"Filtered 'GG' by depth and excluded ALTs: {initial_count} -> {len(df)} records") + # Create the boolean mask + is_gg = (df['ALT'] == gg_alt_value) + meets_gg_threshold = (df['Depth_Score'] >= gg_depth_threshold) + not_excluded_alt = ~df['ALT'].isin(exclude_alts) - # STEP 3) Drop intermediate columns 'left' and 'right' if they exist - drop_cols = [col for col in ('left', 'right') if col in df.columns] - if drop_cols: - df = df.drop(columns=drop_cols) - logging.debug(f"Dropped intermediate columns: {drop_cols}") + # Instead of filtering the DataFrame, we store a new boolean column + df['alt_filter_pass'] = ( (~is_gg | meets_gg_threshold) & not_excluded_alt ) - logging.debug(f"Final DataFrame shape: {df.shape}") logging.debug("Exiting filter_by_alt_values_and_finalize") - return df + logging.debug(f"Final DataFrame shape: {df.shape}") + return df \ No newline at 
end of file diff --git a/vntyper/version.py b/vntyper/version.py index 0d1653b..35e40e1 100644 --- a/vntyper/version.py +++ b/vntyper/version.py @@ -1,3 +1,3 @@ # vntyper/version.py -__version__ = "2.0.0-alpha.54" +__version__ = "2.0.0-alpha.55" From 4cfc2e86d2c75fd1c8d4c0cfbb33e5896eb134e6 Mon Sep 17 00:00:00 2001 From: Bernt Popp Date: Mon, 13 Jan 2025 14:54:17 +0100 Subject: [PATCH 10/10] feat: Implement detailed summary of screening results in the final report Closes #76 --- vntyper/scripts/generate_report.py | 163 +++++++++++++++++++---------- 1 file changed, 107 insertions(+), 56 deletions(-) diff --git a/vntyper/scripts/generate_report.py b/vntyper/scripts/generate_report.py index bb359ed..fdda8fc 100644 --- a/vntyper/scripts/generate_report.py +++ b/vntyper/scripts/generate_report.py @@ -7,6 +7,7 @@ import json from datetime import datetime from pathlib import Path +import re import pandas as pd from jinja2 import Environment, FileSystemLoader @@ -25,6 +26,8 @@ def load_kestrel_results(kestrel_result_file): try: df = pd.read_csv(kestrel_result_file, sep='\t', comment='#') + logging.debug(f"Kestrel DataFrame loaded with {len(df)} rows.") + columns_to_display = { 'Motif': 'Motif', 'Variant': 'Variant', @@ -39,6 +42,7 @@ def load_kestrel_results(kestrel_result_file): } df = df[list(columns_to_display.keys())] df = df.rename(columns=columns_to_display) + logging.debug("Kestrel DataFrame columns renamed for display.") # Handle Low_Precision in orange, High_Precision & High_Precision* in red df['Confidence'] = df['Confidence'].apply( @@ -50,6 +54,7 @@ def load_kestrel_results(kestrel_result_file): else x ) ) + logging.debug("Kestrel 'Confidence' column color-coded based on precision levels.") return df except pd.errors.ParserError as e: logging.error(f"Failed to parse Kestrel result file: {e}") @@ -71,13 +76,14 @@ def load_advntr_results(advntr_result_file): try: df = pd.read_csv(advntr_result_file, sep='\t', comment='#') + logging.debug(f"adVNTR DataFrame loaded 
with {len(df)} rows.")
         return df, True
     except pd.errors.ParserError as e:
         logging.error(f"Failed to parse adVNTR result file: {e}")
         return pd.DataFrame(), False
     except Exception as e:
         logging.error(f"Unexpected error loading adVNTR results: {e}")
-        return pd.DataFrame(), False
+        return pd.DataFrame(), False
 
 
 def load_pipeline_log(log_file):
@@ -98,7 +104,9 @@ def load_pipeline_log(log_file):
 
     try:
         with open(log_file, 'r') as f:
-            return f.read()
+            content = f.read()
+            logging.debug("Pipeline log successfully loaded.")
+            return content
     except Exception as e:
         logging.error(f"Failed to read pipeline log file: {e}")
         return "Failed to load pipeline log."
@@ -120,6 +128,7 @@ def run_igv_report(bed_file, bam_file, fasta_file, output_html, flanking=50, vcf
 
     if config is not None and flanking == 50:
         flanking = config.get("default_values", {}).get("flanking", 50)
+        logging.debug(f"Flanking region set to {flanking} based on config.")
 
     # Convert each path or None into string or skip
     bed_file = str(bed_file) if bed_file else None
@@ -146,6 +155,8 @@ def run_igv_report(bed_file, bam_file, fasta_file, output_html, flanking=50, vcf
     igv_report_cmd.extend(tracks)
     igv_report_cmd.extend(['--output', output_html])
 
+    logging.debug(f"IGV report command: {' '.join([str(x) for x in igv_report_cmd if x])}")
+
     try:
         logging.info(f"Running IGV report: {' '.join([str(x) for x in igv_report_cmd if x])}")
         subprocess.run(igv_report_cmd, check=True)
@@ -209,6 +220,7 @@ def load_fastp_output(fastp_file):
     try:
         with open(fastp_file, 'r') as f:
             data = json.load(f)
+            logging.debug("fastp output successfully loaded.")
         return data
     except Exception as e:
         logging.error(f"Failed to load or parse fastp output: {e}")
@@ -273,6 +285,7 @@ def generate_summary_report(
 
     if flanking == 50 and config is not None:
         flanking = config.get("default_values", {}).get("flanking", 50)
+        logging.debug(f"Flanking region set to {flanking} based on config.")
 
     thresholds = config.get("thresholds", {})
     mean_vntr_cov_threshold = 
thresholds.get("mean_vntr_coverage", 100) @@ -325,9 +338,11 @@ def generate_summary_report( if mean_vntr_coverage is not None and mean_vntr_coverage < mean_vntr_cov_threshold: coverage_icon = '' coverage_color = 'red' + logging.debug("Mean VNTR coverage is below the threshold.") else: coverage_icon = '' coverage_color = 'green' + logging.debug("Mean VNTR coverage is above the threshold.") duplication_rate = None q20_rate = None @@ -344,6 +359,7 @@ def generate_summary_report( duplication_rate = duplication.get("rate", None) after_filtering = summary.get("after_filtering", {}) before_filtering = summary.get("before_filtering", {}) + q20_rate = after_filtering.get("q20_rate", None) q30_rate = after_filtering.get("q30_rate", None) @@ -351,9 +367,12 @@ def generate_summary_report( passed_filter_reads = filtering_result.get("passed_filter_reads", 0) if total_reads_before > 0: passed_filter_rate = passed_filter_reads / total_reads_before + logging.debug(f"Passed filter rate calculated: {passed_filter_rate:.2f}") else: passed_filter_rate = None + logging.debug("Total reads before filtering is zero; passed filter rate set to None.") sequencing_str = summary.get("sequencing", "") + logging.debug(f"Sequencing setup: {sequencing_str}") def warn_icon(value, cutoff, higher_better=True): """ @@ -361,17 +380,22 @@ def warn_icon(value, cutoff, higher_better=True): on whether 'value' passes 'cutoff' under the 'higher_better' logic. 
""" if value is None: + logging.debug("warn_icon called with value=None; returning empty strings.") return "", "" if higher_better: - return ( - ('', 'red') if value < cutoff - else ('', 'green') - ) + if value < cutoff: + logging.debug(f"Value {value} is below the cutoff {cutoff} (higher_better=True).") + return '', 'red' + else: + logging.debug(f"Value {value} is above or equal to the cutoff {cutoff} (higher_better=True).") + return '', 'green' else: - return ( - ('', 'red') if value > cutoff - else ('', 'green') - ) + if value > cutoff: + logging.debug(f"Value {value} is above the cutoff {cutoff} (higher_better=False).") + return '', 'red' + else: + logging.debug(f"Value {value} is below or equal to the cutoff {cutoff} (higher_better=False).") + return '', 'green' # Evaluate duplication, Q20, Q30, and passed filter rates dup_icon, dup_color = warn_icon(duplication_rate, dup_rate_cutoff, higher_better=False) @@ -385,18 +409,23 @@ def warn_icon(value, cutoff, higher_better=True): index=False, escape=False ) + logging.debug("Kestrel results converted to HTML.") + if advntr_available and not advntr_df.empty: advntr_html = advntr_df.to_html( classes='table table-bordered table-striped hover compact order-column table-sm', index=False ) + logging.debug("adVNTR results converted to HTML.") else: advntr_html = None + logging.debug("No adVNTR results available to convert to HTML.") # Prepare Jinja2 template env = Environment(loader=FileSystemLoader(template_dir)) try: template = env.get_template('report_template.html') + logging.debug("Jinja2 template 'report_template.html' loaded successfully.") except Exception as e: logging.error(f"Failed to load Jinja2 template: {e}") raise @@ -405,6 +434,7 @@ def warn_icon(value, cutoff, higher_better=True): summary_text = build_screening_summary( kestrel_df, advntr_df, advntr_available, mean_vntr_coverage, mean_vntr_cov_threshold ) + logging.debug(f"Summary text generated: {summary_text}") # Build the final context for rendering 
context = { @@ -443,6 +473,7 @@ def warn_icon(value, cutoff, higher_better=True): # Render the template try: rendered_html = template.render(context) + logging.debug("Report template rendered successfully.") except Exception as e: logging.error(f"Failed to render the report template: {e}") raise @@ -460,7 +491,7 @@ def warn_icon(value, cutoff, higher_better=True): def build_screening_summary(kestrel_df, advntr_df, advntr_available, mean_vntr_coverage, mean_vntr_cov_threshold): """ - Build the short screening summary text based on Kestrel and adVNTR data. + Build the detailed screening summary text based on Kestrel and adVNTR data. Args: kestrel_df (pd.DataFrame): Kestrel results DataFrame. @@ -470,66 +501,86 @@ def build_screening_summary(kestrel_df, advntr_df, advntr_available, mean_vntr_c mean_vntr_cov_threshold (float): Coverage threshold for the VNTR region. Returns: - str: A short summary text describing the findings or negative result. + str: A detailed summary text describing the findings or negative result. 
""" summary_text = "" try: - import re - + # Function to strip HTML tags from the confidence values def strip_html_tags(confidence_value): return re.sub(r"<[^>]*>", "", confidence_value or "") - # We'll handle 'High_Precision*' same as 'High_Precision' or 'Low_Precision' + # Extract unique confidence values from Kestrel results + kestrel_confidences = [] if not kestrel_df.empty and "Confidence" in kestrel_df.columns: kestrel_confidences = kestrel_df["Confidence"].apply(strip_html_tags).dropna().unique().tolist() - # Now includes 'High_Precision*' in recognized set - kestrel_valid = any(conf in ("High_Precision", "High_Precision*", "Low_Precision") - for conf in kestrel_confidences) + logging.debug(f"Kestrel confidences extracted: {kestrel_confidences}") + + # Determine if Kestrel identified any pathogenic variants + pathogenic_kestrel = any(conf in ("High_Precision", "High_Precision*", "Low_Precision") + for conf in kestrel_confidences) + logging.debug(f"Pathogenic variants identified by Kestrel: {pathogenic_kestrel}") + + # Determine confidence level + confidence_level = None + if "High_Precision" in kestrel_confidences or "High_Precision*" in kestrel_confidences: + confidence_level = "High_Precision" + logging.debug("Confidence level determined: High_Precision") + elif "Low_Precision" in kestrel_confidences: + confidence_level = "Low_Precision" + logging.debug("Confidence level determined: Low_Precision") + + # Assess quality metrics + quality_metrics_pass = True + if mean_vntr_coverage is not None and mean_vntr_coverage < mean_vntr_cov_threshold: + quality_metrics_pass = False + logging.debug("Quality metrics assessment: Failed (coverage below threshold).") else: - kestrel_valid = False - - advntr_has_data = advntr_available and not advntr_df.empty - - logging.debug(f"advntr_available => {advntr_available}") - logging.debug(f"advntr_df.empty => {advntr_df.empty}") - logging.debug(f"advntr_has_data => {advntr_has_data}") - logging.debug(f"adVNTR columns => 
{list(advntr_df.columns)}") - logging.debug(f"adVNTR first row => {advntr_df.head(1).to_dict(orient='records')}") - - coverage_status = ( - "above" if (mean_vntr_coverage is not None and mean_vntr_coverage >= mean_vntr_cov_threshold) - else "below" - ) - - # Possibly find p-value in adVNTR output - if advntr_has_data: - lower_cols = [c.lower() for c in advntr_df.columns] - logging.debug(f"Lowercased adVNTR columns => {lower_cols}") - if "p-value" in lower_cols: - pval_col = [col for col in advntr_df.columns if col.lower() == "p-value"][0] - pval_val = advntr_df[pval_col].iloc[0] - pval_msg = f" adVNTR p-value: {pval_val}." - elif "pvalue" in lower_cols: - pval_col = [col for col in advntr_df.columns if col.lower() == "pvalue"][0] - pval_val = advntr_df[pval_col].iloc[0] - pval_msg = f" adVNTR p-value: {pval_val}." + logging.debug("Quality metrics assessment: Passed (coverage above threshold).") + + # Scenario 1 & 2: Pathogenic variants identified by Kestrel + if pathogenic_kestrel and confidence_level == "High_Precision": + if quality_metrics_pass: + summary_text += ("Pathogenic frameshift variant identified by Kestrel with high precision, " + "and the VNTR coverage and quality metrics are above the threshold.") + logging.debug("Scenario 1 applied: High precision with passing quality metrics.") else: - logging.debug("No p-value or pvalue column detected.") - pval_msg = "" - else: - pval_msg = "" + summary_text += ("Pathogenic frameshift variant identified by Kestrel with high precision, " + "but one or more quality metrics are below the threshold.") + logging.debug("Scenario 2 applied: High precision with failing quality metrics.") + + # Scenario 3: Pathogenic variants identified with low precision + elif pathogenic_kestrel and confidence_level == "Low_Precision": + if quality_metrics_pass: + summary_text += ("Warning: Pathogenic variant identified with low precision confidence. 
" + "Validation through alternative methods (e.g., SNaPshot for dupC or " + "long-read sequencing for other variants) is recommended.") + logging.debug("Scenario 3a applied: Low precision with passing quality metrics.") + else: + summary_text += ("Warning: Pathogenic variant identified with low precision confidence and low-quality metrics. " + "Validation using alternative methods is strongly recommended.") + logging.debug("Scenario 3b applied: Low precision with failing quality metrics.") + + # Scenario 4: Check if both Kestrel and adVNTR have positive results + if advntr_available and not advntr_df.empty and pathogenic_kestrel: + # Determine if adVNTR also has a positive result + # Assuming that a positive result means that adVNTR identified at least one variant + adVNTR_positive = not advntr_df.empty + + if adVNTR_positive: + summary_text += (" Both Kestrel and adVNTR genotyping methods have identified pathogenic variants and are concordant.") + logging.debug("Scenario 4 applied: Both methods positive and concordant.") + else: + summary_text += (" There is a discrepancy between Kestrel and adVNTR genotyping methods regarding the identification of pathogenic variants.") + logging.debug("Scenario 4 applied: Discrepancy between methods.") - # Summarize - if kestrel_valid or advntr_has_data: - summary_text = ( - f"{'Kestrel detected a variant.' if kestrel_valid else ''}" - f"{pval_msg} Coverage is {coverage_status} threshold." - ) - else: + # If no pathogenic variants are identified by either method + if not pathogenic_kestrel and (not advntr_available or advntr_df.empty): summary_text = "The screening was negative (no valid Kestrel or adVNTR data)." + logging.debug("No pathogenic variants identified by either method; negative screening.") except Exception as ex: logging.error(f"Exception in build_screening_summary: {ex}") summary_text = "No summary available." + logging.debug(f"Final summary_text: {summary_text}") return summary_text