suitcase Webots emulator

One solution for the suitcase is to develop performances as much as possible in simulation than use a compatibility layer to replace the Webots API with the physical hardware.

The sample implementation makes use of several Python features: multithreading, multiple inheritance, and ‘duck typing’. The RealSuitcase class encapsulates the robot control process by inheriting from multiple classes:

  1. The suitcase.Suitcase class is a duplicate of the Webots simulation controller. It inherits from controller.Robot which is normally provided by the Webots API. Instead, a proxy class is provided in controller.py with a partial implementation of this API.

  2. The controller.SuitcaseRobot class implements the specific devices required for the example in the form of methods which are normally implemented in the Webots Robot class. This works because Python isn’t particular about the exact parent class of a method.

  3. The threading.Thread class implements the methods to allow the control process to run as a separate thread.

The communication with the Arduino is handled by a separate thread to allow the controller and Arduino to run on independent schedules. In practice, the Arduino communication is slower than the Webots communication, so the actual sensor and motor behavior will vary from the simulation, but the controller will run in approximately real time. This communication is managed by the Arduino class which receives status information, passes it to the controller emulator for interpretation, then requests and sends updated control commands.

running the system

The script is intended to be run from the command line, as it requires the Arduino serial port specified as an argument and it continuously prints out data traffic.

A typical invocation looks as follows:

python3 main.py --port /dev/cu.usbmodem1444301

Under Windows, Arduino serial ports are named using COM, e.g. COM5.

On any platform, if you are not sure of the port name, please launch the Arduino IDE to identify the correct port.

main.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
#!/usr/bin/env python3

# main.py : top-level script to run a suitcase testbed show
# No copyright, 2021, Garth Zeglin.  This file is explicitly placed in the public domain.

# This provides a skeleton Webots API to connect a Webots controller object to 
# physical hardware.  Unlike Webots, it uses threads to run the controller as
# a normal Python object within a single process.

# Standard Python modules.
import os, sys, time, threading, argparse

# Import pySerial module for communicating with an Arduino.  https://pythonhosted.org/pyserial/
import serial

# Import the skeleton Webots API from the current folder.
import controller

# Import the controller. The suitcase.py file was copied from a Webots project tree.
import suitcase

################################################################
# Define wrapper classes which inherit from both a Webots robot controller and a
# specialized physical interface class.  The controller object inherits
# controller.Robot with common API code, but some methods are implemented
# instead by the SuitcaseRobot interface class.

class RealSuitcase(suitcase.Suitcase, controller.SuitcaseRobot, threading.Thread):
    def __init__(self):
        print("Entering RealSuitcase.__init__ and initializing superclasses")
        super(RealSuitcase, self).__init__()
        self.daemon = True
        
    def run(self):
        print("Starting suitcase.")
        self._lock.acquire()
        super(RealSuitcase, self).run()
        
################################################################
class Arduino(threading.Thread):
    """Connect an Arduino serial port to a controller object.  The input and output
    is assumed to be a text line of space-delimited integers. The specific data
    format of both input is determined by the controller class and the Arduino
    sketch.  This process runs as a thread so it can easily block on the input.
    """
    def __init__(self, portname, controller):
        print("Entering Arduino.__init__ and initializing superclasses")
        super(Arduino, self).__init__()

        self.controller = controller
        self.portname = portname
        self.port = serial.Serial(portname, baudrate=115200, timeout=1.0)
        self.running = True
        
    def run(self):
        # wait briefly for the Arduino to complete waking up
        time.sleep(0.2)
        
        print(f"Entering Arduino event loop for {self.portname}.")
        while(self.running):
            line = self.controller.get_output_string()
            self.port.write(line.encode())
            print("Sending to Arduino:    " + line.strip())

            input = self.port.readline().decode('ascii')
            print("Received from Arduino: " + input.strip())
            self.controller.process_sensor_string(input)

    def safe_exit(self):
        self.running = False
        self.port.write(b"enable 0\n")        
        
################################################################        
# The following section is run when this is loaded as a script.
if __name__ == "__main__":

    # Initialize the command parser.
    parser = argparse.ArgumentParser( description = """Control script to connect Webots controller to StepperWinch firmware on an Arduino.""")
    parser.add_argument( '-p', '--port', default='/dev/cu.usbmodem1444301',
                         help='Specify the name of the Arduino serial port device (default is %(default)s.')

    # Parse the command line, returning a Namespace.
    args = parser.parse_args()
    
    # Create the suitcase control object.
    suitcase = RealSuitcase()

    # Create the Arduino interface which communicates with physical hardware.
    # The serial device address will need to be modified for your specific computer.
    suitcase_hw = Arduino(args.port, suitcase)

    # Start the control thread.
    suitcase.start()

    # Start the hardware interface thread.
    suitcase_hw.start()

    # Wait for the hardware thread to finish.
    try:
        suitcase_hw.join()

    except KeyboardInterrupt:
        suitcase_hw.safe_exit()
        print("Main thread received user interrupt, exiting.")
    

controller.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# controller.py : skeleton Webots API to connect the controller objects to the physical hardware.
# No copyright, 2021, Garth Zeglin.  This file is explicitly placed in the public domain.

import time, threading, queue, math

