5.2. Pneumatics Animation Example

This module provides a basic template for a command-line Python script that communicates with an Arduino running the ValveControl Arduino sketch. The script itself only reads and writes raw I/O data, but it could be extended to implement application logic. It uses a single Arduino, but the event loop accepts a list of protocol objects so that it can be extended to several.

The full code can be found in several files in pneumatics_animation or downloaded as a single zip file, pneumatics_animation.zip.

5.2.1. animation_example.py

This application script can be run from the command line to interact with an Arduino over a serial port.

#!/usr/bin/env python
"""\
animation_example.py : example command-line program to communicate with an Arduino over a serial port and execute timed logic.

Copyright (c) 2017, Garth Zeglin.  All rights reserved. Licensed under the terms of the BSD 3-clause license.
"""

from __future__ import print_function
import os, sys, time, argparse, select

from ValveControlProtocol import ValveControlProtocol

################################################################
def update_animation_state(now):
    """Stub for a function called periodically to evaluate the sensor inputs and generate the next cycle of animation outputs.
    
    :param now: current wall clock time
    """
    
    pass

################################################################
def event_loop(arduinos):
    """Prototype for an event-processing loop which manages I/O on multiple serial ports.

    :param arduinos: list of Arduino protocol objects
    """

    polling_interval = 0.5
    polling_timer = polling_interval
    last_time = time.time()
    
    while True:
        # Check which Arduinos have data available, using select with a short timeout.
        ready = select.select(arduinos,[], [], max(polling_timer, 0.0))

        # Process messages on each Arduino port with available data. 
        for arduino in ready[0]:
            arduino.wait_for_message()
          
        # Check whether enough time has elapsed to run a polling cycle, and recalculate the next timeout.
        now = time.time()
        polling_timer -= (now - last_time) # subtract elapsed time
        last_time = now
        
        # When the polling interval has expired:
        if polling_timer <= 0:
            polling_timer += polling_interval

            # Apply any polled logic here; this will be called at a constant average rate.

            # Print a debugging message stream.
            for arduino in arduinos:
                print("arduino status: awake: %s, joints: %s" % (arduino.awake_time, arduino.joints))

            # Callback to update the animation based on any new data.
            update_animation_state(now)

################################################################
# The following section is run when this is loaded as a script.
if __name__ == "__main__":

    # Initialize the command parser.
    parser = argparse.ArgumentParser( description = """Example command line program to interface with an Arduino sketch.""")
    parser.add_argument( '-v', '--verbose', action='store_true', help='Enable more detailed output.' )
    parser.add_argument( '--debug', action='store_true', help='Enable debugging output.' )
    parser.add_argument( '-p', '--port', default='/dev/tty.usbmodem1411', help='Specify the name of the Arduino serial port device (default is /dev/tty.usbmodem1411).')

    # Parse the command line, returning a Namespace.
    args = parser.parse_args()

    # Extract specific command line options into a keyword argument dictionary.
    kwargs = { 'port' : args.port, 'debug' : args.debug, 'verbose' : args.verbose }
    
    # Create a protocol object to manage the serial port.
    arduino = ValveControlProtocol(**kwargs)

    # Connect to the Arduino, which will also reset it.
    print("Connecting to Arduino.")
    arduino.open_serial_port()

    # Pause briefly after the reset, then issue a few test commands.
    time.sleep(2)
    arduino.set_flow(50, extend=1, retract=2)
    time.sleep(1)
    arduino.set_flow(-50, extend=1, retract=2)
    arduino.set_flow(50, extend=3, retract=4)
    time.sleep(1)
    arduino.set_flow(-50, extend=3, retract=4)
    time.sleep(1)
    arduino.set_flow(0, extend=1, retract=2)
    arduino.set_flow(0, extend=3, retract=4)        
    
    # Now run the event loop until it exits or control-C is pressed.
    try:
        event_loop([arduino])
            
    except KeyboardInterrupt:
        print("User interrupted operation.")

    # Shut down the connection.
    arduino.close_serial_port()
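
The polling timer in event_loop adds the interval back onto whatever is left of the timer rather than resetting it, so a late wake-up shortens the next wait. That is what keeps the polled logic at a constant average rate. The following is a minimal sketch of just that arithmetic, replayed over made-up wake-up times instead of time.time(); the function name count_polls and the timestamps are hypothetical and not part of the original code.

```python
def count_polls(timestamps, polling_interval=0.5):
    """Replay the event_loop timer arithmetic over a list of wake-up times.

    Returns the wake-up times at which the polled logic would have run.
    """
    polling_timer = polling_interval
    last_time = timestamps[0]
    fired = []
    for now in timestamps[1:]:
        polling_timer -= (now - last_time)   # subtract elapsed time
        last_time = now
        if polling_timer <= 0:
            # Add the interval instead of resetting, so lateness is carried over.
            polling_timer += polling_interval
            fired.append(now)
    return fired

# Hypothetical wake-up times: the first poll is 0.25 s late (0.75 instead of 0.5).
wakeups = [0.0, 0.25, 0.75, 1.0, 1.5, 1.75, 2.0]
print(count_polls(wakeups))
```

With these timestamps the late poll at 0.75 is followed by one at 1.0, so the schedule returns to multiples of the interval instead of drifting.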
        

5.2.2. four_joint_example.py

This application script can be run from the command line to interact with a pair of valve-control Arduinos driving up to four actuators, plus a third Arduino reporting sensor data.

#!/usr/bin/env python
"""\
four_joint_example.py : example command-line program to communicate with multiple Arduinos over serial ports and execute timed logic.

Copyright (c) 2017, Garth Zeglin.  All rights reserved. Licensed under the terms of the BSD 3-clause license.
"""

from __future__ import print_function
import os, sys, time, argparse, select

from ValveControlProtocol import ValveControlProtocol
from SensorProtocol import SensorProtocol

################################################################
def polling_wait(arduinos, duration=0.0):
    """Poll all Arduinos and process pending data for a given interval.

    :param arduinos: list of Arduino protocol objects
    :param duration: time in seconds to poll before returning (default to zero)
    """
    last_time = time.time()
    pending = True

    while pending or duration > 0.0:
        # Check which Arduinos have data available, using select with a short timeout.
        ready = select.select(arduinos,[], [], max(duration, 0.0))
        pending = False
        
        # Process messages on each Arduino port with available data.  If any message
        # is received, reflag pending to ensure checking again.
        for arduino in ready[0]:
            arduino.wait_for_message()
            pending = True
          
        # Recalculate the maximum timeout.
        if duration > 0.0:
            now = time.time()
            duration -= (now - last_time) # subtract elapsed time
            last_time = now

################################################################
# The following section is run when this is loaded as a script.
if __name__ == "__main__":

    # Initialize the command parser.
    parser = argparse.ArgumentParser( description = """Example command line program to interface with an Arduino sketch.""")
    parser.add_argument( '-v', '--verbose', action='store_true', help='Enable more detailed output.' )
    parser.add_argument( '--debug', action='store_true', help='Enable debugging output.' )

    # Parse the command line, returning a Namespace.
    args = parser.parse_args()

    # Extract specific command line options into a keyword argument dictionary.
    kwargs = { 'debug' : args.debug, 'verbose' : args.verbose }
    
    # Create protocol objects to manage the serial ports.
    valve1 = ValveControlProtocol(port = "/dev/tty.usbmodem141431", **kwargs)
    valve2 = ValveControlProtocol(port = "/dev/tty.usbmodem141441", **kwargs)
    sensors = SensorProtocol(     port = "/dev/tty.usbmodem141421", **kwargs)
    arduinos = [valve1, valve2, sensors]
    
    # Connect to the Arduinos, which will also reset them.
    print("Connecting to Arduinos.")
    
    for arduino in arduinos:
        arduino.open_serial_port()

    # Pause briefly after the reset before sending any commands.
    time.sleep(2)

    # Run an animation sequence until it finishes or control-C is pressed.
    try:
        valve1.set_flow(50, extend=1, retract=2)
        polling_wait(arduinos, duration=1.0)
        
        valve1.set_flow(-50, extend=1, retract=2)
        valve1.set_flow(50, extend=3, retract=4)
        polling_wait(arduinos, duration=1.0)

        valve1.set_flow(-50, extend=3, retract=4)
        polling_wait(arduinos, duration=1.0)

        valve1.set_flow(0, extend=1, retract=2)
        valve1.set_flow(0, extend=3, retract=4)
        polling_wait(arduinos, duration=1.0)
                
        valve2.set_flow(50, extend=1, retract=2)
        polling_wait(arduinos, duration=1.0)
        
        valve2.set_flow(-50, extend=1, retract=2)
        valve2.set_flow(50, extend=3, retract=4)
        polling_wait(arduinos, duration=1.0)

        valve2.set_flow(-50, extend=3, retract=4)
        polling_wait(arduinos, duration=1.0)

        valve2.set_flow(0, extend=1, retract=2)
        valve2.set_flow(0, extend=3, retract=4)        
        polling_wait(arduinos, duration=1.0)
            
    except KeyboardInterrupt:
        print("User interrupted operation.")

    # Shut down the connections.
    for arduino in arduinos:    
        arduino.close_serial_port()

5.2.3. ValveControlProtocol.py

ValveControlProtocol.py : application-specific class to manage serial communication with an Arduino ValveControl sketch.

Copyright (c) 2015-2017, Garth Zeglin. All rights reserved. Licensed under the terms of the BSD 3-clause license.

class pneumatics_animation.ValveControlProtocol.ValveControlProtocol(**kwargs)

Example class to communicate with the ValveControlASCII Arduino sketch.

awake_time = None

The clock time of the most recent Arduino awake message.

joints = None

Dictionary of most recent joint status data.

message_received(tokens)

Application-specific message processing to parse lists of tokens received from the Arduino sketch. This implementation just stores values in instance attributes. This method overrides the null implementation in the parent class.

Parameters: tokens – list of string tokens

set_flow(flow, extend=1, retract=2, verbose=False)

Issue an open-loop set of flow commands for a valve pair attached to an actuator. Returns immediately. No return value.

Parameters:
  • flow – net flow, from -100 (full retraction rate) to 100 (full extension rate)
  • extend – channel number for the extension valve pair
  • retract – channel number for the retraction valve pair
  • verbose – flag to enable printed debugging output

Returns: None

5.2.4. SensorProtocol.py

SensorProtocol.py : application-specific class to manage serial communication with an Arduino sketch reporting sensor data.

Copyright (c) 2015-2017, Garth Zeglin. All rights reserved. Licensed under the terms of the BSD 3-clause license.

class pneumatics_animation.SensorProtocol.SensorProtocol(**kwargs)

Example class to communicate with an Arduino sketch which reports lists of numbers.

message_received(tokens)

Application-specific message processing to parse lists of tokens received from the Arduino sketch. This implementation just converts every token to a number and stores them in a list in an instance attribute. This method overrides the null implementation in the parent class.

Parameters: tokens – list of string tokens

values = None

List of the most recent values received.
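
The real class lives in SensorProtocol.py. The following is only a sketch of the behaviour described above, using a stand-in base class so that it runs without a serial port. StubProtocol and SensorSketch are hypothetical names, and parsing with float() is an assumption about how tokens are converted to numbers.

```python
class StubProtocol(object):
    """Hypothetical stand-in for ArduinoProtocol, with a null message hook."""
    def message_received(self, tokens):
        pass

class SensorSketch(StubProtocol):
    """Sketch of a subclass which stores each message as a list of numbers."""
    def __init__(self):
        # Most recent values received; None until the first message arrives.
        self.values = None

    def message_received(self, tokens):
        # Override the null hook: convert every string token to a number.
        # float() is an assumed choice; the original may parse differently.
        self.values = [float(token) for token in tokens]

sensor = SensorSketch()
sensor.message_received(["12", "3.5", "-7"])
print(sensor.values)
```

A non-numeric token would raise ValueError in this sketch; how the original handles malformed input is not stated here.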

5.2.5. ArduinoProtocol.py

ArduinoProtocol.py : abstract class to manage serial communication with an Arduino sketch.

Copyright (c) 2015-2017, Garth Zeglin. All rights reserved. Licensed under the terms of the BSD 3-clause license.

class pneumatics_animation.ArduinoProtocol.ArduinoProtocol(port=None, verbose=False, debug=False, rate=115200, **kwargs)

Abstract class to manage a serial connection to an Arduino sketch.

This class is sufficient to send and receive structured data messages as line-delimited text, but is more useful as a parent class for an application-specific class which manages the specific data types in the stream.
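
To illustrate what "line-delimited text" with whitespace-delimited tokens can look like in practice, here is a minimal sketch that buffers incoming text, splits it into complete lines, and passes each line's tokens to a message_received hook. It is not the ArduinoProtocol implementation: LineDispatcher, feed, and log are hypothetical names, and the "awake" and "status" message words are invented sample data.

```python
class LineDispatcher(object):
    """Hypothetical helper: buffer incoming text and dispatch complete lines."""
    def __init__(self):
        self.buffer = ""            # text received but not yet terminated by a newline
        self.messages_received = 0  # count of non-empty lines dispatched
        self.log = []               # token lists seen so far, for demonstration only

    def message_received(self, tokens):
        # Hook for subclasses; here it just records the tokens.
        self.log.append(tokens)

    def feed(self, text):
        """Accept a chunk of text, which may hold partial or multiple lines."""
        self.buffer += text
        while "\n" in self.buffer:
            line, self.buffer = self.buffer.split("\n", 1)
            tokens = line.split()   # whitespace-delimited; also drops a trailing '\r'
            if tokens:
                self.messages_received += 1
                self.message_received(tokens)

dispatcher = LineDispatcher()
dispatcher.feed("awake 10")                    # partial line: nothing dispatched yet
dispatcher.feed("0\nstatus 1 2\npart")         # completes one line, adds another
print(dispatcher.log, repr(dispatcher.buffer))
```

Keeping the unterminated remainder in a buffer means a message split across two reads is still delivered as a single token list.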

Parameters:
  • port – the name of the serial port device
  • verbose – flag to increase console output
  • debug – flag to print raw inputs on console
  • rate – serial baud rate
  • kwargs – collect any unused keyword arguments

close_serial_port()

Shut down the serial connection to the Arduino, after which this object may no longer be used.

fileno()

Return the file descriptor for the input port, useful for blocking on available input with select.

flush_serial_input()

Clear the input buffer.

input = None

Input port object (usually same as output).

is_connected()

Return true if the serial port device is open.

is_startup_delay_complete()

Check whether the boot time interval has completed after opening the serial port.

message_received(tokens)

Method called whenever a new message is received from the Arduino. The default implementation is null, but this may be overridden in subclasses as a hook for this event.

Parameters: tokens – list of strings representing whitespace-delimited tokens in the received message

messages_received = None

Count of total lines received.

messages_sent = None

Count of total lines sent.

open_serial_port()

Open the serial connection to the Arduino and initialize communications.

output = None

Output port object (usually same as input).

send_message(tokens)

Issue a message to the Arduino sketch.

Parameters: tokens – message as list of values

send_message_string(string)

Issue a message string to the Arduino.

Parameters: string – message as a single string without terminating newline

set_serial_port_name(name)

Set the name of the serial port device.

wait_for_message()

Wait for a line of input from the Arduino and process it, updating status variables. This will block if data is not ready; if using in an event-driven system, it is important to check the port for available data prior to calling.
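
The check-before-read pattern can be sketched without any hardware. The example below assumes a POSIX system, where select works on pipes, and uses a hypothetical PipeDevice class in place of a real protocol object. It exposes fileno() so that select can watch it, and its wait_for_message() blocks on a read just as a serial read would. The message text is invented sample data.

```python
import os
import select

class PipeDevice(object):
    """Hypothetical stand-in for a protocol object, backed by a pipe."""
    def __init__(self):
        self.read_fd, self.write_fd = os.pipe()
        self.lines = []

    def fileno(self):
        # select() accepts any object which provides a fileno() method.
        return self.read_fd

    def wait_for_message(self):
        # Blocks until data is available. Simplification: one read is one message.
        self.lines.append(os.read(self.read_fd, 1024).decode().strip())

    def close(self):
        os.close(self.read_fd)
        os.close(self.write_fd)

quiet, busy = PipeDevice(), PipeDevice()
os.write(busy.write_fd, b"status 1 2\n")   # only one device has pending data

# Ask which devices are readable, waiting at most 0.1 seconds.
ready, _, _ = select.select([quiet, busy], [], [], 0.1)
for device in ready:
    device.wait_for_message()              # safe: data is known to be waiting

print(busy.lines, quiet.lines)
quiet.close()
busy.close()
```

Calling wait_for_message() on the quiet device here would block indefinitely, which is the situation the select check avoids.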