Heart Rate Monitor

import time
from machine import Pin, I2C
from max30102 import MAX30102
from ssd1306 import SSD1306_I2C

import array

# Shared I2C bus (bus id 0, SCL on pin 22, SDA on pin 21).
# The same bus object is handed to both the sensor and the display below.
i2c = I2C(0, scl=Pin(22), sda=Pin(21))

# MAX30102 pulse sensor driver, attached to the shared bus.
sensor = MAX30102(i2c)

# SSD1306 OLED driver on the shared bus.
# NOTE(review): arguments are presumably (width=128, height=64) in pixels —
# confirm against the ssd1306 driver in use.
oled = SSD1306_I2C(128, 64, i2c)

# User profile: the age-based estimate "220 - age" gives the maximum heart
# rate (MHR). MHR is used as the upper plausibility bound, to size the peak
# detector's refractory period, and to derive the displayed target zone.
age = 60
max_heart_rate = 220 - age  # Estimated maximum heart rate, in bpm
min_heart_rate = 40  # Lower plausibility bound, in bpm

# Sampling configuration and state.
# One analysis window is buffer_length samples; at the assumed sampling_rate
# that is 200 / 25 = 8 seconds of signal per heart-rate estimate.
buffer_length = 200  # Samples per analysis window
ir_buffer = array.array('i', [0] * buffer_length)  # Signed-int sample store, zero-filled
sampling_rate = 25  # Hz (assumed rate; used to convert sample indices to seconds)
sample_interval = 1 / sampling_rate  # Seconds between consecutive samples
sample_count = 0  # Total samples taken so far; also drives the buffer write index

def mean(values):
    """Return the arithmetic mean of the strictly positive entries of *values*.

    Entries that are zero or negative are skipped entirely: they add to
    neither the running total nor the count. Returns 0 when *values* holds
    no positive entry (including when it is empty).
    """
    total = 0
    count = 0
    for value in values:
        if value > 0:
            total += value
            count += 1

    # Guard against dividing by zero when nothing qualified.
    if count == 0:
        return 0
    return total / count