################################################################
class Robot(object):
    """This object emulates the portions of the Webots API common to all physical robots."""
    def __init__(self):

        print("Entering Robot.__init__ and initializing superclasses")
        super(Robot, self).__init__()

        # include a receiver/emitter pair by default
        self.emitter = Emitter()
        self.receiver = Receiver()

        # keep track of elapsed time
        self._time = 0.0
        
        # lock to synchronize access for this object and its children
        self._lock = threading.Lock()

    def step(self, interval):
        # print(f"Robot step function sleeping for {interval} msec on {self}.")

        self._lock.release()
        time.sleep(0.001 * interval)
        self._lock.acquire()
        self._time += 0.001 * interval
        
        # print("Robot step function resuming.")
        return 0

    def getTime(self):
        return self._time
    
    # only support a single receiver and emitter    
    def getReceiver(self, name):
        return self.receiver

    def getEmitter(self, name):
        return self.emitter

################################################################
class SuitcaseRobot(object):
    """This object emulates the portions of the Webots API related to the physical suitcase robot."""
    def __init__(self):
        print("Entering SuitcaseRobot.__init__ and initializing superclasses")
        super(SuitcaseRobot, self).__init__()

        # The hardware is assumed to use 1/4 step microstepping and 800 steps per revolution.
        self.radians_to_steps = 800 / (2*math.pi)
        self.steps_to_radians = 1.0 / self.radians_to_steps
        
        # Create objects representing the physical hardware.
        self._motors  = [RealMotor(name)  for name in ['motor1', 'motor2', 'motor3', 'motor4']]
        self._sensors = [RealSensor(name) for name in ['joint1', 'joint2', 'joint3', 'joint4']]

        # Collect all the devices into a single dictionary for retrieval by the controller.
        self._devices = {}
        for m in self._motors:
            self._devices[m.name] = m
        for s in self._sensors:
            self._devices[s.name] = s
        # print("SuitcaseRobot initialized devices: ", self._devices)
         
    def getDevice(self, name):
        return self._devices[name]

    def process_sensor_string(self, line):
        # Process a bytestring of text from the hardware, typically called from
        # a serial port thread with data from the Arduino.
        # This assumes the StepperWinch sketch is running.
        # Sample input line: txyza 1966200716 0 0 0 0
        try:
            tokens = line.split()
            if len(tokens) >= 6:
                # ignore the timestamp and extract the stepper motor counts
                values = [int(s) for s in tokens[2:6]]

                # update the sensor values in a thread-safe way
                with self._lock:
                    self._sensors[0].value = self.steps_to_radians * values[0]
                    self._sensors[1].value = self.steps_to_radians * values[1]
                    self._sensors[2].value = self.steps_to_radians * values[2]
                    self._sensors[3].value = self.steps_to_radians * values[3]

        except ValueError:
            pass

    def get_output_string(self):
        # Generate the next string of output to the hardware, typically called
        # from a serial port thread to generate output for the Arduino.
        # This assumes the StepperWinch sketch is running.
        
        with self._lock:
            # fetch the current motor commands in a thread-safe way
            positions = [m.target for m in self._motors]
            velocities = [m.rate for m in self._motors]

        # Accumulate a list of position targets and a list of velocity targets.
        # The default mode is position, but an infinite position target
        # indicates the use of velocity mode instead.
        v_c = ""
        p_c = ""
        vel  = []
        pos  = []
        for l, p, v in zip('xyza', positions, velocities):
            if not math.isinf(p):
                p_c += l
                pos.append(str(int(self.radians_to_steps * p)))
            else:
                v_c += l
                vel.append(str(int(self.radians_to_steps * v)))

        # create either one or two commands as output
        if v_c != "":
            vcommand = "v " + v_c + " " + " ".join(vel) + "\n"
        else:
            vcommand = ""

        if p_c != "":
            pcommand = "a " + p_c + " " + " ".join(pos) + "\n"
        else:
            pcommand = ""

        return pcommand + vcommand

################################################################
# Emulate the Webots sensor API.  This object just stores and retrieves a single value.
class RealSensor(object):
    def __init__(self, name):
        self.value = 0
        self.name = name
        
    def enable(self, rate):
        pass

    def getValue(self):
        return self.value

################################################################
# Emulate the Webots motor API.  This object just stores the commanded state; the parent robot emulator
# is responsible for communicating it to the hardware controller.

class RealMotor(object):
    def __init__(self, name):
        self.rate   = 0
        self.target = 0
        self.name = name
        
    def setVelocity(self, rate):
        self.rate = rate

    def setPosition(self, target):
        self.target = target

################################################################
# Emulate the Webots 'radio' communication receiver.
class Receiver(object):
    def __init__(self):

        # create a thread-safe queue for received messages
        self.queue = queue.Queue()
        pass
    
    def enable(self, rate):
        pass

    def getQueueLength(self):
        # return the number of available packets
        return self.queue.qsize()
    
    def getData(self):
        # return the next packet data as a bytestring
        # this also removes it from the queue, which doesn't match the Webots behavior, but
        # will work as long as it isn't called twice
        data = self.queue.get()
        print(f"receiver returning {data}")
        return data
    
    def nextPacket(self):
        # remove the oldest packet from the queue
        # because it is removed by getData, this is a no-op
        pass

# Emulate the Webots 'radio' communication transmitter.
class Emitter(object):
    def __init__(self):
        self.receivers = []

    def add_receiver(self, receiver):
        self.receivers.append(receiver)
        
    def send(self, data):
        # send a bytestring to the subscribed receivers
        print(f"emitter broadcasting {data}")
        for r in self.receivers:
            r.queue.put(data)

################################################################