forked from c3-time-domain/SeeChange
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Algorithm to calculate sky flats (c3-time-domain#93)
* Create a simulator for making semi-realistic images * Make an initial version of an algorithm to calculate flats from hundreds of regular images.
- Loading branch information
Showing
8 changed files
with
1,160 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
import numpy as np | ||
|
||
|
||
from improc.simulator import Simulator | ||
from improc.tools import sigma_clipping | ||
|
||
|
||
def calc_sky_flat(images, iterations=3, nsigma=3.0, median=True): | ||
"""Calculate the sky flat for a set of images. | ||
Parameters | ||
---------- | ||
images : list of 2D numpy.ndarrays or single 3D numpy.ndarray | ||
The images to calculate the sky flat for. | ||
iterations : int | ||
The number of iterations to use for the sigma clipping procedure. | ||
Default is 3. If the procedure converges it may do fewer iterations. | ||
nsigma : float | ||
The number of sigma to use for the sigma clipping procedure. | ||
Values further from this many standard deviations are removed. | ||
Default is 5.0. | ||
median: bool | ||
If True, use the median instead of the mean for the all iterations | ||
of the sigma clipping algorithm. Default is True. | ||
TODO: does the use of median cause a bias? | ||
Returns | ||
------- | ||
sky_flat : numpy.ndarray | ||
The sky flat image. The value in each pixel represents how much light was | ||
lost between the sky and the detector (including quantum efficiency, and digitization). | ||
Divide an image by the flat to correct for pixel-to-pixel sensitivity variations | ||
and camera vignetting. | ||
""" | ||
# TODO: we may need to chop the images into smaller pieces to avoid memory issues | ||
|
||
if isinstance(images, np.ndarray) and images.ndim == 3: | ||
pass | ||
elif isinstance(images, list) and all(isinstance(im, np.ndarray) for im in images): | ||
images = np.array(images) | ||
else: | ||
raise TypeError("images must be a list of 2D numpy arrays or a 3D numpy array") | ||
|
||
# use the middle half of the image to calculate the sky level (to avoid vignetting) | ||
idx1_1 = images.shape[1] // 4 | ||
idx1_2 = images.shape[1] // 4 * 3 | ||
idx2_1 = images.shape[2] // 4 | ||
idx2_2 = images.shape[2] // 4 * 3 | ||
|
||
# normalize all images to the same mean sky level | ||
mean_sky = np.array([sigma_clipping(im[idx1_1:idx1_2, idx2_1:idx2_2], nsigma=3.0)[0] for im in images]) | ||
# mean_sky = np.array([sigma_clipping(im)[0] for im in images]) | ||
mean_sky = np.reshape(mean_sky, (images.shape[0], 1, 1)) | ||
|
||
im = images.copy() / mean_sky | ||
|
||
mean, rms = sigma_clipping(im, nsigma=nsigma, iterations=iterations, axis=0, median=median) | ||
|
||
return mean | ||
|
||
|
||
if __name__ == '__main__': | ||
sim = Simulator(image_size_x=128) | ||
sim.make_image() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
# various functions and tools used for image processing | ||
|
||
import numpy as np | ||
|
||
|
||
def sigma_clipping(values, nsigma=3.0, iterations=5, axis=None, median=False): | ||
"""Calculate the robust mean and rms by iterative exclusion of outliers. | ||
Parameters | ||
---------- | ||
values: numpy.ndarray | ||
The values to calculate the mean and rms for. | ||
Can be a vector, image, or cube. | ||
nsigma: float | ||
The number of sigma to use for the sigma clipping procedure. | ||
Values further from this many standard deviations are removed. | ||
Default is 3.0. | ||
iterations: int | ||
The number of iterations to use for the sigma clipping procedure. | ||
Default is 5. If the procedure converges it may do fewer iterations. | ||
axis: int or tuple of ints | ||
The axis or axes along which to calculate the mean and rms. | ||
Default is None, which means the function will attempt to guess | ||
the right axis. | ||
For vectors and 3D image cubes, will use axis=0 by default, | ||
which produces a scalar mean/rms for a vector, | ||
and a 2D image for a cube. | ||
For a 2D image input, will use axis=(0,1) by default, | ||
which will produce a scalar mean/rms for the image. | ||
median: bool | ||
If True, use the median instead of the mean for the all iterations | ||
beyond the first one (first iteration always uses median). | ||
Returns | ||
------- | ||
mean: float or numpy.ndarray | ||
The mean of the values after sigma clipping. | ||
rms: float or numpy.ndarray | ||
The rms of the values after sigma clipping. | ||
""" | ||
# parse arguments | ||
if not isinstance(values, np.ndarray): | ||
raise TypeError("values must be a numpy.ndarray") | ||
|
||
if axis is None: | ||
if values.ndim == 1 or values.ndim == 3: | ||
axis = 0 | ||
elif values.ndim == 2: | ||
axis = (0, 1) | ||
else: | ||
raise ValueError("values must be a vector, image, or cube") | ||
|
||
values = values.copy() | ||
|
||
# first iteration: | ||
mean = np.nanmedian(values, axis=axis) | ||
rms = np.nanstd(values, axis=axis) | ||
|
||
# how many nan values? | ||
nans = np.isnan(values).sum() | ||
|
||
for i in range(iterations): | ||
# remove pixels that are more than nsigma from the median | ||
clipped = np.abs(values - mean) > nsigma * rms | ||
values[clipped] = np.nan | ||
|
||
# recalculate the sky flat and noise | ||
if median: # use median to calculate the mean estimate | ||
mean = np.nanmedian(values, axis=axis) | ||
else: # only use median on the first iteration | ||
mean = np.nanmean(values, axis=axis) | ||
rms = np.nanstd(values, axis=axis) | ||
|
||
new_nans = np.isnan(values).sum() | ||
|
||
if new_nans == nans: | ||
break | ||
else: | ||
nans = new_nans | ||
|
||
return mean, rms |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
import os | ||
import time | ||
import pytest | ||
|
||
import numpy as np | ||
|
||
import matplotlib.pyplot as plt | ||
|
||
from models.base import CODE_ROOT | ||
|
||
from improc.simulator import Simulator | ||
from improc.sky_flat import calc_sky_flat | ||
|
||
|
||
@pytest.mark.parametrize("num_images", [10, 300]) | ||
def test_simple_sky_flat(num_images): | ||
clear_cache = True # cache the images from the simulator | ||
filename = os.path.join(CODE_ROOT, f"tests/improc/cache/flat_test_images_{num_images}.npz") | ||
sim = Simulator( | ||
image_size_x=256, # make smaller images to make the test faster | ||
vignette_radius=150, # adjust the vignette radius to match the image size | ||
pixel_qe_std=0.025, # increase the QE variations above the background noise | ||
star_number=100, # the smaller images require a smaller number of stars to avoid crowding | ||
bias_std=0, # simplify by having a completely uniform bias | ||
gain_std=0, # leave the gain at 1.0 | ||
dark_current=0, # simplify by having no dark current | ||
read_noise=0, # simplify by having no read noise | ||
) | ||
|
||
if os.path.isfile(filename) and not clear_cache: | ||
file_obj = np.load(filename, allow_pickle=True) | ||
images = file_obj['images'] | ||
sim.truth = file_obj['truth'][()] | ||
else: | ||
t0 = time.time() | ||
images = [] | ||
for i in range(num_images): | ||
sim.make_image(new_sky=True, new_stars=True) | ||
images.append(sim.apply_bias_correction(sim.image)) | ||
|
||
images = np.array(images) | ||
if not os.path.isdir(os.path.dirname(filename)): | ||
os.makedirs(os.path.dirname(filename)) | ||
np.savez(filename, images=images, truth=sim.truth) | ||
# print(f"Generating {num_images} images took {time.time() - t0:.1f} seconds") | ||
|
||
t0 = time.time() | ||
# don't use median so we can see when it fails on too few stars | ||
sky_flat = calc_sky_flat(images, nsigma=3.0, iterations=5, median=False) | ||
# print(f'calc_sky_flat took {time.time() - t0:.1f} seconds') | ||
|
||
# plt.plot(sky_flat[10, :], label="sky flat") | ||
# plt.plot(sim.truth.vignette_map[10, :], label="camera vignette") | ||
# plt.plot(sim.truth.vignette_map[10, :] * sim.truth.pixel_qe_map[10, :], label="expected flat") | ||
# plt.legend() | ||
# plt.show(block=True) | ||
|
||
delta = (sky_flat - sim.truth.vignette_map * sim.truth.pixel_qe_map) | ||
|
||
bkg = sim.truth.background_mean | ||
expected_noise = np.sqrt(bkg / num_images) / bkg | ||
|
||
# delta should have a mean=0, but the estimator for this number has the same noise as above, | ||
# reduced by the number of pixels in the image (since we are averaging over all pixels) | ||
expected_bias = expected_noise / np.sqrt(sim.truth.imsize[0] * sim.truth.imsize[1]) | ||
|
||
if num_images < 100: | ||
assert expected_noise / 2 < np.nanstd(delta) < expected_noise * 2 | ||
assert np.nanmean(delta) > expected_bias * 3 # lots of bias because the stars are not removed with few images | ||
else: | ||
assert np.nanstd(delta) < expected_noise * 2 | ||
assert np.nanmean(delta) < expected_bias * 2 |