Source code for optitrack.csv_reader

"""\
optitrack.csv_reader : a plain-Python parser for reading Optitrack CSV files in version 1.2 or 1.21 format.

This uses only Python modules common between CPython, IronPython, and
RhinoPython for compatibility with both Rhino and offline testing.

Reference for the format: http://wiki.optitrack.com/index.php?title=Data_Export:_CSV

Note that the file format has changed significantly since version 1.1.

Copyright (c) 2016, Garth Zeglin. All rights reserved. Licensed under the
terms of the BSD 3-clause license as included in LICENSE.

"""
################################################################
# Rhino IronPython does not include the csv module, so a very simple
# implementation of a csv file reader is included.  This only supports the
# limited use of csv as generated by the Optitrack Motive software.

[docs]class CSVReader(object): def __init__(self, stream): self._stream = stream return def __iter__(self): return self
[docs] def next(self): # Read the next raw line from the input. line = self._stream.next().rstrip() # Make sure than empty lines are returned as empty lists. if line == '': return list() # Note: following is a very format-specific hack. Currently, the only # quoted fields are the ID fields, which don't really need them. This # quick trick means that commas are not allowed in any body names. # Remove all quoting marks: unquoted = line.replace('"','') # And then just use split to separate fields based on commas. return unquoted.split(',')
################################################################ # define a utility object for describing the mapping from CSV columns to data objects import collections ColumnMapping = collections.namedtuple('ColumnMapping', ['setter', 'axis', 'column']) ################################################################
[docs]class Marker(object): """Representation of a single marker trajectory. Please note that the RigidBodyMarker trajectories associated with RigidBody objects are *expected marker locations*, not the real marker positions represented by Marker data. """ def __init__(self, label, ID): self.label = label self.ID = ID self.positions = list() # list with one element per frame, either None or [x,y,z] float lists self.times = list() # list with one element per frame with the capture time self.quality = list() # list with one element per frame with None of the scalar quality measure return def _add_frame(self, t): self.times.append(t) self.positions.append(None) self.quality.append(None) def _set_position(self, frame, axis, value): if value != '': if self.positions[frame] is None: self.positions[frame] = [0.0,0.0,0.0] self.positions[frame][axis] = float(value) def _set_quality(self, frame, axis, value): if value != '': self.quality[frame] = float(value)
################################################################
[docs]class RigidBody(object): """Representation of a single rigid body.""" def __init__(self, label, ID): self.label = label self.ID = ID self.markers = dict() # set of Marker objects representing RigidBodyMarker data associated with this body, indexed by name self.positions = list() # list with one element per frame, either None or [x,y,z] float lists self.rotations = list() # list with one element per frame, either None or [x,y,z,w] float lists self.times = list() # list with one element per frame with the capture time return def _add_frame(self, t): self.times.append(t) self.positions.append(None) self.rotations.append(None) def _get_or_add_marker(self, label, ID): if label in self.markers: return self.markers[label] else: marker = Marker(label, ID) self.markers[label] = marker return marker def _set_position( self, frame, axis, value ): if value != '': if self.positions[frame] is None: self.positions[frame] = [0.0,0.0,0.0] self.positions[frame][axis] = float(value) def _set_rotation( self, frame, axis, value ): if value != '': if self.rotations[frame] is None: self.rotations[frame] = [0.0,0.0,0.0,0.0] self.rotations[frame][axis] = float(value)
[docs] def num_total_frames(self): return len(self.times)
[docs] def num_valid_frames(self): count = 0 for pt in self.positions: if pt is not None: count = count + 1 return count
################################################################
[docs]class Take(object): """Representation of a motion capture Take. Each CSV file represents one Take. """ def __init__(self): # user-accessible properties self.frame_rate = 120.0 self.rotation_type = 'Quaternion' self.units = 'Meters' # user-accessible data self.rigid_bodies = dict() # dict of RigidBody objects, indexed by asset name string self.markers = dict() # dict of Marker objects, indexed by asset name string # raw header information is saved as follows: self._raw_info = dict() # line 1: raw header fields, with values as unparsed strings self._raw_types = list() # line 3: raw column types for all data columns (not including frame and time column) self._raw_labels = list() # line 4: raw asset names for all data columns (not including frame and time column) self._raw_fields = list() # line 6: raw field types for all data columns (not including frame and time column) self._raw_axes = list() # line 7: raw axis designators for all data columns (not including frame and time column) self._ignored_labels = set() # names of all ignored objects self._column_map = list() # list of ColumnMap tuples defining where to store data column elements return
[docs] def readCSV(self, path, verbose=False, raw_markers=False): """Load a CSV motion capture data file. Reading the raw marker data adds significant time and memory usage, so it is disabled by default. Args: verbose: flag to enable debugging printout raw_markers: flag to enable processing of raw marker data """ self.rigid_bodies = dict() self._raw_info = dict() self._ignored_labels = set() self._column_map = list() with open(path, 'r') as file_handle: csv_stream = CSVReader( file_handle ) self._read_header(csv_stream, verbose, raw_markers) self._read_data(csv_stream, verbose) return self
# ================================================================ def _read_header(self, stream, verbose=False, raw_markers=False): # Line 1 consists of a series of token, value pairs. line1 = next(stream) assert line1[0] == 'Format Version', "Unrecognized header cell: %s" % line1[0] format = line1[1] assert format == '1.21' or format == '1.2', "Unsupported format version: %s" % line1[1] for columnidx in range(len(line1)/2): self._raw_info[ line1[2*columnidx]] = line1[2*columnidx+1] # make a few assumptions about data type self.rotation_type = self._raw_info.get('Rotation Type') assert self.rotation_type == 'Quaternion', 'Only the Quaternion rotation type is supported, found: %s.' % self.rotation_type # pull a few values out, supplying reasonable defaults if they are missing self.frame_rate = float(self._raw_info.get('Export Frame Rate', 120)) self.units = self._raw_info.get('Length Units', 'Meters') # Line 2 is blank. line2 = next(stream) assert len(line2) == 0, 'Expected blank second header line, found %s.' % line2 # Line 3 designates the data type for each succeeding column. line3 = next(stream) self._raw_types = line3[2:] # check for any unexpected types on line 3 all_types = set( self._raw_types ) supported_types = set(['Rigid Body', 'Rigid Body Marker', 'Marker']) assert all_types.issubset(supported_types), 'Unsupported object type found in header line 3: %s' % all_types # Line 4 designates the asset labels for each column (e.g. 'Rigid Body 1', or whatever name was assigned) line4 = next(stream) self._raw_labels = line4[2:] # Line 5 designates the marker ID for each column line5 = next(stream) # Line 6 designates the data type for each column: Rotation, Position, Error Per Marker, Marker Quality line6 = next(stream) self._raw_fields = line6[2:] # Line 7 designates the specific axis: Frame, Time, X, Y, Z, W, or blank line7 = next(stream) self._raw_axes = line7[2:] # Process lines 3-7 at once, creating named objects to receive each frame of data for supported asset types. for col,asset_type,label,ID,field,axis in zip( range(len(self._raw_types)), self._raw_types, self._raw_labels, \ line5[2:], self._raw_fields, self._raw_axes ): if asset_type == 'Rigid Body': if label in self.rigid_bodies: body = self.rigid_bodies[label] else: body = RigidBody(label,ID) self.rigid_bodies[label] = body # create a column map entry for each rigid body axis if field == 'Rotation': axis_index = {'X':0, 'Y':1, 'Z':2, 'W': 3}[axis] setter = body._set_rotation self._column_map.append(ColumnMapping(setter, axis_index, col)) elif field == 'Position': axis_index = {'X':0, 'Y':1, 'Z':2}[axis] setter = body._set_position self._column_map.append(ColumnMapping(setter, axis_index, col)) # ================================================================ elif asset_type == 'Rigid Body Marker': # deconstruct the label to a body label and numeric index: body name_# separator = label.rindex('_') body_label = label[0:separator] # find the associated rigid body, assumed to be already defined, and obtain the marker object body = self.rigid_bodies[body_label] marker = body._get_or_add_marker(label, ID) # create a column map entry for each marker axis: Position X, Y, Z, or Marker Quality if field == 'Position': axis_index = {'X':0, 'Y':1, 'Z':2}[axis] setter = marker._set_position self._column_map.append(ColumnMapping(setter, axis_index, col)) elif field == 'Marker Quality': setter = marker._set_quality self._column_map.append(ColumnMapping(setter, 0, col)) # ================================================================ elif asset_type == 'Marker': if raw_markers: if label in self.markers: marker = self.markers[label] else: marker = Marker(label,ID) self.markers[label] = marker # create a column map entry for each marker axis: Position X, Y, Z, or Marker Quality if field == 'Position': axis_index = {'X':0, 'Y':1, 'Z':2}[axis] setter = marker._set_position self._column_map.append(ColumnMapping(setter, axis_index, col)) elif field == 'Marker Quality': setter = marker._set_quality self._column_map.append(ColumnMapping(setter, 0, col)) # ================================================================ else: if label not in self._ignored_labels: if verbose: print "Ignoring object %s of type %s." % (label, asset_type) self._ignored_labels.add(label) # the actual frame data begins with line 8, one frame per line, starting with frame 0 return # ================================================================ def _read_data(self, stream, verbose = False): """Process frame data rows from the CSV stream.""" # Note that the frame_num indices do not necessarily start from zero, # but the setter functions assume that the array indices do. This # implementation just ignores the original frame numbers, the frames are # renumbered from zero. for row_num, row in enumerate(stream): frame_num = int(row[0]) frame_t = float(row[1]) values = row[2:] # if verbose: print "Processing row_num %d, frame_num %d, time %f." % (row_num, frame_num, frame_t) # add the new frame time to each object storing a trajectory for body in self.rigid_bodies.values(): body._add_frame(frame_t) # extend the Rigid Body Marker objects for this body for marker in body.markers.values(): marker._add_frame(frame_t) # extend the Marker objects for raw marker trajectories for marker in self.markers.values(): marker._add_frame(frame_t) # process the columns of interest for mapping in self._column_map: # each mapping is a namedtuple with a setter method, column index, and axis name mapping.setter( row_num, mapping.axis, values[mapping.column] )
# ================================================================