def moving_average(data, window_size):
    """Smooth *data* with a trailing moving average.

    Output element ``i`` is the mean of the most recent ``window_size``
    samples up to and including ``data[i]``. While fewer than
    ``window_size`` samples are available (the start of the sequence) the
    mean is taken over the samples seen so far, so the output has no
    unsmoothed warm-up section and no jump where full windows begin.

    Args:
        data: Indexable, sliceable sequence of numbers.
        window_size: Number of samples per averaging window; must be >= 1.

    Returns:
        A new list of floats with the same length as *data*.

    Raises:
        ValueError: If *window_size* is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be at least 1")

    averaged_data = []
    for end in range(1, len(data) + 1):
        # The window covers data[start:end]; it ends ON the current sample
        # (index end - 1) so the smoothed signal does not lag an extra step.
        start = max(0, end - window_size)
        window = data[start:end]
        averaged_data.append(sum(window) / len(window))
    return averaged_data

def detect_peaks(ir_values):
    """Return the indices of pulse peaks found in *ir_values*.

    The signal is first smoothed with ``moving_average`` (window of 4).
    A smoothed sample is accepted as a peak when all of the following hold:

    * it lies more than one refractory period (in samples) after the
      previously accepted peak;
    * it is above 0.8 times ``mean()`` of the smoothed signal;
    * it is strictly greater than both of its immediate neighbours.

    The refractory period is the shortest beat-to-beat time allowed by the
    module-level ``max_heart_rate``, converted to a whole number of samples
    with the module-level ``sample_interval``. The first and last samples
    are never reported because they lack a neighbour on one side.
    """
    smoothed = moving_average(ir_values, window_size=4)
    cutoff = mean(smoothed) * 0.8

    # Shortest plausible gap between two beats, truncated to whole samples.
    shortest_beat_seconds = 60 / max_heart_rate
    refractory_period = int(shortest_beat_seconds / sample_interval)

    found = []
    # Start one refractory period "in the past" so early samples are eligible.
    previous_peak = -refractory_period
    for idx in range(1, len(smoothed) - 1):
        if idx - previous_peak <= refractory_period:
            continue  # still inside the refractory window of the last peak

        current = smoothed[idx]
        if not current > cutoff:
            continue  # too small to be a beat

        if current > smoothed[idx - 1] and current > smoothed[idx + 1]:
            found.append(idx)
            previous_peak = idx
    return found

def _heart_rate_from_peaks(peaks_indices):
    """Return the heart rate in bpm implied by *peaks_indices*, or None.

    *peaks_indices* are ascending sample indices of detected beats. None is
    returned when there are fewer than two peaks, or when the resulting rate
    falls outside [min_heart_rate, max_heart_rate].
    """
    if len(peaks_indices) < 2:
        return None

    # The mean peak-to-peak interval is the total span divided by the number
    # of gaps (the individual intervals telescope).
    span_seconds = (peaks_indices[-1] - peaks_indices[0]) * sample_interval
    avg_interval = span_seconds / (len(peaks_indices) - 1)
    heart_rate = 60 / avg_interval

    if min_heart_rate <= heart_rate <= max_heart_rate:
        return heart_rate
    return None


def _show_heart_rate(heart_rate):
    """Redraw the OLED with *heart_rate* (bpm), or a placeholder when None.

    Every line is kept to at most 16 characters: the display is 128 pixels
    wide and the built-in framebuffer font is 8 pixels per character, so
    longer strings would be clipped at the right edge.
    """
    oled.fill(0)
    oled.text("HR Monitor", 0, 0)
    if heart_rate is not None:
        oled.text("HR: {:.1f} bpm".format(heart_rate), 0, 20)
        # Target zone shown is 50-70 % of the estimated maximum heart rate.
        oled.text("Target: {}-{}".format(int(max_heart_rate * 0.5), int(max_heart_rate * 0.7)), 0, 40)
    else:
        oled.text("Calculating...", 0, 20)
    oled.show()


# Sample period in whole milliseconds for the tick-based scheduler below.
sample_interval_ms = 1000 // sampling_rate
next_sample_due = time.ticks_ms()

while True:
    # Read one IR sample and store it at the current position in the window.
    # NOTE(review): read_fifo() is assumed to return a single integer IR
    # reading — confirm against the max30102 driver in use.
    ir_value = sensor.read_fifo()
    ir_buffer[sample_count % buffer_length] = ir_value
    sample_count += 1

    # A full window has just been written (indices 0..buffer_length-1 in
    # chronological order): analyse it and refresh the display.
    if sample_count % buffer_length == 0:
        ir_values = list(ir_buffer)

        # Remove the DC level; mean() ignores non-positive readings so empty
        # or invalid samples do not drag the baseline down.
        dc_mean = mean(ir_values)
        ir_values = [x - dc_mean for x in ir_values]

        peaks_indices = detect_peaks(ir_values)
        valid_heart_rate = _heart_rate_from_peaks(peaks_indices)
        _show_heart_rate(valid_heart_rate)

    # Pace the loop against absolute deadlines instead of sleeping a full
    # interval after the work: otherwise read/processing time is added to
    # every period, the real sample rate drops below sampling_rate, and the
    # computed bpm comes out too high.
    next_sample_due = time.ticks_add(next_sample_due, sample_interval_ms)
    remaining_ms = time.ticks_diff(next_sample_due, time.ticks_ms())
    if remaining_ms > 0:
        time.sleep_ms(remaining_ms)
    else:
        # Fell behind (e.g. after a display refresh): resynchronise rather
        # than firing a burst of back-to-back reads to catch up.
        next_sample_due = time.ticks_ms()

Leave a Comment

Your email address will not be published. Required fields are marked *