Remote Connection Examples - CircuitPython¶
The following short CircuitPython programs will demonstrate communication via the host serial port. This interface can be used for local debugging as a user console, and also enable linking projects across the network. The network connection uses a Python program running on a laptop or desktop to forward data to and from a partner via the IDeATe MQTT server.
Each can be run by copying the program into code.py
on the CIRCUITPY drive
offered by the board. The text can be pasted directly from this page, or each
file can be downloaded from the CircuitPython sample code folder on this site.
Related Pages
Contents
Remote Blink¶
Toy demonstration of remote communication implementing a command language to
control a real-time LED blinker. N.B. this top-level script assumes that the
remote.py
and led.py
modules have been copied as-is to the top-level
CIRCUITPY directory.
Direct download: remoteblink.py.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | # remoteblink.py
# Raspberry Pi Pico - LED Blinker with Remote Control
# Toy demonstration of remote communication implementing a command language
# to control a real-time LED blinker.
import time
# Import modules provided in the 'CircuitPython/remote/' code folder. These
# files should be copied to the top-level directory of the CIRCUITPY filesystem
# on the Pico.
import led
import remote
#---------------------------------------------------------------
# Initialize global objects.
# LED interface object.
blinker = led.Blinker()
# Communication interface and callbacks.
remote = remote.RemoteSerial()
def default_handler(msgtype, *args):
print(f"Warning: received unknown message {msgtype} {args}")
def help_handler(*args):
print("Usage: rate <Hz>")
remote.add_handler('rate', blinker.set_rate)
remote.add_handler('help', help_handler)
remote.add_default_handler(default_handler)
#---------------------------------------------------------------
# Main event loop to run each non-preemptive thread.
last_clock = time.monotonic_ns()
while True:
# read the current nanosecond clock
now = time.monotonic_ns()
elapsed = now - last_clock
last_clock = now
# poll each thread
remote.poll(elapsed)
blinker.poll(elapsed)
|
Remote Command Processing¶
This module implements a real-time command handler to process serial port input. This should work on any CircuitPython platform with a host port.
-
class
remote.
RemoteSerial
¶ Communication manager for receiving messages over the USB serial port.
Please note the use of the CircuitPython supervisor module for non-blocking
I/O on sys.stdin
from the CircuitPython sys module.
Direct download: remote.py.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | # remote.py
# CircuitPython - Remote Communication
# This module provides a class for communicating with an outside system via the
# USB serial port. This can be a user, a control program, or a proxy script
# communicating with a remote system.
# ----------------------------------------------------------------
# Import the standard Python math library and time functions.
import math, time, sys
# Import the runtime for checking serial port status.
import supervisor
# ----------------------------------------------------------------
class RemoteSerial:
def __init__(self):
"""Communication manager for receiving messages over the USB serial port."""
# Keep a mutable buffer to accumulate input lines. This will incur less
# allocation overhead than constantly extending strings.
self.line_buffer = bytearray(80)
self.next_char = 0
# Keep a dictionary mapping input tokens to callback functions.
self.dispatch_table = {}
# Callback entry for unrecognized tokens.
self.default_handler = None
def add_handler(self, token, function):
self.dispatch_table[token] = function
def add_default_handler(self, function):
self.default_handler = function
def poll(self, elapsed):
"""Polling function to be called as frequently as possible from the event loop
to read and process new samples.
:param elapsed: nanoseconds elapsed since the last cycle.
"""
# Check the serial input for new remote data.
while supervisor.runtime.serial_bytes_available:
char = sys.stdin.read(1)
# if an end-of-line character is received, process the current line buffer
if char == '\n':
# convert the valid characters from bytearray to a string
line_string = self.line_buffer[0:self.next_char].decode()
# pass string through the dispatch system
self.process_line(line_string)
# reset the buffer
self.next_char = 0
else:
# if the input buffer is full, increase its length
if self.next_char == len(self.line_buffer):
self.line_buffer += bytearray(80)
# insert the new character into the buffer
self.line_buffer[self.next_char] = ord(char)
self.next_char += 1
def process_line(self, line):
"""Process a string representing a line of remote input including an initial
endpoint token and optional arguments separated by whitespace."""
tokens = line.split()
if len(tokens) > 0:
key_token = tokens[0]
# convert arguments into Python values
args = [self.convert_arg(arg) for arg in tokens[1:]]
# Look for a dispatch callback.
callback = self.dispatch_table.get(key_token)
if callback is not None:
# Call the method. Note: the Python *args form will pass a list
# as individual arguments. Any return values are ignored; this
# is assumed to be an imperative function called for side effects.
try:
callback(*args)
except Exception as e:
print("Warning:", e)
elif self.default_handler is not None:
# If the key token isn't found in the table, try the unknown message handler.
try:
self.default_handler(key_token, *args)
except Exception as e:
print("Warning:", e)
def convert_arg(self, arg):
"""Attempt converting a string into either an integer or float, returning the
Python value. If conversion fails, returns the argument."""
try:
return int(arg)
except ValueError:
try:
return float(arg)
except ValueError:
return arg
|
LED Blinker¶
This module implements the Pico LED interface used in the remoteblink demo.
Direct download: led.py.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | # led.py
# Raspberry Pi Pico - LED Blinker
# Blink controller for the onboard LED, intended to run inside an event loop system.
import board
import digitalio
class Blinker:
def __init__(self):
"""Interface to the onboard LED featuring variable rate blinking."""
# Set up built-in green LED for output.
self.led = digitalio.DigitalInOut(board.LED) # GP25
self.led.direction = digitalio.Direction.OUTPUT
self.update_timer = 0
self.set_rate(1.0)
def poll(self, elapsed):
"""Polling function to be called as frequently as possible from the event loop
with the nanoseconds elapsed since the last cycle."""
self.update_timer -= elapsed
if self.update_timer < 0:
self.update_timer += self.update_interval
self.led.value = not self.led.value
def set_rate(self, Hz):
self.update_interval = int(500000000 / Hz) # blink half-period in nanoseconds
|