Remote Connection Examples - CircuitPython¶
The following short CircuitPython programs will demonstrate communication via the host serial port. This interface can be used for local debugging as a user console, and also enable linking projects across the network. The network connection uses a Python program running on a laptop or desktop to forward data to and from a partner via the IDeATe MQTT server.
Each can be run by copying the program into code.py
on the CIRCUITPY drive
offered by the board. The text can be pasted directly from this page, or each
file can be downloaded from the CircuitPython sample code folder on this site.
Related Pages
Remote Blink¶
Toy demonstration of remote communication implementing a command language to
control a real-time LED blinker. N.B. this top-level script assumes that the
remote.py
and led.py
modules have been copied as-is to the top-level
CIRCUITPY directory.
Direct download: remoteblink.py.
1# remoteblink.py
2
3# Raspberry Pi Pico - LED Blinker with Remote Control
4
5# Toy demonstration of remote communication implementing a command language
6# to control a real-time LED blinker.
7
8import time
9
10# Import modules provided in the 'CircuitPython/remote/' code folder. These
11# files should be copied to the top-level directory of the CIRCUITPY filesystem
12# on the Pico.
13import led
14import remote
15
16#---------------------------------------------------------------
17# Initialize global objects.
18
19# LED interface object.
20blinker = led.Blinker()
21
22# Communication interface and callbacks.
23remote = remote.RemoteSerial()
24
25def default_handler(msgtype, *args):
26 print(f"Warning: received unknown message {msgtype} {args}")
27
28def help_handler(*args):
29 print("Usage: rate <Hz>")
30
31remote.add_handler('rate', blinker.set_rate)
32remote.add_handler('help', help_handler)
33remote.add_default_handler(default_handler)
34
35#---------------------------------------------------------------
36# Main event loop to run each non-preemptive thread.
37
38last_clock = time.monotonic_ns()
39
40while True:
41 # read the current nanosecond clock
42 now = time.monotonic_ns()
43 elapsed = now - last_clock
44 last_clock = now
45
46 # poll each thread
47 remote.poll(elapsed)
48 blinker.poll(elapsed)
Remote Command Processing¶
This module implements a real-time command handler to process serial port input. This should work on any CircuitPython platform with a host port.
- class remote.RemoteSerial¶
Communication manager for receiving messages over the USB serial port.
Please note the use of the CircuitPython supervisor module for non-blocking
I/O on sys.stdin
from the CircuitPython sys module.
Direct download: remote.py.
1# remote.py
2
3# CircuitPython - Remote Communication
4
5# This module provides a class for communicating with an outside system via the
6# USB serial port. This can be a user, a control program, or a proxy script
7# communicating with a remote system.
8
9# ----------------------------------------------------------------
10# Import the standard Python math library and time functions.
11import math, time, sys
12
13# Import the runtime for checking serial port status.
14import supervisor
15
16# ----------------------------------------------------------------
17class RemoteSerial:
18 def __init__(self):
19 """Communication manager for receiving messages over the USB serial port."""
20
21 # Keep a mutable buffer to accumulate input lines. This will incur less
22 # allocation overhead than constantly extending strings.
23 self.line_buffer = bytearray(80)
24 self.next_char = 0
25
26 # Keep a dictionary mapping input tokens to callback functions.
27 self.dispatch_table = {}
28
29 # Callback entry for unrecognized tokens.
30 self.default_handler = None
31
32 def add_handler(self, token, function):
33 self.dispatch_table[token] = function
34
35 def add_default_handler(self, function):
36 self.default_handler = function
37
38 def poll(self, elapsed):
39 """Polling function to be called as frequently as possible from the event loop
40 to read and process new samples.
41
42 :param elapsed: nanoseconds elapsed since the last cycle.
43 """
44
45 # Check the serial input for new remote data.
46 while supervisor.runtime.serial_bytes_available:
47 char = sys.stdin.read(1)
48
49 # if an end-of-line character is received, process the current line buffer
50 if char == '\n':
51 # convert the valid characters from bytearray to a string
52 line_string = self.line_buffer[0:self.next_char].decode()
53
54 # pass string through the dispatch system
55 self.process_line(line_string)
56
57 # reset the buffer
58 self.next_char = 0
59
60 else:
61 # if the input buffer is full, increase its length
62 if self.next_char == len(self.line_buffer):
63 self.line_buffer += bytearray(80)
64
65 # insert the new character into the buffer
66 self.line_buffer[self.next_char] = ord(char)
67 self.next_char += 1
68
69
70 def process_line(self, line):
71 """Process a string representing a line of remote input including an initial
72 endpoint token and optional arguments separated by whitespace."""
73 tokens = line.split()
74 if len(tokens) > 0:
75 key_token = tokens[0]
76
77 # convert arguments into Python values
78 args = [self.convert_arg(arg) for arg in tokens[1:]]
79
80 # Look for a dispatch callback.
81 callback = self.dispatch_table.get(key_token)
82 if callback is not None:
83 # Call the method. Note: the Python *args form will pass a list
84 # as individual arguments. Any return values are ignored; this
85 # is assumed to be an imperative function called for side effects.
86 try:
87 callback(*args)
88 except Exception as e:
89 print("Warning:", e)
90 elif self.default_handler is not None:
91 # If the key token isn't found in the table, try the unknown message handler.
92 try:
93 self.default_handler(key_token, *args)
94 except Exception as e:
95 print("Warning:", e)
96
97
98 def convert_arg(self, arg):
99 """Attempt converting a string into either an integer or float, returning the
100 Python value. If conversion fails, returns the argument."""
101 try:
102 return int(arg)
103 except ValueError:
104 try:
105 return float(arg)
106 except ValueError:
107 return arg
LED Blinker¶
This module implements the Pico LED interface used in the remoteblink demo.
Direct download: led.py.
1# led.py
2
3# Raspberry Pi Pico - LED Blinker
4
5# Blink controller for the onboard LED, intended to run inside an event loop system.
6
7import board
8import digitalio
9
10class Blinker:
11 def __init__(self):
12 """Interface to the onboard LED featuring variable rate blinking."""
13 # Set up built-in green LED for output.
14 self.led = digitalio.DigitalInOut(board.LED) # GP25
15 self.led.direction = digitalio.Direction.OUTPUT
16 self.update_timer = 0
17 self.set_rate(1.0)
18
19 def poll(self, elapsed):
20 """Polling function to be called as frequently as possible from the event loop
21 with the nanoseconds elapsed since the last cycle."""
22 self.update_timer -= elapsed
23 if self.update_timer < 0:
24 self.update_timer += self.update_interval
25 self.led.value = not self.led.value
26
27 def set_rate(self, Hz):
28 self.update_interval = int(500000000 / Hz) # blink half-period in nanoseconds
29