Source code for script.primitives

import csv, random, logging
import numpy as np
log = logging.getLogger('primitives')

################################################################
[docs]class MotionPrimitives(object): """Class to encapsulate the set of motion primitives available for this performance. This also abstracts the basic interface to both motor and simulation and supports periodic execution. """ def __init__(self, main, pose_csv_path='script/poses'): super().__init__() self.main = main # global parameters self.tempo = 60.0 # beats per minute self.magnitude = 1.0 self.frequency = 1.0 self.damping_ratio = 1.0 self.all_axes = range(4) # index list for updating all motors self.non_zero_velocity = False self.non_zero_velocity_timeout = 0.0 # algorithmic generators self.random_mode = False self.random_mode_timer = 0 # try reading the pose table self.poses = {} self.pose_csv_path = pose_csv_path with open(self.pose_csv_path, newline='') as posefile: posereader = csv.reader(posefile) for pose in posereader: if len(pose) > 0: name = pose[0] positions = [int(s) for s in pose[1:]] self.poses[name] = positions log.debug("Read pose table: %s", self.poses) return #---- methods for distributing winch events across multiple winch sets and simulators ---------
[docs] def set_freq_damping(self): for winch in self.main.winches: winch.set_freq_damping(self.all_axes, self.frequency, self.damping_ratio) for sim in self.main.sims: sim.set_freq_damping(self.all_axes, self.frequency, self.damping_ratio) self.main.window.set_status("Frequency: %f, damping ratio: %f" % (self.frequency, self.damping_ratio)) return
[docs] def increment_target(self, winch_index, steps): """Map a given winch index to the particular winch set.""" set_index = winch_index // 4 winch_id = winch_index % 4 if set_index < self.main.num_winch_sets: self.main.winches[set_index].increment_target(winch_id, steps) self.main.sims[set_index].increment_target(winch_id, steps) return
[docs] def increment_reference(self, winch_index, steps): """Map a given winch index to the particular winch set.""" set_index = winch_index // 4 winch_id = winch_index % 4 if set_index < self.main.num_winch_sets: self.main.winches[set_index].increment_reference(winch_id, steps) self.main.sims[set_index].increment_reference(winch_id, steps) return
[docs] def set_target(self, winch_index, steps): """Map a given winch index to the particular winch set.""" set_index = winch_index // 4 winch_id = winch_index % 4 if set_index < self.main.num_winch_sets: self.main.winches[set_index].set_target(winch_id, steps) self.main.sims[set_index].set_target(winch_id, steps) return
[docs] def set_speed(self, winch_index, steps): """Map a given winch index to the particular winch set.""" set_index = winch_index // 4 winch_id = winch_index % 4 if set_index < self.main.num_winch_sets: self.main.winches[set_index].set_speed(winch_id, steps) self.main.sims[set_index].set_speed(winch_id, steps) self.main.window.set_status("Winch %d speed: %d" % (winch_index, steps)) return
[docs] def simulator_velocity(self): """Extract the current velocity vector from the simulator, which is not currently reported by the serial winches.""" return np.array([simset.qd for simset in self.main.sims]).flatten()
#--- methods for processing cue messages -------------------
[docs] def set_frequency(self, f): self.frequency = f; self.set_freq_damping() return
[docs] def set_damping(self, d): self.damping_ratio = d; self.set_freq_damping() return
[docs] def set_pose(self, name): position = self.poses.get(name) if position is not None: for i, p in enumerate(position): self.set_target(i, p)
[docs] def process_cue(self, args): if args[0] == 'pose': self.set_pose(args[1]) elif args[0] == 'random': self.random_mode = args[1] elif args[0] == 'tempo': self.tempo = args[1] elif args[0] == 'magnitude': self.magnitude = args[1] elif args[0] == 'gains': self.frequency = args[1] self.damping = args[2] self.set_freq_damping() elif args[0] == 'magnitude': self.magnitude = args[1] elif args[0] == 'gains': self.frequency = args[1] self.damping = args[2] self.set_freq_damping() return
[docs] def user_parameter_change(self, parameter, value): """Hook for user-controlled sliders, useful for debugging and tuning.""" log.debug("Motion primitives user_parameter_change: %d, %f", parameter, value)
#--- methods for periodic algorithmic activity -----------
[docs] def update_for_interval(self, interval): # always poll the simulators vel = self.simulator_velocity() sqvelmag = vel.dot(vel) self.non_zero_velocity_timeout -= interval if self.non_zero_velocity_timeout < 0: if self.non_zero_velocity: if sqvelmag < 5.0: self.non_zero_velocity = False self.main.script.input.put(('status', 'stopped')) log.debug("primitives issuing message ('status', 'stopped')") self.non_zero_velocity_timeout = 0.1 else: if sqvelmag > 10.0: self.non_zero_velocity = True self.main.script.input.put(('status', 'moving')) log.debug("primitives issuing message ('status', 'moving')") self.non_zero_velocity_timeout = 0.1 # run the random generator if needed if self.random_mode: self.random_mode_timer -= interval if self.random_mode_timer < 0: self.random_mode_timer += self.tempo/60.0 winch = random.randint(0,3) limit = int(800*self.magnitude) offset = random.randint(-limit, limit) self.increment_target(winch, offset)