Remote Connection Examples - CircuitPython

The following short CircuitPython programs will demonstrate communication via the host serial port. This interface can be used for local debugging as a user console, and also enable linking projects across the network. The network connection uses a Python program running on a laptop or desktop to forward data to and from a partner via the IDeATe MQTT server.

Each can be run by copying the program into code.py on the CIRCUITPY drive offered by the board. The text can be pasted directly from this page, or each file can be downloaded from the CircuitPython sample code folder on this site.

Related Pages

Remote Command Processing

This module implements a real-time command handler to process serial port input. This should work on any CircuitPython platform with a host port.

class remote.RemoteSerial

Communication manager for receiving messages over the USB serial port.

Please note the use of the CircuitPython supervisor module for non-blocking I/O on sys.stdin from the CircuitPython sys module.

Direct download: remote.py.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
# remote.py

# CircuitPython - Remote Communication

# This module provides a class for communicating with an outside system via the
# USB serial port.  This can be a user, a control program, or a proxy script
# communicating with a remote system.

# ----------------------------------------------------------------
# Import the standard Python math library and time functions.
import math, time, sys

# Import the runtime for checking serial port status.
import supervisor

# ----------------------------------------------------------------
class RemoteSerial:
    def __init__(self):
        """Communication manager for receiving messages over the USB serial port."""

        # Keep a mutable buffer to accumulate input lines.  This will incur less
        # allocation overhead than constantly extending strings.
        self.line_buffer = bytearray(80)
        self.next_char = 0

        # Keep a dictionary mapping input tokens to callback functions.
        self.dispatch_table = {}

        # Callback entry for unrecognized tokens.
        self.default_handler = None

    def add_handler(self, token, function):
        self.dispatch_table[token] = function

    def add_default_handler(self, function):
        self.default_handler = function

    def poll(self, elapsed):
        """Polling function to be called as frequently as possible from the event loop
        to read and process new samples.

        :param elapsed: nanoseconds elapsed since the last cycle.
        """

        # Check the serial input for new remote data.
        while supervisor.runtime.serial_bytes_available:
            char = sys.stdin.read(1)

            # if an end-of-line character is received, process the current line buffer
            if char == '\n':
                # convert the valid characters from bytearray to a string
                line_string = self.line_buffer[0:self.next_char].decode()

                # pass string through the dispatch system
                self.process_line(line_string)

                # reset the buffer
                self.next_char = 0

            else:
                # if the input buffer is full, increase its length
                if self.next_char == len(self.line_buffer):
                    self.line_buffer += bytearray(80)

                # insert the new character into the buffer
                self.line_buffer[self.next_char] = ord(char)
                self.next_char += 1


    def process_line(self, line):
        """Process a string representing a line of remote input including an initial
        endpoint token and optional arguments separated by whitespace."""
        tokens = line.split()
        if len(tokens) > 0:
            key_token = tokens[0]

            # convert arguments into Python values
            args = [self.convert_arg(arg) for arg in tokens[1:]]

            # Look for a dispatch callback.
            callback = self.dispatch_table.get(key_token)
            if callback is not None:
                # Call the method.  Note: the Python *args form will pass a list
                # as individual arguments.  Any return values are ignored; this
                # is assumed to be an imperative function called for side effects.
                try:
                    callback(*args)
                except Exception as e:
                    print("Warning:", e)
            elif self.default_handler is not None:
                # If the key token isn't found in the table, try the unknown message handler.
                try:
                    self.default_handler(key_token, *args)
                except Exception as e:
                    print("Warning:", e)


    def convert_arg(self, arg):
        """Attempt converting a string into either an integer or float, returning the
        Python value.  If conversion fails, returns the argument."""
        try:
            return int(arg)
        except ValueError:
            try:
                return float(arg)
            except ValueError:
                return arg

LED Blinker

This module implements the Pico LED interface used in the remoteblink demo.

Direct download: led.py.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# led.py

# Raspberry Pi Pico - LED Blinker

# Blink controller for the onboard LED, intended to run inside an event loop system.

import board
import digitalio

class Blinker:
    def __init__(self):
        """Interface to the onboard LED featuring variable rate blinking."""
        # Set up built-in green LED for output.
        self.led = digitalio.DigitalInOut(board.LED)  # GP25
        self.led.direction = digitalio.Direction.OUTPUT
        self.update_timer = 0
        self.set_rate(1.0)
            
    def poll(self, elapsed):
        """Polling function to be called as frequently as possible from the event loop
        with the nanoseconds elapsed since the last cycle."""
        self.update_timer -= elapsed
        if self.update_timer < 0:
            self.update_timer += self.update_interval
            self.led.value = not self.led.value

    def set_rate(self, Hz):
        self.update_interval = int(500000000 / Hz) # blink half-period in nanoseconds