Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Audio and plotter libraries and demonstrators #110

Merged
merged 4 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions examples/Python/src/Audio/Spectrum.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
target Python {
keepalive: true # Needed because of the physical action in the Microphone reactor.
}

import Microphone from "../lib/Audio.lf"
import VectorPlot from "../lib/Plotters.lf"

preamble {=
import numpy as np
=}

reactor FFT {
input x
output y

reaction(x) -> y {=
s = np.fft.rfft(x.value)
y.set(np.abs(s))
=}
}

main reactor {
m = new Microphone(block_size=1024)
f = new FFT()
p = new VectorPlot(
size = [10, 5],
title="Spectrum",
xlabel = "Frequency (Hz)",
ylabel="Magnitude",
ylim = [0, 10],
xrange = {= [0, 16000/2 + 1, 16000/1024] =})
m.audio_data -> f.x
f.y -> p.y
}
72 changes: 72 additions & 0 deletions examples/Python/src/lib/Audio.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* @file
* @author Vincenzo Barbuto
* @author Edward A. Lee
* @brief Basic audio library for Lingua Franca Python target.
*/
target Python {
keepalive: true
}

/**
* @brief A reactor that produces audio data captured from a microphone.
*
* This reactor outputs a series of numpy arrays of audio samples. The size of each array is equal
* to the `block_size` parameter. Each array element will be a `float32` value if the number of
* channels is 1, or an array of `float32` values if the number of channels is greater than 1.
*
* The logical time of the output is the physical time at which the audio hardware calls a callback
* function, which occurs when a buffer of length block_size has been filled with audio samples.
*
* If your machine has more than one microphone, you can select one by specifying the device index.
* To see what devices are available, run `python3 -m sounddevice`.
*
* To use this reactor, you must install the sounddevice and numpy libraries for Python. You can do
* this with `pip install sounddevice numpy`.
*/
reactor Microphone(block_size=16000, sample_rate=16000, channels=1, device = {= None =}) {
physical action send_audio_data
output audio_data

preamble {=
import sounddevice as sd
import numpy as np
=}

reaction(startup) -> send_audio_data {=
def callback(indata, frames, time, status):
if status:
print(status)
input_data = indata.astype(self.np.float32)
if (len(input_data)):
if self.channels == 1:
input_data = self.np.array(self.np.transpose(input_data)[0])
send_audio_data.schedule(0, input_data)

self.stream = self.sd.InputStream(
channels=self.channels,
samplerate=self.sample_rate,
callback=callback,
blocksize=self.block_size,
device=self.device)
self.stream.start()
=}

reaction(send_audio_data) -> audio_data {=
audio_data.set(send_audio_data.value)
=}

reaction(shutdown) {=
if self.stream:
self.stream.close()
=}
}

/** @brief A test reactor that prints arrays of audio data captured from a microphone. */
main reactor {
m = new Microphone()

reaction(m.audio_data) {=
print(m.audio_data.value, " at time ", lf.time.logical_elapsed())
=}
}
96 changes: 96 additions & 0 deletions examples/Python/src/lib/Plotters.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* @file
* @author Edward A. Lee
* @brief Reactors for plotting signals.
*/
target Python {
timeout: 10 s
}

/**
* @brief A reactor that plots a sequence of vectors, where each vector plot replaces the previous
* one.
*
* The `size` parameter is a tuple of two values: the width and height of the plot in inches.
*
* The `title` parameter is a string that is displayed above the plot. The `xlabel` and `ylabel`
* parameters provide strings to label the x and y axes, respectively.
*
* The `ylim` parameter is a tuple of two values: the lower limit and the upper limit of the y axis.
* If no ylim is specified or it is not a tuple with two value, then the default range is determined
* by the limits of the first vector provided.
*
* The `xrange` parameter is a tuple of three values: the start of the x axis, the end of the x
* axis, and the step size. If `xrange` is `None` (the default), the x axis will be set to the
* length of the first input vector.
*
* To use this reactor, you must install the matplotlib and numpy libraries for Python. You can do
* this with `pip install matplotlib numpy`. See [matplotlib
* documentation](https://matplotlib.org/stable/) for more information.
*/
reactor VectorPlot(
size = {= None =},
title = {= None =},
xlabel = {= None =},
xrange = {= None =},
ylabel = {= None =},
ylim = {= None =}) {
preamble {=
import matplotlib.pyplot as plt
import numpy as np
=}

input y
state showing = False
state figure = {= None =}
state axes = {= None =}
state line1 = {= None =}

reaction(y) {=
if not self.showing:
# First vector to plot. Set up the plot.
# to run GUI event loop
self.plt.ion()
self.figure, self.axes = self.plt.subplots(figsize=self.size)
if (self.title):
self.axes.set_title(self.title)
if (self.xlabel):
self.axes.set_xlabel(self.xlabel)
if (self.ylabel):
self.axes.set_ylabel(self.ylabel)
if (self.ylim) and len(self.ylim) == 2:
self.axes.set_ylim(self.ylim[0], self.ylim[1])

# Determine the x axis.
xrange = self.xrange
if not xrange:
xrange = [0, len(y.value), 1]
x = self.np.arange(xrange[0], xrange[1], xrange[2])

# Plot the data.
self.line1, = self.axes.plot(x, y.value)

self.showing = True
else:
self.line1.set_ydata(y.value)

self.figure.canvas.draw()
self.figure.canvas.flush_events()
=}
}

main reactor {
preamble {=
import numpy as np
=}
state count = 1
timer t(0, 1 s)
v = new VectorPlot(title = "Sine wave", xlabel = "X Axis", ylabel = "Y Axis")

reaction(t) -> v.y {=
x = self.np.linspace(0, 2 * self.np.pi * self.count, 200)
y = self.np.sin(x)
v.y.set(y)
self.count += 1
=}
}
Loading