Signal Processing Examples - CircuitPython¶
N.B. this is new and still being tested.
The following Python samples demonstrate several single-channel filters for processing sensor data. The filter functions are purely numeric operations and should work on any Python or CircuitPython system. This supports offline testing, as they can be debugged and evaluated on a normal desktop computer.
The code is written for clarity, not efficiency. In many cases significant speedup could result from using ulab or numpy to apply filters to sample vectors or matrices.
The source files can be browsed in raw form in the CircuitPython signals code folder. The files may be also downloaded in a single archive file as signals.zip.
linear.py¶
An important step in signal processing is applying a calibration
transformation to translate raw values received from an analog to digital
converter (ADC) into repeatable and meaningful units. For linear sensors this
can be a two-parameter linear mapping which applies an offset and scaling. One
convenient implementation is the map
function.
- map(x, in_min, in_max, out_min, out_max)¶
Map an input x from range (in_min, in_max) to range (out_min, out_max). This is a Python implementation of the Arduino map() function. Works on either integers or floating point.
- constrain(x, a, b)¶
Constrains a number x to be within range (a, b). Python implementation of the Arduino constrain() function. N.B. included here for a convenience, but this is not a linear function.
1# linear.py : platform-independent linear transforms
2# No copyright, 2020-2021, Garth Zeglin. This file is explicitly placed in the public domain.
3
4def map(x, in_min, in_max, out_min, out_max):
5 """Map an input x from range (in_min, in_max) to range (out_min, out_max). This
6 is a Python implementation of the Arduino map() function. Works on either
7 integers or floating point.
8 """
9 divisor = in_max - in_min
10
11 if divisor == 0:
12 return out_min
13 else:
14 return ((x - in_min) * (out_max - out_min) / divisor) + out_min
15
16def constrain(x, a, b):
17 """Constrains a number x to be within range (a, b). Python implementation
18 of the Arduino constrain() function."""
19 return min(max(x, a), b)
20
statistics.py¶
- class CentralMeasures¶
Object encapsulating a set of accumulators for calculating the mean, variance, min, and max of a stream of values. All properties are available as object attributes (i.e. instance variables).
1# statistics.py : compute single-channel central measures in an accumulator
2# No copyright, 2009-2021, Garth Zeglin. This file is explicitly placed in the public domain.
3
4import math
5
6# CircuitPython lacks math.inf, here is a workaround:
7try:
8 inf_value = math.inf
9except AttributeError:
10 inf_value = math.exp(10000)
11
12
13class CentralMeasures:
14 def __init__(self):
15 """Object to accumulate central measures statistics on a single channel of data."""
16
17 self.samples = 0 # running sum of value^0, i.e., the number of samples
18 self.total = 0 # running sum of value^1, i.e., the accumulated total
19 self.squared = 0 # running sum of value^2, i.e., the accumulated sum of squares
20 self.minval = inf_value # smallest input seen
21 self.maxval = -inf_value # largest input seen
22 self.last = 0 # most recent input
23
24 # computed statistics
25 self.average = 0 # mean value
26 self.variance = 0 # square of the standard deviation
27
28 return
29
30 def update(self, value):
31 """Apply a new sample to the accumulators and update the computed statistics.
32 Returns the computed average and variance as a tuple."""
33 self.total += value
34 self.squared += value * value
35
36 if value < self.minval:
37 self.minval = value
38
39 if value > self.maxval:
40 self.maxval = value
41
42 self.samples += 1
43 self.last = value
44
45 # recalculate average and variance
46 if self.samples > 0:
47 self.average = self.total / self.samples
48
49 if self.samples > 1:
50 # The "standard deviation of the sample", which is only correct
51 # if the population is normally distributed and a large sample
52 # is available, otherwise tends to be too low:
53 # self.sigma = math.sqrt((self.samples * self.squared - self.total*self.total) /
54 # self.samples))
55
56 # Instead compute the "sample standard deviation", an unbiased
57 # estimator for the variance. The standard deviation is the
58 # square root of the variance.
59 self.variance = ((self.samples * self.squared - self.total*self.total) /
60 (self.samples * (self.samples - 1)))
61
62 return self.average, self.variance
hysteresis.py¶
Hysteresis is the behavior exhibited by a system with state dependent on its history. In physical systems it indicates the presence of hidden state, e.g., the magnetization of a material. In a filter it can provide useful non-linear behavior to discretize data or eliminate outliers.
- class Hysteresis¶
Filter to quantize an input stream into a binary state. Dual thresholds are needed to implement hysteresis: the input needs to rise above the upper threshold to trigger a high output, then drop below the input threshold to return to the low output. One bit of state is required.
- class Suppress¶
Fiilter to suppress a specific value in an input stream.
- class Debounce¶
Filter to ‘debounce’ an integer stream by suppressing changes from the previous value until a specific new value has been observed a minimum number of times.
1# hysteresis.py : platform-independent non-linear filters
2
3# No copyright, 2020-2011, Garth Zeglin. This file is explicitly placed in
4# the public domain.
5#--------------------------------------------------------------------------------
6class Hysteresis:
7 def __init__(self, lower=16000, upper=48000):
8 """Filter to quantize an input stream into a binary state. Dual thresholds are
9 needed to implement hysteresis: the input needs to rise above the upper
10 threshold to trigger a high output, then drop below the input threshold
11 to return to the low output. One bit of state is required.
12 """
13 self.lower = lower
14 self.upper = upper
15 self.state = False
16
17 def update(self, input):
18 """Apply a new sample value to the quantizing filter. Returns a boolean state."""
19 if self.state is True:
20 if input < self.lower:
21 self.state = False
22 else:
23 if input > self.upper:
24 self.state = True
25
26 return self.state
27
28#--------------------------------------------------------------------------------
29class Suppress:
30 def __init__(self, value=0):
31 """Filter to suppress a specific value in an input stream."""
32 self.suppress = value
33 self.previous = None
34
35 def update(self, input):
36 if input != self.suppress:
37 self.previous = input
38 return self.previous
39
40#--------------------------------------------------------------------------------
41class Debounce:
42 def __init__(self, samples=5):
43 """Filter to 'debounce' an integer stream by suppressing changes from the previous value
44 until a specific new value has been observed a minimum number of times."""
45
46 self.samples = samples # number of samples required to change
47 self.current_value = 0 # current stable value
48 self.new_value = None # possible new value
49 self.count = 0 # count of new values observed
50
51 def update(self, input):
52 if input == self.current_value:
53 # if the input is unchanged, keep the counter at zero
54 self.count = 0
55 self.new_value = None
56 else:
57 if input != self.new_value:
58 # start a new count
59 self.new_value = input
60 self.count = 1
61 else:
62 # count repeated changes
63 self.count += 1
64 if self.count >= self.samples:
65 # switch state after a sufficient number of changes
66 self.current_value = self.new_value
67 self.count = 0
68 self.new_value = None
69
70 return self.current_value
smoothing.py¶
- class Smoothing¶
Filter to smooth an input signal using a first-order filter. One state value is required. The smaller the coefficient, the smoother the output.
- class MovingAverage¶
Filter to smooth a signal by averaging over multiple samples. The recent time history (the ‘moving window’) is kept in an array along with a running total. The window size determines how many samples are held in memory and averaged together.
1# smoothing.py : platform-independent first-order smoothing filter
2# No copyright, 2020-2021, Garth Zeglin. This file is explicitly placed in the public domain.
3
4class Smoothing:
5 def __init__(self, coeff=0.1):
6 """Filter to smooth an input signal using a first-order filter. One state value
7 is required. The smaller the coefficient, the smoother the output."""
8 self.coeff = coeff
9 self.value = 0
10
11 def update(self, input):
12 # compute the error between the input and the accumulator
13 difference = input - self.value
14
15 # apply a constant coefficient to move the smoothed value toward the input
16 self.value += self.coeff * difference
17
18 return self.value
19
20#--------------------------------------------------------------------------------
21class MovingAverage:
22 def __init__(self, window_size=5):
23 """Filter to smooth a signal by averaging over multiple samples. The recent
24 time history (the 'moving window') is kept in an array along with a
25 running total. The window size determines how many samples are held in
26 memory and averaged together.
27 """
28 self.window_size = window_size
29 self.ring = [0] * window_size # ring buffer for recent time history
30 self.oldest = 0 # index of oldest sample
31 self.total = 0 # sum of all values in the buffer
32
33 def update(self, input):
34 # subtract the oldest sample from the running total before overwriting
35 self.total = self.total - self.ring[self.oldest]
36
37 # save the new sample by overwriting the oldest sample
38 self.ring[self.oldest] = input
39
40 # advance to the next position, wrapping around as needed
41 self.oldest += 1
42 if self.oldest >= self.window_size:
43 self.oldest = 0
44
45 # add the new input value to the running total
46 self.total = self.total + input
47
48 # calculate and return the average
49 return self.total / self.window_size
median.py¶
- class MedianFilter¶
Non-linear filter to reduce signal outliers by returning the median value of the recent history. The window size determines how many samples are held in memory. An input change is typically delayed by half the window width. This filter is useful for throwing away isolated outliers, especially glitches out of range.
1# median.py : platform-independent median filter
2# No copyright, 2020-2021, Garth Zeglin. This file is explicitly placed in the public domain.
3
4class MedianFilter:
5 def __init__(self, window_size=5):
6 """Non-linear filter to reduce signal outliers by returning the median value
7 of the recent history. The window size determines how many samples
8 are held in memory. An input change is typically delayed by half the
9 window width. This filter is useful for throwing away isolated
10 outliers, especially glitches out of range.
11 """
12 self.window_size = window_size
13 self.ring = [0] * window_size # ring buffer for recent time history
14 self.oldest = 0 # index of oldest sample
15
16 def update(self, input):
17 # save the new sample by overwriting the oldest sample
18 self.ring[self.oldest] = input
19 self.oldest += 1
20 if self.oldest >= self.window_size:
21 self.oldest = 0
22
23 # create a new sorted array from the ring buffer values
24 in_order = sorted(self.ring)
25
26 # return the value in the middle
27 return in_order[self.window_size//2]
biquad.py¶
- class BiquadFilter¶
General IIR digital filter using cascaded biquad sections. The specific filter type is configured using a coefficient matrix. These matrices can be generated for low-pass, high-pass, and band-pass configurations.
The file also contains filter matrices automatically generated using filter_gen.py and the SciPy toolkit. In general you will need to regenerate the filter coefficients for your particular application needs. The sample filter responses are shown below.
On the left is the low-pass filter signal transfer ratio as a function of frequency. Please note this is plotted on a linear scale for clarity; on a logarithmic scale (dB) the rolloff slope becomes straight. On the right is the low-pass filter time response to a ‘chirp’ frequency sweep.
On the left is the high-pass filter signal transfer ratio as a function of frequency, on the right is the time response.
On the left is the band-pass signal transfer ratio as a function of frequency, on the right is the time response.
On the left is the band-stop signal transfer ratio as a function of frequency, on the right is the time response.
1# biquad.py : digital IIR filters using cascaded biquad sections generated using filter_gen.py.
2
3#--------------------------------------------------------------------------------
4# Coefficients for a low-pass Butterworth IIR digital filter with sampling rate
5# 10 Hz and corner frequency 1.0 Hz. Filter is order 4, implemented as
6# second-order sections (biquads).
7# Reference: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.butter.html
8
9low_pass_10_1 = [[[1.0, -1.04859958, 0.29614036], # A coefficients, first section
10 [0.00482434, 0.00964869, 0.00482434]], # B coefficients, first section
11 [[1.0, -1.32091343, 0.63273879], # A coefficients, second section
12 [1.0, 2.0, 1.0]]] # B coefficients, second section
13
14#--------------------------------------------------------------------------------
15# Coefficients for a high-pass Butterworth IIR digital filter with
16# sampling rate: 10 Hz and corner frequency 1.0 Hz.
17# Filter is order 4, implemented as second-order sections (biquads).
18
19high_pass_10_1 = [[[1.0, -1.04859958, 0.29614036],
20 [0.43284664, -0.86569329, 0.43284664]],
21 [[1.0, -1.32091343, 0.63273879],
22 [1.0, -2.0, 1.0]]]
23
24#--------------------------------------------------------------------------------
25# Coefficients for a band-pass Butterworth IIR digital filter with sampling rate
26# 10 Hz and pass frequency range [0.5, 1.5] Hz. Filter is order 4, implemented
27# as second-order sections (biquads).
28band_pass_10_1 = [[[1.0, -1.10547167, 0.46872661],
29 [0.00482434, 0.00964869, 0.00482434]],
30 [[1.0, -1.48782202, 0.63179763],
31 [1.0, 2.0, 1.0]],
32 [[1.0, -1.04431445, 0.72062964],
33 [1.0, -2.0, 1.0]],
34 [[1.0, -1.78062325, 0.87803603],
35 [1, -2.0, 1.0]]]
36
37#--------------------------------------------------------------------------------
38# Coefficients for a band-stop Butterworth IIR digital filter with
39# sampling rate: 10 Hz and exclusion frequency range [0.5, 1.5] Hz.
40# Filter is order 4, implemented as second-order sections (biquads).
41
42band_stop_10_1 = [[[1.0, -1.10547167, 0.46872661],
43 [0.43284664, -0.73640270, 0.43284664]],
44 [[1.0, -1.48782202, 0.63179763],
45 [1.0, -1.70130162, 1.0]],
46 [[1.0, -1.04431445, 0.72062964],
47 [1.0, -1.70130162, 1.0]],
48 [[1.0, -1.78062325, 0.87803603],
49 [1.0, -1.70130162, 1.0]]]
50
51#--------------------------------------------------------------------------------
52class BiquadFilter:
53 def __init__(self, coeff=low_pass_10_1):
54 """General IIR digital filter using cascaded biquad sections. The specific
55 filter type is configured using a coefficient matrix. These matrices can be
56 generated for low-pass, high-pass, and band-pass configurations.
57 """
58 self.coeff = coeff # coefficient matricies
59 self.sections = len(self.coeff) # number of biquad sections in chain
60 self.state = [[0,0] for i in range(self.sections)] # biquad state vectors
61
62 def update(self, input):
63 # Iterate over the biquads in sequence. The accum variable transfers
64 # the input into the chain, the output of each section into the input of
65 # the next, and final output value.
66 accum = input
67 for s in range(self.sections):
68 A = self.coeff[s][0]
69 B = self.coeff[s][1]
70 Z = self.state[s]
71 x = accum - A[1]*Z[0] - A[2]*Z[1]
72 accum = B[0]*x + B[1]*Z[0] + B[2]*Z[1]
73 Z[1] = Z[0]
74 Z[0] = x
75
76 return accum
demo.py¶
This following script applies all the sample filters to an analog sample stream.
This file should be copied into the top-level folder as code.py
, and the
individual filter samples under their own names so they may be loaded as
modules.
1# demo.py
2
3# Raspberry Pi Pico - Signal Processing Demo
4
5# Read an analog input with the ADC, apply various filters, and print filtered
6# data to the console for plotting.
7
8# Import CircuitPython modules.
9import board
10import time
11import analogio
12import digitalio
13
14# Import every filter sample. These files should be copied to the top-level
15# directory of the CIRCUITPY filesystem on the Pico.
16
17import biquad
18import hysteresis
19import linear
20import median
21import smoothing
22import statistics
23
24#---------------------------------------------------------------
25# Set up the hardware.
26
27# Set up an analog input on ADC0 (GP26), which is physically pin 31.
28# E.g., this may be attached to photocell or photointerrupter with associated pullup resistor.
29sensor = analogio.AnalogIn(board.A0)
30
31#---------------------------------------------------------------
32# Initialize filter objects as global variables.
33
34stats = statistics.CentralMeasures()
35hysteresis = hysteresis.Hysteresis(lower=0.25, upper=0.75)
36average = smoothing.MovingAverage()
37smoothing = smoothing.Smoothing()
38median = median.MedianFilter()
39lowpass = biquad.BiquadFilter(biquad.low_pass_10_1)
40highpass = biquad.BiquadFilter(biquad.high_pass_10_1)
41bandpass = biquad.BiquadFilter(biquad.band_pass_10_1)
42bandstop = biquad.BiquadFilter(biquad.band_stop_10_1)
43
44# Collect all the filters in a list for efficient multiple updates.
45
46all_filters = [ stats, hysteresis, average, smoothing, median,
47 lowpass, highpass, bandpass, bandstop, ]
48
49#---------------------------------------------------------------
50# Run the main event loop.
51
52# Use the high-precision clock to regulate a precise *average* sampling rate.
53sampling_interval = 100000000 # 0.1 sec period of 10 Hz in nanoseconds
54next_sample_time = time.monotonic_ns()
55
56while True:
57 # read the current nanosecond clock
58 now = time.monotonic_ns()
59 if now >= next_sample_time:
60 # Advance the next event time; by spacing out the timestamps at precise
61 # intervals, the individual sample times may have 'jitter', but the
62 # average rate will be exact.
63 next_sample_time += sampling_interval
64
65 # Read the sensor once per sampling cycle.
66 raw = sensor.value
67
68 # Apply calibration to map integer ADC values to meaningful units. The
69 # exact scaling and offset will depend on both the individual device and
70 # application.
71 calib = linear.map(raw, 59000, 4000, 0.0, 1.0)
72
73 # Pipe the calibrated value through all the filters.
74 filtered = [filt.update(calib) for filt in all_filters]
75
76 # Selectively report results for plotting.
77 # print((raw, calib)) # raw and calibrated input signal
78 # print((calib, filtered[0][0], filtered[0][1])) # calibrated, average, variance
79 # print((calib, 1*filtered[1], filtered[2])) # calibrated, thresholded, moving average
80 print((calib, filtered[3], filtered[4])) # calibrated, smoothed, median-filtered
81 # print((calib, filtered[5], filtered[6])) # calibrated, low-pass, high-pass
82 # print((calib, filtered[7], filtered[8])) # calibrated, band-pass, band-stop
Development Tools¶
The development of these filters involves several other tools:
A Python script for generating digital filters using SciPy: filter_gen.py
A Python matplotlib script for generating figures from the test data: generate_plots.py
The Python scripts use several third-party libraries:
SciPy: comprehensive numerical analysis; linear algebra algorithms used during filter generation
Matplotlib: plotting library for visualizing data
For more information on filters: