Skip to content

Commit

Permalink
Merge pull request #738 from broadinstitute/kl/validity_checks
Browse files Browse the repository at this point in the history
Add functions to check struct and array missingness
  • Loading branch information
klaricch authored Jan 23, 2025
2 parents 9964357 + de9036e commit 28a5433
Show file tree
Hide file tree
Showing 2 changed files with 400 additions and 0 deletions.
117 changes: 117 additions & 0 deletions gnomad/assessment/validity_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1211,3 +1211,120 @@ def count_vep_annotated_variants_per_interval(
)

return interval_ht


def check_missingness_of_struct(
struct_expr: hl.expr.StructExpression, prefix: str = ""
) -> Dict[str, Any]:
"""
Recursively check the fraction of missing values of all fields within a StructExpression.
Either a standalone or nested struct can be provided. If the struct contains an array (or set) of values, the array
will be considered missing if it is NA, an empty array, or only has missing elements.
:param struct_expr: StructExpression for which to check for missing values.
:param prefix: Prefix to append to names of struct fields within the struct_expr.
:return: Dictionary mapping field names to their missingness fraction expressions, with nested dictionaries representing any nested structs.
"""
if isinstance(struct_expr, hl.expr.StructExpression):
return {
f"{prefix}.{key}": check_missingness_of_struct(
struct_expr[key], f"{prefix}.{key}"
)
for key in struct_expr.keys()
}
elif isinstance(struct_expr, (hl.expr.ArrayExpression, hl.expr.SetExpression)):
# Count array/set as missing if it is NA, an empty array/set, or only has missing
# elements.
return hl.agg.fraction(
hl.or_else(struct_expr.all(lambda x: hl.is_missing(x)), True)
)
else:
return hl.agg.fraction(hl.is_missing(struct_expr))


def flatten_missingness_struct(
missingness_struct: hl.expr.StructExpression,
) -> Dict[str, float]:
"""
Recursively flatten and evaluate nested dictionaries of missingness within a Struct.
:param missingness_struct: Struct containing dictionaries of missingness values.
:return: Dictionary with field names as keys and their evaluated missingness fractions as values.
"""
missingness_dict = {}
for key, value in missingness_struct.items():
# Recursively check nested missingness dictionaries and flatten if needed.
if isinstance(value, dict):
missingness_dict.update(flatten_missingness_struct(value))
else:
missingness_dict[key] = hl.eval(value)
return missingness_dict


def unfurl_array_annotations(
ht: hl.Table, indexed_array_annotations: Dict[str, str]
) -> Dict[str, Any]:
"""
Unfurl specified arrays of structs into a dictionary of flattened expressions.
Array annotations must have a corresponding dictionary to define the indices for each array field.
Example: indexed_array_annotations = {"freq": "freq_index_dict"}, where 'freq' is structured as array<struct{AC: int32, AF: float64, AN: int32, homozygote_count: int64} and 'freq_index_dict' is defined as {'adj': 0, 'raw': 1}.
The keys of indexed_array_annotations should be present in the Table as row annotations, whereas the values should be present as global annotations.
:param ht: Input Table.
:param indexed_array_annotations: Dictionary mapping array field names to their corresponding index dictionaries, which define the indices for each array field. Default is {'faf': 'faf_index_dict', 'freq': 'freq_index_dict'}.
:return: Flattened dictionary of unfurled array annotations.
"""
expr_dict = {}

# For each specified array, unfurl the array elements and their structs
# into expr_dict.
for array, array_index_dict in indexed_array_annotations.items():
# Check for presence of array in the Table rows and the array index in the
# globals.
if array not in ht.row:
raise ValueError(f"Annotation '{array}' not found in the Table rows.")
if array_index_dict not in ht.globals:
raise ValueError(
f"Annotation '{array_index_dict}' not found in the Table globals."
)

# Evaluate the index dictionary for the specified array.
array_index_dict = hl.eval(ht[array_index_dict])

# Unfurl the array elements and structs into the expression dictionary.
for k, i in array_index_dict.items():
for f in ht[array][0].keys():
expr_dict[f"{f}_{k}"] = ht[array][i][f]

return expr_dict


def check_array_struct_missingness(
ht: hl.Table,
indexed_array_annotations: Dict[str, str] = {
"faf": "faf_index_dict",
"freq": "freq_index_dict",
},
) -> hl.expr.StructExpression:
"""
Check the missingness of all fields in an array of structs.
Iterates over arrays of structs and calculates the percentage of missing values for each element of the array and each struct. Array annotations must have a corresponding dictionary to define the indices for each array field.
Example: indexed_array_annotations = {"freq": "freq_index_dict"}, where 'freq' is structured as array<struct{AC: int32, AF: float64, AN: int32, homozygote_count: int64} and 'freq_index_dict' is defined as {'adj': 0, 'raw': 1}.
:param ht: Input Table.
:param indexed_array_annotations: A dictionary mapping array field names to their corresponding index dictionaries, which define the indices for each array field. Default is {'faf': 'faf_index_dict', 'freq': 'freq_index_dict'}.
:return: A Struct where each field represents a struct field's missingness percentage across the Table for each element of the specified arrays.
"""
# Create row annotations for each element of the arrays and their structs.
annotations = unfurl_array_annotations(ht, indexed_array_annotations)
ht = ht.annotate(**annotations)

# Compute missingness for each of the newly created row annotations.
missingness_dict = {
field_name: hl.agg.fraction(hl.is_missing(ht[field_name]))
for field_name in annotations.keys()
}
return ht.aggregate(hl.struct(**missingness_dict))
Loading

0 comments on commit 28a5433

Please sign in to comment.