Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates the examples adding HR monitor #23

Merged
merged 1 commit into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
File renamed without changes.
24 changes: 23 additions & 1 deletion example/main.py → examples/basic_usage/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,26 @@
# main.py
""" BASIC USAGE EXAMPLE
This example shows how to use the MAX30102 sensor to collect data from the RED and IR channels.

The sensor is connected to the I2C bus, and the I2C bus is scanned to ensure that the sensor is connected.
The sensor is also checked to ensure that it is a MAX30102 or MAX30105 sensor.

The sensor is set up with the following parameters:
- Sample rate: 400 Hz
- Averaged samples: 8
- LED brightness: medium
- Pulse width: 411 µs
- Led mode: 2 (RED + IR)

The temperature is read at the beginning of the acquisition.

Then, in a loop the data is printed to the serial port, so that it can be plotted with a Serial Plotter.
Also the real acquisition frequency (i.e. the rate at which samples are collected from the sensor) is computed
and printed to the serial port. It differs from the sample rate, because the sensor processed the data and
averages the samples before putting them into the FIFO queue (by default, 8 samples are averaged).

Author: n-elia
"""

# Some ports need to import 'sleep' from 'time' module
from machine import sleep, SoftI2C, Pin
from utime import ticks_diff, ticks_us
Expand Down
60 changes: 60 additions & 0 deletions examples/heart_rate/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Heart Rate Monitor Example

## Overview

The `HeartRateMonitor` class is designed to calculate heart rate from the raw sensor readings of a MAX30102 pulse oximeter and heart-rate sensor, tailored for use in a MicroPython environment on an ESP32 board. It continuously processes a stream of raw integer readings from the sensor, identifies heartbeats by detecting peaks in the signal, and calculates the heart rate based on the intervals between consecutive peaks.

## How It Works

- Input: The class expects individual raw sensor readings (integer values) as input, provided to it through the `add_sample` method. These readings should come from the IR or green LEDs of the MAX30102 sensor, continuously polled at a consistent rate.

- Signal Processing:

- **Smoothing**: The input signal is first smoothed using a moving average filter to reduce high-frequency noise. This step is crucial for accurate peak detection.

- **Peak Detection**: The algorithm then identifies peaks in the smoothed signal using a dynamic thresholding method. A peak represents a heartbeat.

- Heart Rate Calculation: Once peaks are identified, the class calculates the heart rate by averaging the time intervals between consecutive peaks. The result is expressed in beats per minute (BPM).

## Parameters

- `sample_rate` (int): Defines the rate at which samples are collected from the sensor, in samples per second (Hz). This rate should match the polling frequency of the sensor in your application.

- `window_size` (int): Determines the number of samples over which to perform peak detection and heart rate calculation. A larger `window_size` can improve accuracy by considering more data but may also increase computation time and reduce responsiveness to changes in heart rate. Typically set based on the expected range of heart rates and the sample rate.

- `smoothing_window` (int): Specifies the size of the moving average filter window for signal smoothing. A larger window will produce a smoother signal but may also dilute the signal's peaks, potentially affecting peak detection accuracy. The optimal size often depends on the level of noise in the signal and the sample rate.

### Setting the Parameters

- `sample_rate`: Set this to match the frequency at which you're polling the MAX30102 sensor. Common values are 50, 100, or 200 Hz, depending on your application's requirements for data granularity and responsiveness.

- `window_size`: Start with a value that covers 1 to 2 seconds of data, based on your sample_rate. For example, at 100 Hz, a window size of 100 to 200 samples might be appropriate. Adjust based on testing, considering the balance between accuracy and responsiveness.

- `smoothing_window`: Begin with a small window, such as 5 to 10 samples, and adjust based on the noise level observed in your sensor data. The goal is to smooth out high-frequency noise without significantly delaying the detection of true heartbeats.

## Expected Input and Results

- Input: Continuous integer readings from the MAX30102 sensor, added one at a time via the add_sample method.

- Output: The heart rate in BPM, calculated periodically by calling the calculate_heart_rate method. This method returns None if not enough data is present to accurately calculate the heart rate.

