Remote Connection Examples - CircuitPython

The following short CircuitPython programs demonstrate communication via the host serial port. This interface can serve as a user console for local debugging, and it can also link projects across the network. The network connection uses a Python program running on a laptop or desktop to forward data to and from a partner via the IDeATe MQTT server.

Each can be run by copying the program into code.py on the CIRCUITPY drive offered by the board. The text can be pasted directly from this page, or each file can be downloaded from the CircuitPython sample code folder on this site.

Remote Command Processing

This module implements a real-time command handler to process serial port input. This should work on any CircuitPython platform with a host port.

class remote.RemoteSerial

Communication manager for receiving messages over the USB serial port.

Note that the module checks supervisor.runtime.serial_bytes_available from the CircuitPython supervisor module before each read from sys.stdin, so polling never blocks while waiting for input.
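The non-blocking line assembly can be tried on a desktop Python interpreter without a board. The following is a minimal sketch under stated assumptions, not part of remote.py: the LineAssembler class and its small initial buffer size are hypothetical, and a deque stands in for the serial port so that an empty queue plays the role of no bytes being available.

```python
# Desktop sketch of non-blocking line assembly.
# A deque stands in for the serial port; LineAssembler is an illustrative name.
from collections import deque

class LineAssembler:
    def __init__(self, size=4):
        # Small initial size so the buffer growth path is exercised.
        self.line_buffer = bytearray(size)
        self.next_char = 0
        self.lines = []

    def poll(self, pending):
        """Consume only the characters already waiting, then return."""
        while pending:
            char = pending.popleft()
            if char == '\n':
                # End of line: decode the valid part of the buffer and reset.
                self.lines.append(self.line_buffer[0:self.next_char].decode())
                self.next_char = 0
            else:
                # Grow the buffer when it is full, then store the character.
                if self.next_char == len(self.line_buffer):
                    self.line_buffer += bytearray(len(self.line_buffer))
                self.line_buffer[self.next_char] = ord(char)
                self.next_char += 1

pending = deque("rate 2")
assembler = LineAssembler()
assembler.poll(pending)              # partial line: nothing completed yet
partial = list(assembler.lines)

pending.extend(".5\nled on\n")       # the rest arrives on a later cycle
assembler.poll(pending)
print(partial, assembler.lines)
```

A partial line stays in the buffer between calls, so the event loop can keep servicing other tasks until the newline arrives.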

Direct download: remote.py.

# remote.py

# CircuitPython - Remote Communication

# This module provides a class for communicating with an outside system via the
# USB serial port.  This can be a user, a control program, or a proxy script
# communicating with a remote system.

# ----------------------------------------------------------------
# Import standard Python modules.  Only sys is used below, for sys.stdin.
import math, time, sys

# Import the runtime for checking serial port status.
import supervisor

# ----------------------------------------------------------------
class RemoteSerial:
    def __init__(self):
        """Communication manager for receiving messages over the USB serial port."""

        # Keep a mutable buffer to accumulate input lines.  This will incur less
        # allocation overhead than constantly extending strings.
        self.line_buffer = bytearray(80)
        self.next_char = 0

        # Keep a dictionary mapping input tokens to callback functions.
        self.dispatch_table = {}

        # Callback entry for unrecognized tokens.
        self.default_handler = None

    def add_handler(self, token, function):
        """Register a callback for lines whose first token equals `token`."""
        self.dispatch_table[token] = function

    def add_default_handler(self, function):
        """Register a callback for lines whose first token has no handler.
        It receives the token followed by the converted arguments."""
        self.default_handler = function

    def poll(self, elapsed):
        """Polling function to be called as frequently as possible from the event loop
        to read and process new serial input.

        :param elapsed: nanoseconds elapsed since the last cycle (currently unused).
        """

        # Check the serial input for new remote data.
        while supervisor.runtime.serial_bytes_available:
            char = sys.stdin.read(1)

            # if an end-of-line character is received, process the current line buffer
            if char == '\n':
                # convert the valid characters from bytearray to a string
                line_string = self.line_buffer[0:self.next_char].decode()

                # pass string through the dispatch system
                self.process_line(line_string)

                # reset the buffer
                self.next_char = 0

            else:
                # if the input buffer is full, increase its length
                if self.next_char == len(self.line_buffer):
                    self.line_buffer += bytearray(80)

                # insert the new character into the buffer
                self.line_buffer[self.next_char] = ord(char)
                self.next_char += 1


    def process_line(self, line):
        """Process a string representing a line of remote input including an initial
        endpoint token and optional arguments separated by whitespace."""
        tokens = line.split()
        if len(tokens) > 0:
            key_token = tokens[0]

            # convert arguments into Python values
            args = [self.convert_arg(arg) for arg in tokens[1:]]

            # Look for a dispatch callback.
            callback = self.dispatch_table.get(key_token)
            if callback is not None:
                # Call the method.  Note: the Python *args form will pass a list
                # as individual arguments.  Any return values are ignored; this
                # is assumed to be an imperative function called for side effects.
                try:
                    callback(*args)
                except Exception as e:
                    print("Warning:", e)
            elif self.default_handler is not None:
                # If the key token isn't found in the table, try the unknown message handler.
                try:
                    self.default_handler(key_token, *args)
                except Exception as e:
                    print("Warning:", e)


    def convert_arg(self, arg):
        """Attempt converting a string into either an integer or float, returning the
        Python value.  If conversion fails, returns the argument unchanged."""
        try:
            return int(arg)
        except ValueError:
            try:
                return float(arg)
            except ValueError:
                return arg
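The dispatch behavior of process_line() can also be exercised on a desktop Python interpreter. The following is a minimal sketch that mirrors the token splitting and numeric conversion in the listing. The standalone functions convert_arg and dispatch, and the "rate" and "color" tokens, are hypothetical names chosen for illustration and are not part of remote.py.

```python
# Desktop sketch of the token dispatch used by RemoteSerial.process_line().
# All names here are illustrative assumptions, not part of remote.py.

def convert_arg(arg):
    """Convert a string to int or float if possible, else return it unchanged."""
    try:
        return int(arg)
    except ValueError:
        try:
            return float(arg)
        except ValueError:
            return arg

def dispatch(line, table, default=None):
    """Split a line into tokens and call the handler for the first token.
    Returns True if some handler was called."""
    tokens = line.split()
    if not tokens:
        return False
    args = [convert_arg(token) for token in tokens[1:]]
    handler = table.get(tokens[0])
    if handler is not None:
        handler(*args)
        return True
    if default is not None:
        # Unknown tokens are passed to the default handler along with the arguments.
        default(tokens[0], *args)
        return True
    return False

received = []
table = {"rate": lambda hz: received.append(("rate", hz))}

def unknown(token, *args):
    received.append((token, args))

dispatch("rate 2.5", table)                    # argument arrives as a float
dispatch("rate 4", table)                      # argument arrives as an int
dispatch("color red", table, default=unknown)  # no handler: falls through to default
handled_blank = dispatch("   ", table)         # blank lines are ignored
print(received)
```

Because arguments are converted before the call, a handler receives numbers where the text parses as a number and strings otherwise, so handlers that expect a specific type may want to validate their input.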

LED Blinker

This module implements the Pico LED interface used in the remoteblink demo.

Direct download: led.py.

# led.py

# Raspberry Pi Pico - LED Blinker

# Blink controller for the onboard LED, intended to run inside an event loop system.

import board
import digitalio

class Blinker:
    def __init__(self):
        """Interface to the onboard LED featuring variable rate blinking."""
        # Set up built-in green LED for output.
        self.led = digitalio.DigitalInOut(board.LED)  # GP25
        self.led.direction = digitalio.Direction.OUTPUT
        self.update_timer = 0
        self.set_rate(1.0)

    def poll(self, elapsed):
        """Polling function to be called as frequently as possible from the event loop
        with the nanoseconds elapsed since the last cycle."""
        # Count down and toggle the LED each time a half-period expires.
        self.update_timer -= elapsed
        if self.update_timer < 0:
            self.update_timer += self.update_interval
            self.led.value = not self.led.value

    def set_rate(self, Hz):
        """Set the blink rate in cycles per second.  Hz must be nonzero."""
        self.update_interval = int(500000000 / Hz) # blink half-period in nanoseconds
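The timing logic in Blinker.poll() can be checked on a desktop interpreter by substituting a stand-in for the hardware pin. The following is a minimal sketch under stated assumptions: FakeLED and SimBlinker are hypothetical names, the poll() and set_rate() bodies are copied from the listing, and the loop simulates one second of 1 ms cycles rather than measuring real time.

```python
# Desktop simulation of the Blinker timing logic.
# FakeLED stands in for digitalio.DigitalInOut; both class names are illustrative.

class FakeLED:
    def __init__(self):
        self.value = False

class SimBlinker:
    def __init__(self, led):
        self.led = led
        self.update_timer = 0
        self.set_rate(1.0)

    def poll(self, elapsed):
        # Same countdown as Blinker.poll(): toggle when a half-period expires.
        self.update_timer -= elapsed
        if self.update_timer < 0:
            self.update_timer += self.update_interval
            self.led.value = not self.led.value

    def set_rate(self, Hz):
        self.update_interval = int(500000000 / Hz)  # half-period in nanoseconds

led = FakeLED()
blinker = SimBlinker(led)
blinker.set_rate(2.0)          # 2 Hz: the LED toggles every 250 ms

toggles = 0
step_ns = 1000000              # simulated cycle time of 1 ms
for _ in range(1000):          # one simulated second
    before = led.value
    blinker.poll(step_ns)
    if led.value != before:
        toggles += 1
print(toggles)
```

Adding the interval to the timer instead of resetting it carries any overshoot into the next half-period, so the average blink rate stays accurate even when the loop cycle time varies.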