From b752b76af3e7201b286dca5abaf790f9049fb11a Mon Sep 17 00:00:00 2001 From: Lukasz Migas Date: Wed, 8 May 2024 18:12:45 +0200 Subject: [PATCH] Update spectrum.py --- src/spec_utils/spectrum.py | 119 +++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/src/spec_utils/spectrum.py b/src/spec_utils/spectrum.py index 9c8fbb2..244efee 100644 --- a/src/spec_utils/spectrum.py +++ b/src/spec_utils/spectrum.py @@ -1,4 +1,7 @@ """Spectra.""" + +from __future__ import annotations + import math import typing as ty from contextlib import suppress @@ -226,3 +229,119 @@ def downsample(array_signal, n_up=2, n_down=20, ds_filter="upfirdn"): else: downsampled_signal = array_signal return downsampled_signal + + +def merge_peaks_by_ppm( + mz: np.ndarray, intensity: np.ndarray, ion_mobility: np.ndarray | None = None, combine_ppm: float = 5 +): + """Merge peaks by ppm.""" + from koyo.spectrum import ppm_diff + + if ion_mobility is None: + ion_mobility = np.zeros_like(mz) + + # this calculates the ppm difference between adjacent m/z values and subsequently splits those masses + # into separate groups + splits = np.where(ppm_diff(mz) > combine_ppm)[0] + 1 + index = np.asarray([group.argmax() for group in np.split(intensity, splits)]) + x, y, im = [], [], [] + for i, (group_x, group_y, group_im) in enumerate( + zip( + np.split(mz, splits), + np.split(intensity, splits), + np.split(ion_mobility, splits), + ) + ): + x.append(group_x[index[i]]) + y.append(group_y[index[i]]) + im.append(group_im[index[i]]) + return np.asarray(x), np.asarray(y), np.asarray(im) + + +def merge_peaks_by_tol( + mz: np.ndarray, intensity: np.ndarray, ion_mobility: np.ndarray | None = None, combine_tol: float = 0.001 +): + """Merge peaks by m/z tolerance.""" + if ion_mobility is None: + ion_mobility = np.zeros_like(mz) + index_groups, peak_groups = group_peaks_by_tol(mz, combine_tol) + mz, intensity, im = apply_most_intense(mz, intensity, index_groups, ion_mobility) + return mz, intensity, im + + +def select_most_intense(xs, ys, index_groups): + """Select peaks that have the highest intensity.""" + xs_new, ys_new, indices_sel = [], [], [] + for index_group in index_groups: + indices = np.argsort(ys[index_group]) + xs_new.append(xs[index_group][indices[-1]]) + ys_new.append(ys[index_group][indices[-1]]) + indices_sel.append(index_group[indices[-1]]) + return np.array(xs_new), np.array(ys_new) + + +def get_most_intense_index(xs, ys, index_groups): + """Get most intense index.""" + indices_sel = [] + for index_group in index_groups: + indices = np.argsort(ys[index_group]) + indices_sel.append(index_group[indices[-1]]) + return indices_sel + + +def apply_most_intense(xs, ys, index_groups, *arrays): + """Apply most intense.""" + indices_sel = get_most_intense_index(xs, ys, index_groups) + ret = [] + for _ in range(len(arrays) + 2): + ret.append([]) + for index in indices_sel: + ret[0].append(xs[index]) + ret[1].append(ys[index]) + for i, array in enumerate(arrays, start=2): + ret[i].append(array[index]) + return [np.array(r) for r in ret] + + +def merge_peaks(xs: ty.List[np.ndarray], ys: ty.List[np.ndarray], threshold: float = 0.005): + """Merge peaks.""" + xs, indices = np.unique(np.concatenate(xs), return_index=True) + ys = np.concatenate(ys)[indices] + index_groups, peak_groups = group_peaks_by_tol(xs, threshold) + xs_new, ys_new = select_most_intense(xs, ys, index_groups) + return xs_new, ys_new + + +def group_peaks_by_tol(xs: np.ndarray, threshold: float = 0.01) -> ty.Tuple[ty.List[np.ndarray], ty.List[np.ndarray]]: + """ + Group peaks together by their distance based on a threshold. + + Parameters + ---------- + xs : np.ndarray + Sorted array of m/z values. + threshold : float + Maximum difference between peaks for grouping. + + Returns + ------- + List of Lists: A list of peak groups, where each group is represented as a list of m/z values. + """ + index_groups = [] + current_group = [] + + for i in range(len(xs) - 1): + current_group.append(i) + + # Check if the next peak is within the threshold + if xs[i + 1] - xs[i] > threshold: + index_groups.append(current_group) + current_group = [] + + # Add the last group + if len(xs) > 0: + current_group.append(len(xs) - 1) + index_groups.append(current_group) + index_groups = [np.array(index_group) for index_group in index_groups] + peak_groups = [xs[index_group] for index_group in index_groups] + return index_groups, peak_groups