Skip to content

Commit

Permalink
Update spectrum.py
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasz-migas committed May 8, 2024
1 parent 8e9d3e2 commit b752b76
Showing 1 changed file with 119 additions and 0 deletions.
119 changes: 119 additions & 0 deletions src/spec_utils/spectrum.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
"""Spectra."""

from __future__ import annotations

import math
import typing as ty
from contextlib import suppress
Expand Down Expand Up @@ -226,3 +229,119 @@ def downsample(array_signal, n_up=2, n_down=20, ds_filter="upfirdn"):
else:
downsampled_signal = array_signal
return downsampled_signal


def merge_peaks_by_ppm(
mz: np.ndarray, intensity: np.ndarray, ion_mobility: np.ndarray | None = None, combine_ppm: float = 5
):
"""Merge peaks by ppm."""
from koyo.spectrum import ppm_diff

if ion_mobility is None:
ion_mobility = np.zeros_like(mz)

# this calculates the ppm difference between adjacent m/z values and subsequently splits those masses
# into separate groups
splits = np.where(ppm_diff(mz) > combine_ppm)[0] + 1
index = np.asarray([group.argmax() for group in np.split(intensity, splits)])
x, y, im = [], [], []
for i, (group_x, group_y, group_im) in enumerate(
zip(
np.split(mz, splits),
np.split(intensity, splits),
np.split(ion_mobility, splits),
)
):
x.append(group_x[index[i]])
y.append(group_y[index[i]])
im.append(group_im[index[i]])
return np.asarray(x), np.asarray(y), np.asarray(im)


def merge_peaks_by_tol(
mz: np.ndarray, intensity: np.ndarray, ion_mobility: np.ndarray | None = None, combine_tol: float = 0.001
):
"""Merge peaks by m/z tolerance."""
if ion_mobility is None:
ion_mobility = np.zeros_like(mz)
index_groups, peak_groups = group_peaks_by_tol(mz, combine_tol)
mz, intensity, im = apply_most_intense(mz, intensity, index_groups, ion_mobility)
return mz, intensity, im


def select_most_intense(xs, ys, index_groups):
"""Select peaks that have the highest intensity."""
xs_new, ys_new, indices_sel = [], [], []
for index_group in index_groups:
indices = np.argsort(ys[index_group])
xs_new.append(xs[index_group][indices[-1]])
ys_new.append(ys[index_group][indices[-1]])
indices_sel.append(index_group[indices[-1]])
return np.array(xs_new), np.array(ys_new)


def get_most_intense_index(xs, ys, index_groups):
"""Get most intense index."""
indices_sel = []
for index_group in index_groups:
indices = np.argsort(ys[index_group])
indices_sel.append(index_group[indices[-1]])
return indices_sel


def apply_most_intense(xs, ys, index_groups, *arrays):
"""Apply most intense."""
indices_sel = get_most_intense_index(xs, ys, index_groups)
ret = []
for _ in range(len(arrays) + 2):
ret.append([])
for index in indices_sel:
ret[0].append(xs[index])
ret[1].append(ys[index])
for i, array in enumerate(arrays, start=2):
ret[i].append(array[index])
return [np.array(r) for r in ret]


def merge_peaks(xs: ty.List[np.ndarray], ys: ty.List[np.ndarray], threshold: float = 0.005):
"""Merge peaks."""
xs, indices = np.unique(np.concatenate(xs), return_index=True)
ys = np.concatenate(ys)[indices]
index_groups, peak_groups = group_peaks_by_tol(xs, threshold)
xs_new, ys_new = select_most_intense(xs, ys, index_groups)
return xs_new, ys_new


def group_peaks_by_tol(xs: np.ndarray, threshold: float = 0.01) -> ty.Tuple[ty.List[np.ndarray], ty.List[np.ndarray]]:
"""
Group peaks together by their distance based on a threshold.
Parameters
----------
xs : np.ndarray
Sorted array of m/z values.
threshold : float
Maximum difference between peaks for grouping.
Returns
-------
List of Lists: A list of peak groups, where each group is represented as a list of m/z values.
"""
index_groups = []
current_group = []

for i in range(len(xs) - 1):
current_group.append(i)

# Check if the next peak is within the threshold
if xs[i + 1] - xs[i] > threshold:
index_groups.append(current_group)
current_group = []

# Add the last group
if len(xs) > 0:
current_group.append(len(xs) - 1)
index_groups.append(current_group)
index_groups = [np.array(index_group) for index_group in index_groups]
peak_groups = [xs[index_group] for index_group in index_groups]
return index_groups, peak_groups

0 comments on commit b752b76

Please sign in to comment.