diff --git a/examples/plotter_base_demo.py b/examples/plotter_base_demo.py new file mode 100644 index 0000000..982db67 --- /dev/null +++ b/examples/plotter_base_demo.py @@ -0,0 +1,100 @@ +import multiprocessing as mp +import time + +import numpy as np +import matplotlib.pyplot as plt + +from pydevdtk.plotting import PlotterManager, PlotterBase + + +def generate_sine_signal( + num_samples, amplitude, frequency, sample_rate, offset=0, phase=0 +): + if sample_rate <= 0: + raise ValueError("Sample rate can't be negative or zero") + if frequency <= 0: + raise ValueError("Frequency can't be negative or zero") + if frequency > sample_rate / 2: + raise ValueError("Frequency is greater than sample_rate/2") + time = np.arange(num_samples) / sample_rate + signal = amplitude * np.sin(2 * np.pi * frequency * time + phase) + offset + return signal + + +# create custom PlotterBase +class Plotter(PlotterBase): + def __init__(self, num_points=50): + self.num_points = num_points + + def init(self): + fig, axs = plt.subplots(3, 1, figsize=(8, 5), constrained_layout=True) + + self.y_sin = self.num_points * [np.nan] + self.y_cos = self.num_points * [np.nan] + self.y_rand = self.num_points * [np.nan] + + # create artists + self.sin_artist = axs[0].plot(self.y_sin, color="C0")[0] + self.cos_artist = axs[1].plot(self.y_cos, color="C1")[0] + self.rand_artist = axs[2].plot(self.y_rand, color="C2")[0] + + # modify axes properties + for ax in axs: + ax.set_xlim([0, self.num_points - 1]) + for ax in axs: + ax.set_xticks([0, self.num_points / 2, self.num_points]) + axs[0].set_title("sin") + axs[1].set_title("cos") + axs[2].set_title("rand") + axs[0].set_ylim([-1.1, 1.1]) + axs[1].set_ylim([-1.1, 1.1]) + axs[2].set_ylim([999, 5001]) + + # ensure this is called! + self.add_figure_and_artists( + fig, [self.sin_artist, self.cos_artist, self.rand_artist] + ) + + def process_data_queue(self): + while not self.data_queue.empty(): + sin, cos, rand = self.data_queue.get() + self.y_sin.append(sin) + self.y_cos.append(cos) + self.y_rand.append(rand) + self.y_sin = self.y_sin[-self.num_points :] + self.y_cos = self.y_cos[-self.num_points :] + self.y_rand = self.y_rand[-self.num_points :] + self.sin_artist.set_ydata(self.y_sin) + self.cos_artist.set_ydata(self.y_cos) + self.rand_artist.set_ydata(self.y_rand) + + +def main(): + fs = 50 + num_samples = 100 + sin_wave = generate_sine_signal(num_samples, 1, 5, fs) + cos_wave = generate_sine_signal(num_samples, 1, 2, fs, 0, np.pi / 2) + i_sin = 0 + i_cos = 0 + plotter_manager = PlotterManager(Plotter(num_samples)) + plotter_manager.show() + while plotter_manager.is_shown(): + data = [ + sin_wave[i_sin], + cos_wave[i_cos], + np.random.randint(1000, 5000 + 1), + ] + plotter_manager.add_data(data) + i_sin += 1 + i_cos += 1 + if i_sin >= len(sin_wave): + i_sin = 0 + if i_cos >= len(cos_wave): + i_cos = 0 + time.sleep(1 / 50) + plotter_manager.stop() + + +if __name__ == "__main__": + mp.freeze_support() + main() diff --git a/src/pydevdtk/plotting/__init__.py b/src/pydevdtk/plotting/__init__.py index bd4266a..ed382ff 100644 --- a/src/pydevdtk/plotting/__init__.py +++ b/src/pydevdtk/plotting/__init__.py @@ -1,4 +1,5 @@ from .plotter import Plotter +from .plotter_base import Plotter as PlotterBase from .plotter_manager import PlotterManager -__all__ = ["Plotter", "PlotterManager"] +__all__ = ["Plotter", "PlotterBase", "PlotterManager"] diff --git a/src/pydevdtk/plotting/plotter_base.py b/src/pydevdtk/plotting/plotter_base.py new file mode 100644 index 0000000..1fe3485 --- /dev/null +++ b/src/pydevdtk/plotting/plotter_base.py @@ -0,0 +1,102 @@ +import time + +import matplotlib.pyplot as plt + + +class Plotter: + def __call__( + self, cmd_queue, data_queue, stop_event, plot_closed_event, fps=None + ): + self.cmd_queue = cmd_queue + self.data_queue = data_queue + self.stop_event = stop_event + self.plot_closed_event = plot_closed_event + self.figs = [] + self.axs = {} + self.event_processing = False + self.init() + if len(self.figs) == 0: + raise RuntimeError( + "No figure added, " + "ensure `add_figure` is called at least once in `init`" + ) + t_start = time.time() + while not self.stop_event.is_set(): + self.process_cmd_queue() + self.process_data_queue() + self.process_events() + if fps is not None and time.time() - t_start < 1 / fps: + continue + self.update_figures() + t_start = time.time() + + while self.cmd_queue.get() is not None: + self.cmd_queue.get() + while self.data_queue.get() is not None: + self.data_queue.get() + + def init(self): + raise NotImplementedError("init method not implemented") + + def process_data_queue(self): + raise NotImplementedError("process_data_queue method not implemented") + + def process_cmd_queue(self): + while not self.cmd_queue.empty(): + cmd = self.cmd_queue.get() + if cmd is None: + # stop + pass + if cmd[0] == "show": + self.show() + elif cmd[0] == "close": + self.close() + + def add_figure_and_artists(self, fig, artists): + fig.canvas.draw() + bg = fig.canvas.copy_from_bbox(fig.bbox) + self.figs.append([fig, bg, artists]) + fig.canvas.mpl_connect("draw_event", self.on_draw) + # exclude artists from regular redraw + for artist in artists: + artist.set_animated(True) + + def process_events(self): + if self.event_processing: + is_any_plot_present = False + for fig, _, _ in self.figs: + if plt.fignum_exists(fig.number): + is_any_plot_present = True + fig.canvas.flush_events() + if not is_any_plot_present: + self.plot_closed_event.set() + + def update_figures(self): + for fig, bg, artists in self.figs: + fig.canvas.restore_region(bg) + for artist in artists: + fig.draw_artist(artist) + fig.canvas.blit(fig.bbox) + + def on_draw(self, event): + if event is not None: + bg = event.canvas.copy_from_bbox(event.canvas.figure.bbox) + i_fig = next( + i + for i, (fig, _, _) in enumerate(self.figs) + if fig == event.canvas.figure + ) + self.figs[i_fig][1] = bg + + def show(self): + plt.show(block=False) + for i_fig, (fig, _, _) in enumerate(self.figs): + bg = fig.canvas.copy_from_bbox(fig.bbox) + fig.canvas.mpl_connect("draw_event", self.on_draw) + self.figs[i_fig][1] = bg + self.plot_closed_event.clear() + self.event_processing = True + + def close(self): + self.event_processing = False + plt.close("all") diff --git a/src/pydevdtk/plotting/plotter_manager.py b/src/pydevdtk/plotting/plotter_manager.py index cca7995..3d9bbaa 100644 --- a/src/pydevdtk/plotting/plotter_manager.py +++ b/src/pydevdtk/plotting/plotter_manager.py @@ -2,10 +2,11 @@ import warnings from .plotter import Plotter +from .plotter_base import Plotter as PlotterBase class PlotterManager: - def __init__(self, plotter: Plotter, fps: int | None = None): + def __init__(self, plotter: Plotter | PlotterBase, fps: int | None = None): self.cmd_queue = mp.Queue() self.data_queue = mp.Queue() self.stop_event = mp.Event()