## Example Usage

python
Copy code
hr_monitor = HeartRateMonitor(sample_rate=100, window_size=150, smoothing_window=10)

```python
# Add samples in a loop (replace the sample polling with actual sensor data retrieval)
for _ in range(1000): # Example loop
sample = ... # Poll the MAX30102/5 sensor to get a new sample
hr_monitor.add_sample(sample)
# Optionally, sleep or wait based on your polling frequency

# Calculate and print the heart rate
heart_rate = hr_monitor.calculate_heart_rate()
if heart_rate is not None:
print(f"Heart Rate: {heart_rate:.2f} BPM")
else:
print("Not enough data to calculate heart rate")
```
36 changes: 36 additions & 0 deletions examples/heart_rate/boot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# This file is executed on every boot (including wake-boot from deep sleep)

def do_connect(ssid: str, password: str):
import network
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('connecting to network...')
wlan.connect(ssid, password)
while not wlan.isconnected():
pass
print('network config:', wlan.ifconfig())


if __name__ == '__main__':
# Put yor Wi-Fi credentials here
my_ssid = "my_ssid"
my_pass = "my_password"

# Check if the module is available in memory
try:
from max30102 import MAX30102
except ImportError as e:
# Module not available. Try to connect to Internet to download it.
print(f"Import error: {e}")
print("Trying to connect to the Internet to download the module.")
do_connect(my_ssid, my_pass)
try:
# Try to leverage upip package manager to download the module.
import upip
upip.install("micropython-max30102")
except ImportError:
# upip not available. Try to leverage mip package manager to download the module.
print("upip not available in this port. Trying with mip.")
import mip
mip.install("github:n-elia/MAX30102-MicroPython-driver")
5 changes: 5 additions & 0 deletions examples/heart_rate/lib/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Note

To manually install the library, copy the files `max30102/circular_buffer.py`, `max30102/max30102.py` and past them here.

Then, load the content of `example` directory into the board.
189 changes: 189 additions & 0 deletions examples/heart_rate/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# main.py
# Some ports need to import 'sleep' from 'time' module
from machine import sleep, SoftI2C, Pin
from utime import ticks_diff, ticks_us, ticks_ms

from max30102 import MAX30102, MAX30105_PULSE_AMP_MEDIUM


class HeartRateMonitor:
"""A simple heart rate monitor that uses a moving window to smooth the signal and find peaks."""

def __init__(self, sample_rate=100, window_size=10, smoothing_window=5):
self.sample_rate = sample_rate
self.window_size = window_size
self.smoothing_window = smoothing_window
self.samples = []
self.timestamps = []
self.filtered_samples = []

def add_sample(self, sample):
"""Add a new sample to the monitor."""
timestamp = ticks_ms()
self.samples.append(sample)
self.timestamps.append(timestamp)

# Apply smoothing
if len(self.samples) >= self.smoothing_window:
smoothed_sample = (
sum(self.samples[-self.smoothing_window :]) / self.smoothing_window
)
self.filtered_samples.append(smoothed_sample)
else:
self.filtered_samples.append(sample)

# Maintain the size of samples and timestamps
if len(self.samples) > self.window_size:
self.samples.pop(0)
self.timestamps.pop(0)
self.filtered_samples.pop(0)

def find_peaks(self):
"""Find peaks in the filtered samples."""
peaks = []

if len(self.filtered_samples) < 3: # Need at least three samples to find a peak
return peaks

# Calculate dynamic threshold based on the min and max of the recent window of filtered samples
recent_samples = self.filtered_samples[-self.window_size :]
min_val = min(recent_samples)
max_val = max(recent_samples)
threshold = (
min_val + (max_val - min_val) * 0.5
) # 50% between min and max as a threshold

for i in range(1, len(self.filtered_samples) - 1):
if (
self.filtered_samples[i] > threshold
and self.filtered_samples[i - 1] < self.filtered_samples[i]
and self.filtered_samples[i] > self.filtered_samples[i + 1]
):
peak_time = self.timestamps[i]
peaks.append((peak_time, self.filtered_samples[i]))

