-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add inheritance based plotter,
PlotterBase
and demo example
User has bigger flexibility for creating axis, plots and figures
- Loading branch information
Showing
4 changed files
with
206 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
import multiprocessing as mp | ||
import time | ||
|
||
import numpy as np | ||
import matplotlib.pyplot as plt | ||
|
||
from pydevdtk.plotting import PlotterManager, PlotterBase | ||
|
||
|
||
def generate_sine_signal( | ||
num_samples, amplitude, frequency, sample_rate, offset=0, phase=0 | ||
): | ||
if sample_rate <= 0: | ||
raise ValueError("Sample rate can't be negative or zero") | ||
if frequency <= 0: | ||
raise ValueError("Frequency can't be negative or zero") | ||
if frequency > sample_rate / 2: | ||
raise ValueError("Frequency is greater than sample_rate/2") | ||
time = np.arange(num_samples) / sample_rate | ||
signal = amplitude * np.sin(2 * np.pi * frequency * time + phase) + offset | ||
return signal | ||
|
||
|
||
# create custom PlotterBase | ||
class Plotter(PlotterBase): | ||
def __init__(self, num_points=50): | ||
self.num_points = num_points | ||
|
||
def init(self): | ||
fig, axs = plt.subplots(3, 1, figsize=(8, 5), constrained_layout=True) | ||
|
||
self.y_sin = self.num_points * [np.nan] | ||
self.y_cos = self.num_points * [np.nan] | ||
self.y_rand = self.num_points * [np.nan] | ||
|
||
# create artists | ||
self.sin_artist = axs[0].plot(self.y_sin, color="C0")[0] | ||
self.cos_artist = axs[1].plot(self.y_cos, color="C1")[0] | ||
self.rand_artist = axs[2].plot(self.y_rand, color="C2")[0] | ||
|
||
# modify axes properties | ||
for ax in axs: | ||
ax.set_xlim([0, self.num_points - 1]) | ||
for ax in axs: | ||
ax.set_xticks([0, self.num_points / 2, self.num_points]) | ||
axs[0].set_title("sin") | ||
axs[1].set_title("cos") | ||
axs[2].set_title("rand") | ||
axs[0].set_ylim([-1.1, 1.1]) | ||
axs[1].set_ylim([-1.1, 1.1]) | ||
axs[2].set_ylim([999, 5001]) | ||
|
||
# ensure this is called! | ||
self.add_figure_and_artists( | ||
fig, [self.sin_artist, self.cos_artist, self.rand_artist] | ||
) | ||
|
||
def process_data_queue(self): | ||
while not self.data_queue.empty(): | ||
sin, cos, rand = self.data_queue.get() | ||
self.y_sin.append(sin) | ||
self.y_cos.append(cos) | ||
self.y_rand.append(rand) | ||
self.y_sin = self.y_sin[-self.num_points :] | ||
self.y_cos = self.y_cos[-self.num_points :] | ||
self.y_rand = self.y_rand[-self.num_points :] | ||
self.sin_artist.set_ydata(self.y_sin) | ||
self.cos_artist.set_ydata(self.y_cos) | ||
self.rand_artist.set_ydata(self.y_rand) | ||
|
||
|
||
def main(): | ||
fs = 50 | ||
num_samples = 100 | ||
sin_wave = generate_sine_signal(num_samples, 1, 5, fs) | ||
cos_wave = generate_sine_signal(num_samples, 1, 2, fs, 0, np.pi / 2) | ||
i_sin = 0 | ||
i_cos = 0 | ||
plotter_manager = PlotterManager(Plotter(num_samples)) | ||
plotter_manager.show() | ||
while plotter_manager.is_shown(): | ||
data = [ | ||
sin_wave[i_sin], | ||
cos_wave[i_cos], | ||
np.random.randint(1000, 5000 + 1), | ||
] | ||
plotter_manager.add_data(data) | ||
i_sin += 1 | ||
i_cos += 1 | ||
if i_sin >= len(sin_wave): | ||
i_sin = 0 | ||
if i_cos >= len(cos_wave): | ||
i_cos = 0 | ||
time.sleep(1 / 50) | ||
plotter_manager.stop() | ||
|
||
|
||
if __name__ == "__main__": | ||
mp.freeze_support() | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
from .plotter import Plotter | ||
from .plotter_base import Plotter as PlotterBase | ||
from .plotter_manager import PlotterManager | ||
|
||
__all__ = ["Plotter", "PlotterManager"] | ||
__all__ = ["Plotter", "PlotterBase", "PlotterManager"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
import time | ||
|
||
import matplotlib.pyplot as plt | ||
|
||
|
||
class Plotter: | ||
def __call__( | ||
self, cmd_queue, data_queue, stop_event, plot_closed_event, fps=None | ||
): | ||
self.cmd_queue = cmd_queue | ||
self.data_queue = data_queue | ||
self.stop_event = stop_event | ||
self.plot_closed_event = plot_closed_event | ||
self.figs = [] | ||
self.axs = {} | ||
self.event_processing = False | ||
self.init() | ||
if len(self.figs) == 0: | ||
raise RuntimeError( | ||
"No figure added, " | ||
"ensure `add_figure` is called at least once in `init`" | ||
) | ||
t_start = time.time() | ||
while not self.stop_event.is_set(): | ||
self.process_cmd_queue() | ||
self.process_data_queue() | ||
self.process_events() | ||
if fps is not None and time.time() - t_start < 1 / fps: | ||
continue | ||
self.update_figures() | ||
t_start = time.time() | ||
|
||
while self.cmd_queue.get() is not None: | ||
self.cmd_queue.get() | ||
while self.data_queue.get() is not None: | ||
self.data_queue.get() | ||
|
||
def init(self): | ||
raise NotImplementedError("init method not implemented") | ||
|
||
def process_data_queue(self): | ||
raise NotImplementedError("process_data_queue method not implemented") | ||
|
||
def process_cmd_queue(self): | ||
while not self.cmd_queue.empty(): | ||
cmd = self.cmd_queue.get() | ||
if cmd is None: | ||
# stop | ||
pass | ||
if cmd[0] == "show": | ||
self.show() | ||
elif cmd[0] == "close": | ||
self.close() | ||
|
||
def add_figure_and_artists(self, fig, artists): | ||
fig.canvas.draw() | ||
bg = fig.canvas.copy_from_bbox(fig.bbox) | ||
self.figs.append([fig, bg, artists]) | ||
fig.canvas.mpl_connect("draw_event", self.on_draw) | ||
# exclude artists from regular redraw | ||
for artist in artists: | ||
artist.set_animated(True) | ||
|
||
def process_events(self): | ||
if self.event_processing: | ||
is_any_plot_present = False | ||
for fig, _, _ in self.figs: | ||
if plt.fignum_exists(fig.number): | ||
is_any_plot_present = True | ||
fig.canvas.flush_events() | ||
if not is_any_plot_present: | ||
self.plot_closed_event.set() | ||
|
||
def update_figures(self): | ||
for fig, bg, artists in self.figs: | ||
fig.canvas.restore_region(bg) | ||
for artist in artists: | ||
fig.draw_artist(artist) | ||
fig.canvas.blit(fig.bbox) | ||
|
||
def on_draw(self, event): | ||
if event is not None: | ||
bg = event.canvas.copy_from_bbox(event.canvas.figure.bbox) | ||
i_fig = next( | ||
i | ||
for i, (fig, _, _) in enumerate(self.figs) | ||
if fig == event.canvas.figure | ||
) | ||
self.figs[i_fig][1] = bg | ||
|
||
def show(self): | ||
plt.show(block=False) | ||
for i_fig, (fig, _, _) in enumerate(self.figs): | ||
bg = fig.canvas.copy_from_bbox(fig.bbox) | ||
fig.canvas.mpl_connect("draw_event", self.on_draw) | ||
self.figs[i_fig][1] = bg | ||
self.plot_closed_event.clear() | ||
self.event_processing = True | ||
|
||
def close(self): | ||
self.event_processing = False | ||
plt.close("all") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters