Skip to content

Commit

Permalink
Added image utilities
Browse files Browse the repository at this point in the history
- added image utilities
- added a couple of new functions
  • Loading branch information
lukasz-migas committed Apr 13, 2023
1 parent 8074d8d commit ef0a228
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .idea/koyo.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ dev = [
"mypy",
"pdbpp",
"pre-commit",
"pytest-cov",
"pytest",
"rich",
"ruff",
"koyo[test]"
]

[project.urls]
Expand Down Expand Up @@ -114,7 +113,6 @@ extend-ignore = [

[tool.ruff.per-file-ignores]
"tests/*.py" = ["D", "S"]
"setup.py" = ["D"]

# https://docs.pytest.org/en/6.2.x/customize.html
[tool.pytest.ini_options]
Expand Down
77 changes: 77 additions & 0 deletions src/koyo/image.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""Image processing functions."""
import typing as ty

import numpy as np


Expand Down Expand Up @@ -26,3 +28,78 @@ def clip_hotspots(img: np.ndarray, quantile: float = 0.99) -> np.ndarray:
img = np.clip(img, None, hotspot_threshold)
img[mask] = np.nan
return img


def reshape_array(array: np.ndarray, image_shape: ty.Tuple[int, int], pixel_index: np.ndarray, fill_value: float = 0):
"""
Reshape 1D data to 2D heatmap.
Parameters
----------
array: np.array / list
1D array of values to be reshaped
image_shape: tuple
final shape of the image
pixel_index: np.array
array containing positions where pixels should be placed, considering missing values -
e.g. not acquired pixels
fill_value : float, optional
if value is provided, it will be used to fill-in the values
Returns
-------
im_array: np.array
reshaped heatmap of shape `image_shape`
"""
if isinstance(array, np.matrix):
array = np.asarray(array).flatten()
array = np.asarray(array)
dtype = np.float32 if isinstance(fill_value, float) else array.dtype

image_n_pixels = np.prod(image_shape)
im_array = np.full(image_n_pixels, dtype=dtype, fill_value=fill_value)
im_array[pixel_index] = array
im_array = np.reshape(im_array, image_shape)
return im_array


def reshape_array_from_coordinates(
array: np.ndarray, image_shape: ty.Tuple[int, int], coordinates: np.ndarray, fill_value: float = 0
):
"""Reshape array based on xy coordinates."""
dtype = np.float32 if np.isnan(fill_value) else array.dtype
im = np.full(image_shape, fill_value=fill_value, dtype=dtype)
im[coordinates[:, 1] - 1, coordinates[:, 0] - 1] = array
return im


def reshape_array_batch(
array: np.ndarray, image_shape: ty.Tuple[int, int], pixel_index: np.ndarray, fill_value: float = 0
):
"""Reshape many images into a data cube."""
array = np.asarray(array)
if array.ndim == 1:
return reshape_array(array, image_shape, pixel_index, fill_value)
count = array.shape[1]
dtype = np.float32 if isinstance(fill_value, float) else array.dtype

im_array = np.full((count, np.prod(image_shape)), dtype=dtype, fill_value=fill_value)
for i in range(count):
im_array[i, pixel_index] = array[:, i]
# reshape data
im_array = np.reshape(im_array, (count, *image_shape))
return im_array


def reshape_array_batch_from_coordinates(
array: np.ndarray, image_shape: ty.Tuple[int, int], coordinates: np.ndarray, fill_value: int = 0
):
"""Batch reshape image."""
if array.ndim != 2:
raise ValueError("Expected 2-D array.")
n = array.shape[1]
dtype = np.float32 if np.isnan(fill_value) else array.dtype
im = np.full((n, *image_shape), fill_value=fill_value, dtype=dtype)
for i in range(n):
im[i, coordinates[:, 1] - 1, coordinates[:, 0] - 1] = array[:, i]
return im
45 changes: 41 additions & 4 deletions src/koyo/spectrum.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,38 @@ def ppm_diff(a: np.ndarray, axis=-1) -> np.ndarray:
return (np.subtract(a[slice1], a[slice2]) / a[slice2]) * 1e6


@numba.njit()
def find_between(data: SimpleArrayLike, min_value: float, max_value: float) -> np.ndarray:
@numba.njit(cache=True, fastmath=True)
def find_between(data: SimpleArrayLike, min_value: float, max_value: float):
"""Find indices between windows."""
return np.where(np.logical_and(data >= min_value, data <= max_value))[0]


@numba.njit()
def find_between_ppm(data: SimpleArrayLike, value: float, ppm: float):
@numba.njit(cache=True, fastmath=True)
def find_between_tol(data: np.ndarray, value: float, tol: float):
"""Find indices between window and ppm."""
return find_between(data, value - tol, value + tol)


@numba.njit(cache=True, fastmath=True)
def find_between_ppm(data: np.ndarray, value: float, ppm: float):
"""Find indices between window and ppm."""
window = get_window_for_ppm(value, abs(ppm))
return find_between(data, value - window, value + window)


@numba.njit(cache=True, fastmath=True)
def find_between_batch(array: np.ndarray, min_value: np.ndarray, max_value: np.ndarray):
"""Find indices between specified boundaries for many items."""
min_indices = np.searchsorted(array, min_value, side="left")
max_indices = np.searchsorted(array, max_value, side="right")

res = []
for i in range(len(min_value)):
_array = array[min_indices[i] : max_indices[i]]
res.append(min_indices[i] + find_between(_array, min_value[i], max_value[i]))
return res


def get_peaklist_window_for_ppm(peaklist: np.ndarray, ppm: float) -> ty.List[ty.Tuple[float, float]]:
"""Retrieve peaklist + tolerance."""
_peaklist = []
Expand All @@ -75,6 +94,24 @@ def get_peaklist_window_for_da(peaklist: np.ndarray, da: float) -> ty.List[ty.Tu
return _peaklist


def get_mzs_for_tol(mzs: np.ndarray, tol: float = None, ppm: float = None):
"""Get min/max values for specified tolerance or ppm."""
if tol is None and ppm is None or tol == 0 and ppm == 0:
raise ValueError("Please specify `tol` or `ppm`.")
elif tol is not None and ppm is not None:
raise ValueError("Please only specify `tol` or `ppm`.")

mzs = np.asarray(mzs)
if tol:
mzs_min = mzs - tol
mzs_max = mzs + tol
else:
tol = np.asarray([get_window_for_ppm(mz, ppm) for mz in mzs])
mzs_min = mzs - tol
mzs_max = mzs + tol
return mzs_min, mzs_max


def bisect_spectrum(x_spectrum, mz_value, tol: float) -> ty.Tuple[int, int]:
"""Get left/right window of extraction for peak."""
ix_l, ix_u = (
Expand Down

0 comments on commit ef0a228

Please sign in to comment.