return peaks

def calculate_heart_rate(self):
"""Calculate the heart rate in beats per minute (BPM)."""
peaks = self.find_peaks()

if len(peaks) < 2:
return None # Not enough peaks to calculate heart rate

# Calculate the average interval between peaks in milliseconds
intervals = []
for i in range(1, len(peaks)):
interval = ticks_diff(peaks[i][0], peaks[i - 1][0])
intervals.append(interval)

average_interval = sum(intervals) / len(intervals)

# Convert intervals to heart rate in beats per minute (BPM)
heart_rate = (
60000 / average_interval
) # 60 seconds per minute * 1000 ms per second

return heart_rate


def main():
# I2C software instance
i2c = SoftI2C(
sda=Pin(8), # Here, use your I2C SDA pin
scl=Pin(9), # Here, use your I2C SCL pin
freq=400000,
) # Fast: 400kHz, slow: 100kHz

# Examples of working I2C configurations:
# Board | SDA pin | SCL pin
# ------------------------------------------
# ESP32 D1 Mini | 22 | 21
# TinyPico ESP32 | 21 | 22
# Raspberry Pi Pico | 16 | 17
# TinyS3 | 8 | 9

# Sensor instance
sensor = MAX30102(i2c=i2c) # An I2C instance is required

# Scan I2C bus to ensure that the sensor is connected
if sensor.i2c_address not in i2c.scan():
print("Sensor not found.")
return
elif not (sensor.check_part_id()):
# Check that the targeted sensor is compatible
print("I2C device ID not corresponding to MAX30102 or MAX30105.")
return
else:
print("Sensor connected and recognized.")

# Load the default configuration
print("Setting up sensor with default configuration.", "\n")
sensor.setup_sensor()

# Set the sample rate to 400: 400 samples/s are collected by the sensor
sensor_sample_rate = 400
sensor.set_sample_rate(sensor_sample_rate)

# Set the number of samples to be averaged per each reading
sensor_fifo_average = 8
sensor.set_fifo_average(sensor_fifo_average)

# Set LED brightness to a medium value
sensor.set_active_leds_amplitude(MAX30105_PULSE_AMP_MEDIUM)

# Expected acquisition rate: 400 Hz / 8 = 50 Hz
actual_acquisition_rate = int(sensor_sample_rate / sensor_fifo_average)

sleep(1)

print(
"Starting data acquisition from RED & IR registers...",
"press Ctrl+C to stop.",
"\n",
)
sleep(1)

# Initialize the heart rate monitor
hr_monitor = HeartRateMonitor(
# Select a sample rate that matches the sensor's acquisition rate
sample_rate=actual_acquisition_rate,
# Select a significant window size to calculate the heart rate (2-5 seconds)
window_size=int(actual_acquisition_rate * 3),
)

# Setup to calculate the heart rate every 2 seconds
hr_compute_interval = 2 # seconds
ref_time = ticks_ms() # Reference time

while True:
# The check() method has to be continuously polled, to check if
# there are new readings into the sensor's FIFO queue. When new
# readings are available, this function will put them into the storage.
sensor.check()

# Check if the storage contains available samples
if sensor.available():
# Access the storage FIFO and gather the readings (integers)
red_reading = sensor.pop_red_from_storage()
ir_reading = sensor.pop_ir_from_storage()

# Add the IR reading to the heart rate monitor
# Note: based on the skin color, the red, IR or green LED can be used
# to calculate the heart rate with more accuracy.
hr_monitor.add_sample(ir_reading)

# Periodically calculate the heart rate every `hr_compute_interval` seconds
if ticks_diff(ticks_ms(), ref_time) / 1000 > hr_compute_interval:
# Calculate the heart rate
heart_rate = hr_monitor.calculate_heart_rate()
if heart_rate is not None:
print("Heart Rate: {:.0f} BPM".format(heart_rate))
else:
print("Not enough data to calculate heart rate")
# Reset the reference time
ref_time = ticks_ms()


if __name__ == "__main__":
main()
Loading