Source code for optitrack.csv_reader

"""\
optitrack.csv_reader : a plain-Python parser for reading Optitrack CSV files in version 1.1, 1.2, or 1.21 format.

Copyright (c) 2016-2018, Garth Zeglin. All rights reserved. Licensed under the
terms of the BSD 3-clause license as included in LICENSE.

This uses only Python modules common between CPython, IronPython, and
RhinoPython for compatibility with both Rhino and offline testing.

Reference for the format: http://wiki.optitrack.com/index.php?title=Data_Export:_CSV

Note that the file format changed significantly between version 1.1 and 1.2.

Version 1.1 support added by Josh Bard.

"""
################################################################
# Rhino IronPython does not include the csv module, so a very simple
# implementation of a csv file reader is included.  This only supports the
# limited use of csv as generated by the Optitrack Motive software.

[docs]class CSVReader(object): def __init__(self, stream): self._stream = stream return def __iter__(self): return self
[docs] def next(self): # Read the next raw line from the input. line = self._stream.next().rstrip() # Make sure than empty lines are returned as empty lists. if line == '': return list() # Note: following is a very format-specific hack. Currently, the only # quoted fields are the ID fields, which don't really need them. This # quick trick means that commas are not allowed in any body names. # Remove all quoting marks: unquoted = line.replace('"','') # And then just use split to separate fields based on commas. return unquoted.split(',')
################################################################ # define a utility object for describing the mapping from CSV columns to data objects import collections ColumnMapping = collections.namedtuple('ColumnMapping', ['setter', 'axis', 'column']) ################################################################
[docs]class Marker(object): """Representation of a single marker trajectory. Please note that the RigidBodyMarker trajectories associated with RigidBody objects are *expected marker locations*, not the real marker positions represented by Marker data. """ def __init__(self, label, ID): self.label = label self.ID = ID self.positions = list() # list with one element per frame, either None or [x,y,z] float lists self.times = list() # list with one element per frame with the capture time self.quality = list() # list with one element per frame with None or the scalar quality measure return def _add_frame(self, t): self.times.append(t) self.positions.append(None) self.quality.append(None) def _set_position(self, frame, axis, value): if value != '': if self.positions[frame] is None: self.positions[frame] = [0.0,0.0,0.0] self.positions[frame][axis] = float(value) def _set_quality(self, frame, axis, value): if value != '': self.quality[frame] = float(value)
################################################################
[docs]class RigidBody(object): """Representation of a single rigid body.""" def __init__(self, label, ID): self.label = label self.ID = ID self.markers = dict() # set of Marker objects representing RigidBodyMarker data associated with this body, indexed by name self.calMarkers= dict() # set of Marker objects defined in rb local frame self.positions = list() # list with one element per frame, either None or [x,y,z] float lists self.rotations = list() # list with one element per frame, either None or [x,y,z,w] float lists self.times = list() # list with one element per frame with the capture time return def _add_frame(self, t): self.times.append(t) self.positions.append(None) self.rotations.append(None) def _get_or_add_marker(self, label, ID): if label in self.markers: return self.markers[label] else: marker = Marker(label, ID) self.markers[label] = marker return marker def _get_or_add_cal_marker(self, label, ID): if label in self.calMarkers: return self.calMarkers[label] else: marker = Marker(label, ID) self.calMarkers[label] = marker return marker def _set_position( self, frame, axis, value ): if value != '': if self.positions[frame] is None: self.positions[frame] = [0.0,0.0,0.0] self.positions[frame][axis] = float(value) def _set_rotation( self, frame, axis, value ): if value != '': if self.rotations[frame] is None: self.rotations[frame] = [0.0,0.0,0.0,0.0] self.rotations[frame][axis] = float(value)
[docs] def num_total_frames(self): return len(self.times)
[docs] def num_valid_frames(self): count = 0 for pt in self.positions: if pt is not None: count = count + 1 return count
################################################################
[docs]class Take_V1_2(object): """Representation of a CSV v1.2 motion capture Take. Each CSV file represents one Take. This format version is generated by Optitrack Motive 1.9. """ def __init__(self): # user-accessible properties self.frame_rate = 120.0 self.rotation_type = 'Quaternion' self.units = 'Meters' self.version = 1.2 # user-accessible data self.rigid_bodies = dict() # dict of RigidBody objects, indexed by asset name string self.markers = dict() # dict of Marker objects, indexed by asset name string # raw header information is saved as follows: self._raw_info = dict() # line 1: raw header fields, with values as unparsed strings self._raw_types = list() # line 3: raw column types for all data columns (not including frame and time column) self._raw_labels = list() # line 4: raw asset names for all data columns (not including frame and time column) self._raw_fields = list() # line 6: raw field types for all data columns (not including frame and time column) self._raw_axes = list() # line 7: raw axis designators for all data columns (not including frame and time column) self._ignored_labels = set() # names of all ignored objects self._column_map = list() # list of ColumnMap tuples defining where to store data column elements return
[docs] def readCSV(self, path, verbose=False, raw_markers=False): """Load a CSV motion capture data file. Reading the raw marker data adds significant time and memory usage, so it is disabled by default. Args: verbose: flag to enable debugging printout raw_markers: flag to enable processing of raw marker data """ self.rigid_bodies = dict() self._raw_info = dict() self._ignored_labels = set() self._column_map = list() with open(path, 'r') as file_handle: csv_stream = CSVReader( file_handle ) self._read_header(csv_stream, verbose, raw_markers) self._read_data(csv_stream, verbose) return self
# ================================================================ def _read_header(self, stream, verbose=False, raw_markers=False): # Line 1 consists of a series of token, value pairs. line1 = next(stream) assert line1[0] == 'Format Version', "Unrecognized header cell: %s" % line1[0] format = line1[1] assert format == '1.21' or format == '1.2', "Unsupported format version: %s" % line1[1] for columnidx in range(len(line1)/2): self._raw_info[ line1[2*columnidx]] = line1[2*columnidx+1] # make a few assumptions about data type self.rotation_type = self._raw_info.get('Rotation Type') assert self.rotation_type == 'Quaternion', 'Only the Quaternion rotation type is supported, found: %s.' % self.rotation_type # pull a few values out, supplying reasonable defaults if they are missing self.frame_rate = float(self._raw_info.get('Export Frame Rate', 120)) self.units = self._raw_info.get('Length Units', 'Meters') # Line 2 is blank. line2 = next(stream) assert len(line2) == 0, 'Expected blank second header line, found %s.' % line2 # Line 3 designates the data type for each succeeding column. line3 = next(stream) self._raw_types = line3[2:] # check for any unexpected types on line 3 all_types = set( self._raw_types ) supported_types = set(['Rigid Body', 'Rigid Body Marker', 'Marker']) assert all_types.issubset(supported_types), 'Unsupported object type found in header line 3: %s' % all_types # Line 4 designates the asset labels for each column (e.g. 'Rigid Body 1', or whatever name was assigned) line4 = next(stream) self._raw_labels = line4[2:] # Line 5 designates the marker ID for each column line5 = next(stream) # Line 6 designates the data type for each column: Rotation, Position, Error Per Marker, Marker Quality line6 = next(stream) self._raw_fields = line6[2:] # Line 7 designates the specific axis: Frame, Time, X, Y, Z, W, or blank line7 = next(stream) self._raw_axes = line7[2:] # Process lines 3-7 at once, creating named objects to receive each frame of data for supported asset types. for col,asset_type,label,ID,field,axis in zip( range(len(self._raw_types)), self._raw_types, self._raw_labels, \ line5[2:], self._raw_fields, self._raw_axes ): if asset_type == 'Rigid Body': if label in self.rigid_bodies: body = self.rigid_bodies[label] else: body = RigidBody(label,ID) self.rigid_bodies[label] = body # create a column map entry for each rigid body axis if field == 'Rotation': axis_index = {'X':0, 'Y':1, 'Z':2, 'W': 3}[axis] setter = body._set_rotation self._column_map.append(ColumnMapping(setter, axis_index, col)) elif field == 'Position': axis_index = {'X':0, 'Y':1, 'Z':2}[axis] setter = body._set_position self._column_map.append(ColumnMapping(setter, axis_index, col)) # ================================================================ elif asset_type == 'Rigid Body Marker': # deconstruct the label to a body label and numeric index: body name_# separator = label.rindex('_') body_label = label[0:separator] # find the associated rigid body, assumed to be already defined, and obtain the marker object body = self.rigid_bodies[body_label] marker = body._get_or_add_marker(label, ID) # create a column map entry for each marker axis: Position X, Y, Z, or Marker Quality if field == 'Position': axis_index = {'X':0, 'Y':1, 'Z':2}[axis] setter = marker._set_position self._column_map.append(ColumnMapping(setter, axis_index, col)) elif field == 'Marker Quality': setter = marker._set_quality self._column_map.append(ColumnMapping(setter, 0, col)) # ================================================================ elif asset_type == 'Marker': if raw_markers: if label in self.markers: marker = self.markers[label] else: marker = Marker(label,ID) self.markers[label] = marker # create a column map entry for each marker axis: Position X, Y, Z, or Marker Quality if field == 'Position': axis_index = {'X':0, 'Y':1, 'Z':2}[axis] setter = marker._set_position self._column_map.append(ColumnMapping(setter, axis_index, col)) elif field == 'Marker Quality': setter = marker._set_quality self._column_map.append(ColumnMapping(setter, 0, col)) # ================================================================ else: if label not in self._ignored_labels: if verbose: print "Ignoring object %s of type %s." % (label, asset_type) self._ignored_labels.add(label) # the actual frame data begins with line 8, one frame per line, starting with frame 0 return # ================================================================ def _read_data(self, stream, verbose = False): """Process frame data rows from the CSV stream.""" # Note that the frame_num indices do not necessarily start from zero, # but the setter functions assume that the array indices do. This # implementation just ignores the original frame numbers, the frames are # renumbered from zero. for row_num, row in enumerate(stream): frame_num = int(row[0]) frame_t = float(row[1]) values = row[2:] # if verbose: print "Processing row_num %d, frame_num %d, time %f." % (row_num, frame_num, frame_t) # add the new frame time to each object storing a trajectory for body in self.rigid_bodies.values(): body._add_frame(frame_t) # extend the Rigid Body Marker objects for this body for marker in body.markers.values(): marker._add_frame(frame_t) # extend the Marker objects for raw marker trajectories for marker in self.markers.values(): marker._add_frame(frame_t) # process the columns of interest for mapping in self._column_map: # each mapping is a namedtuple with a setter method, column index, and axis name mapping.setter( row_num, mapping.axis, values[mapping.column] )
##################################################################################
[docs]class Take_V1_1(object): """Representation of a CSV v1.1 motion capture Take. Each CSV file represents one Take. This format version is generated by Optitrack Motive 1.5. """ def __init__(self): # user-accessible properties self.frame_rate = 120.0 self.rotation_type = 'Quaternion' self.units = 'Meters' self.version = 1.1 self.frameCount = 0 self.rbCount = 0 # user-accessible data self.rigid_bodies = dict() # dict of RigidBody objects, indexed by asset name string self.raw_markers = dict() # dict of markeres not associated with rigid bodies return
[docs] def readCSV(self, path, verbose=False): """Load a CSV motion capture data file.""" self.rigid_bodies = dict() self._raw_info = dict() self._ignored_labels = set() self._column_map = list() with open(path, 'r') as file_handle: csv_stream = CSVReader(file_handle) self._read_header(csv_stream,verbose) self._read_data(csv_stream, verbose) return self
# ================================================================ def _read_header(self,stream,verbose = False): line = next(stream) # Check first row for expected motive row labeling assert line[0] == 'comment', "Unexpected row lable: %s" % line[0] #=============================================================== ## Read 'info' and extract relevent data while line[0] != 'info': line = next(stream) # First 'info' row containts version number assert line[1] == 'version', "Unrecognized header cell: %s" % line[0] self.version = float(line[2]) assert self.version == 1.1, "Unsupported format version: %s" % line[2] # Second 'info' row contains total frame count line2 = next(stream) assert line2[1] == 'framecount', 'expected framecount got %s' % line2[1] self.frameCount = int(line2[2]) # Third 'info' row contains total rigid body (rb) count during take line3 = next(stream) assert line3[1] == 'rigidbodycount','expected rigidbodycount got %s' % line3[1] self.rbCount = int(line3[2]) #================================================================== ## Next, initialize each rigid body (and its markers) in scene for rb in range (self.rbCount): rbLine = next(stream) # rigid body info from row label,ID,numMarkers = rbLine[1],int(rbLine[2]),int(rbLine[3]) # create rigid body if label in self.rigid_bodies: body = self.rigid_bodies[label] else: body = RigidBody(label,ID) self.rigid_bodies[label] = body body.numMarkers = numMarkers ## create rigid body markers ## Note: marker IDs undefined here cal-markers and rb- ## markers are not identical for i in range (numMarkers): fos = i*3 #initialize rigid body markers marker_label = "%s-Marker %d" % (label,i+1) marker = body._get_or_add_marker(marker_label,None) #create calibration markers cal_marker_label = "%s-Cal %d" % (label,i+1) cal_marker = body._get_or_add_cal_marker(cal_marker_label,i+1) cal_marker.positions = [[float(x) for x in rbLine[4+fos:7+fos]]] # Specific to V1.1 format return # ================================================================ def _read_data(self,stream,verbose = False): for row_num, row in enumerate(stream): ## process all 'frame' data if row[0] == 'frame': frame_num = int(row[1]) # motive frame frame_t = float(row[2]) # time stamp by frame num_rbs = int(row[3]) # num rbs tracked in current frame num_markers = int(row[4+num_rbs *11]) # specific to motive format # segment frame data for rigid bodies and markers rb_data = row[4:4+num_rbs*11] marker_data = row[5 + num_rbs*11:] # initialize an empty frame for each rb in the scene [self.rigid_bodies[body]._add_frame(frame_t) for body in self.rigid_bodies] # initialize an empty frame for each rb_marker in the scene [[self.rigid_bodies[body].markers[marker]._add_frame(frame_t) \ for marker in self.rigid_bodies[body].markers] for body in self.rigid_bodies] # initialize an empty frame for each raw marker in the scene [self.raw_markers[marker]._add_frame(frame_t) for marker in self.raw_markers] #### for each tracked rigid body in frame parse data for i in range (num_rbs): fos = i*11 # fixed offset per rigid body def in stream ID = int(rb_data [fos]) # motive assigned int position = [float(x) for x in rb_data[1+fos:4+fos]] #[x,y,z] rotation = [float(x) for x in rb_data[4+fos:8+fos]] #[qx,qy,qz,qw] # assign current frame data to correct rb based on ID for body in self.rigid_bodies: if ID == self.rigid_bodies[body].ID: body = self.rigid_bodies[body] body.positions[-1] = position body.rotations[-1] = rotation #### for each marker in the frame parse data for i in range(num_markers): fos = i*5 # fixed offset for marker definitions in V1.1 ID = marker_data[fos+3] # motive assigned ID label = marker_data[fos+4] position = [float(x) for x in marker_data[fos:fos+3]] # deconstruct the marker label separator = label.rindex('-') body_label = label[0:separator] # if rb-marker then update rb-marker frame if body_label in self.rigid_bodies.keys(): body = self.rigid_bodies[body_label] marker = body.markers[label] marker.positions[frame_num] = position if marker.ID == None: marker.ID = ID # else update raw-markers else: if label in self.raw_markers: marker = self.raw_markers[label] else: marker = Marker(label,ID) self.raw_markers[label] = marker marker.positions.append(position) return
################################################################
[docs]def load(path, verbose=False, **kwargs): """Read a Optitrack CSV motion capture take file. Several file formats are supported; this function detects the format and returns a format-specific object. """ # Read the first line of the file. with open(path, 'r') as file: first = file.next().rstrip() # very ad hoc and permissive format detection if first.startswith('comment,'): return Take_V1_1().readCSV(path, verbose) elif first.startswith('Format Version,1.2,'): return Take_V1_2().readCSV(path, verbose, kwargs.get('raw_markers', False))
################################################################