diff --git a/.gitignore b/.gitignore index a0f4a6f1..df4fda14 100644 --- a/.gitignore +++ b/.gitignore @@ -191,3 +191,6 @@ dkms.conf # Added by vscode .vscode + +# development folder +dev diff --git a/lcls_tools/common/data/fit/gaussian_fit.py b/lcls_tools/common/data/fit/gaussian_fit.py deleted file mode 100644 index 00495080..00000000 --- a/lcls_tools/common/data/fit/gaussian_fit.py +++ /dev/null @@ -1,125 +0,0 @@ -import numpy as np -from pydantic import BaseModel, ConfigDict, PositiveFloat -from typing import Optional -from lcls_tools.common.data.fit.projection import ProjectionFit -from lcls_tools.common.image.processing import ImageProcessor - - -class GaussianFit(BaseModel): - # should rename to BeamsizeEvaluator or something ? - """ - Gaussian Fitting class that takes in a fitting tool (with method), - ImageProcessor, and Image and returns the beamsize and position. - """ - - model_config = ConfigDict(arbitrary_types_allowed=True) - processor: ImageProcessor = ImageProcessor() - fit: ProjectionFit = ProjectionFit() - min_log_intensity: float = 4.0 - bounding_box_half_width: PositiveFloat = 3.0 - apply_bounding_box_constraint: bool = True - image: Optional[np.ndarray] = None - - @property - def beamsize(self): - beamspot_chars = self.gaussian_fit(self.image) - beamsize = self.calculate_beamsize(beamspot_chars) - return beamsize - - def gaussian_fit(self, image): - self.processor.auto_process(image) - x_proj, y_proj = self.get_projections(image) - x_parameters = self.fit.fit_projection(x_proj) - y_parameters = self.fit.fit_projection(y_proj) - - return { - "centroid": np.array((x_parameters["mean"], y_parameters["mean"])), - "rms_sizes": np.array((x_parameters["sigma"], y_parameters["sigma"])), - "total_intensity": image.sum(), - } - - def get_projections(self, image): - x_projection = np.array(np.sum(image, axis=0)) - y_projection = np.array(np.sum(image, axis=1)) - return x_projection, y_projection - - def calculate_beamsize(self, beamspot_chars: dict): - """ - Conditional beamsize calculation: - if condition1 (total_intensity is to small): then return NULL result. - else: do beamsize calculation and check that the beamspot is - within the ROI - """ - if np.log10(beamspot_chars["total_intensity"]) < self.min_log_intensity: - # TODO: character count is really not liking this one line - print( - ( - "log10 image intensity" - + f"{np.log10(beamspot_chars['total_intensity'])}" - + "below threshold" - ) - ) - result = { - "Cx": np.NaN, - "Cy": np.NaN, - "Sx": np.NaN, - "Sy": np.NaN, - "bb_penalty": np.NaN, - "total_intensity": beamspot_chars["total_intensity"], - } - return result - else: - centroid = beamspot_chars["centroid"] - sizes = beamspot_chars["rms_sizes"] - - if np.all(~np.isnan(np.stack((centroid, sizes)))): - # get beam region bounding box - n_stds = self.bounding_box_half_width - - bounding_box_corner_pts = np.array( - ( - centroid - n_stds * sizes, - centroid + n_stds * sizes, - centroid - n_stds * sizes * np.array((-1, 1)), - centroid + n_stds * sizes * np.array((-1, 1)), - ) - ) - - if self.processor.roi is not None: - roi_radius = self.processor.roi.radius - else: - roi_radius = np.min(np.array(self.image.shape) / 1.5) - # TODO: maybe need to change this ^^^ - - # This is a check whether or not the beamspot is within - # the bounding box. - temp = bounding_box_corner_pts - np.array((roi_radius, roi_radius)) - distances = np.linalg.norm(temp, axis=1) - bounding_box_penalty = np.max(distances) - roi_radius - - result = { - "Cx": centroid[0], - "Cy": centroid[1], - "Sx": sizes[0], - "Sy": sizes[1], - "bb_penalty": bounding_box_penalty, - "total_intensity": beamspot_chars["total_intensity"], - } - - # set results to none if the beam extends beyond the roi - # and the bounding box constraint is active - if bounding_box_penalty > 0 and self.apply_bounding_box_constraint: - for name in ["Cx", "Cy", "Sx", "Sy"]: - result[name] = np.NaN - - else: - result = { - "Cx": np.NaN, - "Cy": np.NaN, - "Sx": np.NaN, - "Sy": np.NaN, - "bb_penalty": np.NaN, - "total_intensity": beamspot_chars["total_intensity"], - } - - return result diff --git a/lcls_tools/common/data/fit/method_base.py b/lcls_tools/common/data/fit/method_base.py index 0a30f9cb..5bd608ed 100644 --- a/lcls_tools/common/data/fit/method_base.py +++ b/lcls_tools/common/data/fit/method_base.py @@ -1,4 +1,6 @@ from abc import ABC, abstractmethod +from typing import Optional + import numpy as np from pydantic import BaseModel, ConfigDict @@ -49,7 +51,7 @@ def priors(self, priors: dict[str, float]): # TODO: define properties -class MethodBase(ABC): +class MethodBase(ABC, BaseModel): """ Base abstract class for all fit methods, which serves as the bare minimum skeleton code needed. Should be used only as a parent class to all method @@ -63,7 +65,9 @@ class MethodBase(ABC): and upper bound on for acceptable values of each parameter) """ - parameters: ModelParameters = None + parameters: ModelParameters + use_priors: Optional[bool] = False + fitted_params_dict: Optional[dict] = None @abstractmethod def find_init_values(self) -> list: @@ -121,10 +125,9 @@ def loss( method_parameter_list: np.ndarray, x: np.ndarray, y: np.ndarray, - use_priors: bool = False, ): loss_temp = -self._log_likelihood(x, y, method_parameter_list) - if use_priors: + if self.use_priors: loss_temp = loss_temp - self._log_prior(method_parameter_list) return loss_temp diff --git a/lcls_tools/common/data/fit/methods.py b/lcls_tools/common/data/fit/methods.py index 0397b975..bd5dae04 100644 --- a/lcls_tools/common/data/fit/methods.py +++ b/lcls_tools/common/data/fit/methods.py @@ -28,7 +28,7 @@ class GaussianModel(MethodBase): density functions to match that data. """ - parameters = gaussian_parameters + parameters: ModelParameters = gaussian_parameters def find_init_values(self) -> dict: """Fit data without optimization, return values.""" @@ -50,7 +50,7 @@ def find_init_values(self) -> dict: self.parameters.initial_values = init_values return init_values - def find_priors(self) -> dict: + def find_priors(self, **kwargs) -> dict: """ Do initial guesses based on data and make distribution from that guess. """ diff --git a/lcls_tools/common/data/fit/projection.py b/lcls_tools/common/data/fit/projection.py index a5947a75..9fbc3e64 100644 --- a/lcls_tools/common/data/fit/projection.py +++ b/lcls_tools/common/data/fit/projection.py @@ -1,7 +1,10 @@ +from typing import Optional + import numpy as np import scipy.optimize import scipy.signal from pydantic import BaseModel, ConfigDict + from lcls_tools.common.data.fit.method_base import MethodBase from lcls_tools.common.data.fit.methods import GaussianModel @@ -27,8 +30,7 @@ class ProjectionFit(BaseModel): # TODO: come up with better name model_config = ConfigDict(arbitrary_types_allowed=True) - model: MethodBase = GaussianModel() - use_priors: bool = False + model: Optional[MethodBase] = GaussianModel() def normalize(self, data: np.ndarray) -> np.ndarray: """ @@ -40,8 +42,8 @@ def normalize(self, data: np.ndarray) -> np.ndarray: return normalized_data def unnormalize_model_params( - self, method_params_dict: dict, projection_data: np.ndarray - ) -> np.ndarray: + self, method_params_dict: dict, projection_data: np.ndarray + ) -> dict: """ Takes fitted and normalized params and returns them to unnormalized values i.e the true fitted values of the distribution @@ -78,7 +80,7 @@ def fit_model(self) -> scipy.optimize._optimize.OptimizeResult: res = scipy.optimize.minimize( self.model.loss, init_values, - args=(x, y, self.use_priors), + args=(x, y), bounds=bounds, method="Powell", ) @@ -86,7 +88,7 @@ def fit_model(self) -> scipy.optimize._optimize.OptimizeResult: def fit_projection(self, projection_data: np.ndarray) -> dict: """ - type is dict[str, float] + Return type is dict[str, float] Wrapper function that does all necessary steps to fit 1d array. Returns a dictionary where the keys are the model params and their values are the params fitted to the data @@ -101,4 +103,5 @@ def fit_projection(self, projection_data: np.ndarray) -> dict: fitted_params_dict[param] = (res.x)[i] self.model.fitted_params_dict = fitted_params_dict.copy() params_dict = self.unnormalize_model_params(fitted_params_dict, projection_data) + return params_dict diff --git a/lcls_tools/common/frontend/plotting/image.py b/lcls_tools/common/frontend/plotting/image.py new file mode 100644 index 00000000..869eab79 --- /dev/null +++ b/lcls_tools/common/frontend/plotting/image.py @@ -0,0 +1,42 @@ +import numpy as np +from matplotlib import pyplot as plt + +from lcls_tools.common.image.fit import ImageProjectionFitResult + + +def plot_image_projection_fit(result: ImageProjectionFitResult): + """ + plot image and projection data for validation + """ + fig, ax = plt.subplots(3, 1) + fig.set_size_inches(4, 9) + + image = result.processed_image + ax[0].imshow(image) + + projections = { + "x": np.array(np.sum(image, axis=0)), + "y": np.array(np.sum(image, axis=1)) + } + + ax[0].plot(*result.centroid, "+r") + + # plot data and model fit + for i, name in enumerate(["x", "y"]): + fit_params = getattr(result, f"{name}_projection_fit_parameters") + ax[i + 1].text(0.01, 0.99, + "\n".join([ + f"{name}: {int(val)}" for name, val in + fit_params.items() + ]), + transform=ax[i + 1].transAxes, + ha='left', va='top', fontsize=10) + x = np.arange(len(projections[name])) + + ax[i + 1].plot(projections[name], label="data") + fit_param_numpy = np.array([fit_params[name] for name in + result.projection_fit_method.parameters.parameters]) + ax[i + 1].plot(result.projection_fit_method._forward(x, fit_param_numpy), + label="model fit") + + return fig, ax diff --git a/lcls_tools/common/image/fit.py b/lcls_tools/common/image/fit.py new file mode 100644 index 00000000..3990f926 --- /dev/null +++ b/lcls_tools/common/image/fit.py @@ -0,0 +1,82 @@ +from abc import ABC, abstractmethod +from typing import Optional, List + +import numpy as np +from numpy import ndarray +from pydantic import BaseModel, ConfigDict, PositiveFloat, Field + +from lcls_tools.common.data.fit.method_base import MethodBase +from lcls_tools.common.data.fit.methods import GaussianModel +from lcls_tools.common.data.fit.projection import ProjectionFit +from lcls_tools.common.image.processing import ImageProcessor + + +class ImageFitResult(BaseModel): + centroid: List[float] = Field(min_length=2, max_length=2) + rms_size: List[float] = Field(min_length=2, max_length=2) + total_intensity: PositiveFloat + processed_image: ndarray + model_config = ConfigDict(arbitrary_types_allowed=True) + + +class ImageProjectionFitResult(ImageFitResult): + projection_fit_method: MethodBase + x_projection_fit_parameters: dict[str, float] + y_projection_fit_parameters: dict[str, float] + + +class ImageFit(BaseModel, ABC): + """ + Abstract class for determining beam properties from an image + """ + image_processor: Optional[ImageProcessor] = ImageProcessor() + model_config = ConfigDict(arbitrary_types_allowed=True) + + def fit_image(self, image: ndarray) -> ImageFitResult: + """ + Public method to determine beam properties from an image, including initial + image processing, internal image fitting method, and image validation. + + """ + processed_image = self.image_processor.auto_process(image) + fit_result = self._fit_image(processed_image) + return fit_result + + @abstractmethod + def _fit_image(self, image: ndarray) -> ImageFitResult: + """ + Private image fitting method to be overwritten by subclasses. Expected to + return a ImageFitResult dataclass. + """ + ... + + +class ImageProjectionFit(ImageFit): + """ + Image fitting class that gets the beam size and location by independently fitting + the x/y projections. The default configuration uses a Gaussian fitting of the + profile with prior distributions placed on the model parameters. + """ + projection_fit_method: Optional[MethodBase] = GaussianModel(use_priors=True) + model_config = ConfigDict(arbitrary_types_allowed=True) + + def _fit_image(self, image: ndarray) -> ImageProjectionFitResult: + x_projection = np.array(np.sum(image, axis=0)) + y_projection = np.array(np.sum(image, axis=1)) + + proj_fit = ProjectionFit(model=self.projection_fit_method) + + x_parameters = proj_fit.fit_projection(x_projection) + y_parameters = proj_fit.fit_projection(y_projection) + + result = ImageProjectionFitResult( + centroid=[x_parameters["mean"], y_parameters["mean"]], + rms_size=[x_parameters["sigma"], y_parameters["sigma"]], + total_intensity=image.sum(), + x_projection_fit_parameters=x_parameters, + y_projection_fit_parameters=y_parameters, + processed_image=image, + projection_fit_method=self.projection_fit_method, + ) + + return result diff --git a/lcls_tools/common/image/processing.py b/lcls_tools/common/image/processing.py index cb2fcd17..86f1c635 100644 --- a/lcls_tools/common/image/processing.py +++ b/lcls_tools/common/image/processing.py @@ -1,5 +1,8 @@ +from copy import copy +from typing import Optional + import numpy as np -from scipy.ndimage import gaussian_filter +from scipy.ndimage import gaussian_filter, median_filter from pydantic import BaseModel, PositiveFloat, ConfigDict from lcls_tools.common.image.roi import ROI @@ -23,9 +26,11 @@ class ImageProcessor(BaseModel): """ model_config = ConfigDict(arbitrary_types_allowed=True) - roi: ROI = None - background_image: np.ndarray = None - threshold: PositiveFloat = 0.0 + roi: Optional[ROI] = None + background_image: Optional[np.ndarray] = None + threshold: Optional[PositiveFloat] = 0.0 + gaussian_filter_size: Optional[PositiveFloat] = None + median_filter_size: Optional[PositiveFloat] = None def subtract_background(self, raw_image: np.ndarray) -> np.ndarray: """Subtract background pixel intensity from a raw image""" @@ -33,24 +38,23 @@ def subtract_background(self, raw_image: np.ndarray) -> np.ndarray: image = raw_image - self.background_image else: image = raw_image - self.threshold - return image - def clip_image(self, image): + # clip images to make sure values are positive return np.clip(image, 0, None) def auto_process(self, raw_image: np.ndarray) -> np.ndarray: """Process image by subtracting background pixel intensity from a raw image, crop, and filter""" + raw_image = copy(raw_image) image = self.subtract_background(raw_image) - clipped_image = self.clip_image(image) if self.roi is not None: - cropped_image = self.roi.crop_image(clipped_image) - else: - cropped_image = clipped_image - processed_image = self.filter(cropped_image) - return processed_image - - def filter(self, unfiltered_image: np.ndarray, sigma=5) -> np.ndarray: - # TODO: extend to other types of filters? Change the way we pass sigma? - filtered_data = gaussian_filter(unfiltered_image, sigma) - return filtered_data + image = self.roi.crop_image(image) + + if self.median_filter_size is not None: + image = median_filter(image, self.median_filter_size) + + # apply smoothing filter if smoothing_factor is specified + if self.gaussian_filter_size is not None: + image = gaussian_filter(image, self.gaussian_filter_size) + + return image diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index 4ae7a7b9..b6e34025 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -15,11 +15,11 @@ class QuadScanEmittance(Measurement): ------------------------ Arguments: energy: beam energy - magnet_collection: MagnetCollection object of magnets for an area of the beamline (use create_magnet()) - magnet_name: name of magnet - magnet_length: length of magnet scan_values: BDES values of magnet to scan over - device_measurement: Measurement object of profile monitor/wire scanner + magnet: Magnet object used to conduct scan + beamsize_measurement: BeamsizeMeasurement object from profile monitor/wire scanner + n_measurement_shots: number of beamsize measurements to make per individual quad + strength ------------------------ Methods: measure: does the quad scan, getting the beam sizes at each scan value, diff --git a/lcls_tools/common/measurements/screen_profile.py b/lcls_tools/common/measurements/screen_profile.py index 6761fd3d..d60ef356 100644 --- a/lcls_tools/common/measurements/screen_profile.py +++ b/lcls_tools/common/measurements/screen_profile.py @@ -1,5 +1,5 @@ from lcls_tools.common.devices.screen import Screen -from lcls_tools.common.data.fit.gaussian_fit import GaussianFit +from lcls_tools.common.image.fit import ImageProjectionFit, ImageFit from lcls_tools.common.measurements.measurement import Measurement from pydantic import ConfigDict @@ -25,24 +25,12 @@ class ScreenBeamProfileMeasurement(Measurement): model_config = ConfigDict(arbitrary_types_allowed=True) name: str = "beam_profile" device: Screen - beam_fit: GaussianFit = GaussianFit() # actually want an additional + beam_fit: ImageFit = ImageProjectionFit() # actually want an additional # layer before GaussianFit so that beam_fit_tool is a generic fit tool # not constrained to be of gaussian fit type fit_profile: bool = True # return_images: bool = True - def single_measure(self) -> dict: - """ - Function that grabs a single image from the device class - (typically live beam images) and passes it to the - image processing class embedded with beam_fit for - processing (subtraction and cropping) - returns a dictionary with both the raw and processed dictionary - """ - raw_image = self.device.image - processed_image = self.beam_fit.processor.auto_process(raw_image) - return {"raw_image": raw_image, "processed_image": processed_image} - def measure(self, n_shots: int = 1) -> dict: """ Measurement function that takes in n_shots as argument @@ -55,12 +43,14 @@ def measure(self, n_shots: int = 1) -> dict: """ images = [] while len(images) < n_shots: - images.append(self.single_measure()) + images.append(self.device.image) + # TODO: need to add a wait statement in here for images to update + + results = {"raw_images": images} if self.fit_profile: - for image_measurement in images: - self.beam_fit.image = image_measurement["processed_image"] - image_measurement.update(self.beam_fit.beamsize) + for image in images: + results.update(self.beam_fit.fit_image(image)) ''' results = {} for image_measurement in images: @@ -75,4 +65,4 @@ def measure(self, n_shots: int = 1) -> dict: for key in {k for meas in images for k in meas} } - return results + return results diff --git a/tests/unit_tests/lcls_tools/common/image/test_processing.py b/tests/unit_tests/lcls_tools/common/image/test_processing.py index f6a9e673..32cd67fe 100644 --- a/tests/unit_tests/lcls_tools/common/image/test_processing.py +++ b/tests/unit_tests/lcls_tools/common/image/test_processing.py @@ -53,7 +53,7 @@ def test_subtract_background(self): image_processor = ImageProcessor(background_image=background_image) image = image_processor.subtract_background(self.image) image = image - background = (self.image - 1) + background = np.clip(self.image - 1, 0, None) np.testing.assert_array_equal( image, background, err_msg=("expected image to equal background " @@ -67,24 +67,9 @@ def test_subtract_background(self): image_processor = ImageProcessor(threshold=1) image = image_processor.subtract_background(self.image) image = image - background = (self.image - 1) + background = np.clip(self.image - 1, 0, None) np.testing.assert_array_equal( image, background, err_msg=("expected image to equal background " + "when applying threshold") ) - - def test_clip(self): - """ - Given an np.ndarray check that when the image_processor - is passed a threshold check that the np.ndarray elements - are clipped at to not drop below zero - """ - image_processor = ImageProcessor(threshold=100) - image = image_processor.subtract_background(self.image) - clipped_image = image_processor.clip_image(image) - np.testing.assert_array_equal( - clipped_image, np.zeros(self.size), - err_msg=("expected clipped image to equal zero " - + "when subtracting background with threshold") - ) diff --git a/tests/unit_tests/lcls_tools/superconducting/test_sc_cryomodule.py b/tests/unit_tests/lcls_tools/superconducting/test_sc_cryomodule.py index b5f5d115..0c53ab02 100644 --- a/tests/unit_tests/lcls_tools/superconducting/test_sc_cryomodule.py +++ b/tests/unit_tests/lcls_tools/superconducting/test_sc_cryomodule.py @@ -6,7 +6,7 @@ class TestCryomodule(TestCase): def test_is_harmonic_linearizer_true(self): - hl = MACHINE.cryomodules[f"H{randint(1,2)}"] + hl = MACHINE.cryomodules[f"H{randint(1, 2)}"] self.assertTrue( hl.is_harmonic_linearizer, msg=f"{hl} is_harmonic_linearizer is not true" )