"""\
optitrack.csv_reader : a plain-Python parser for reading Optitrack CSV files in version 1.1, 1.2, or 1.21 format.
Copyright (c) 2016-2018, Garth Zeglin. All rights reserved. Licensed under the
terms of the BSD 3-clause license as included in LICENSE.
This uses only Python modules common between CPython, IronPython, and
RhinoPython for compatibility with both Rhino and offline testing.
Reference for the format: http://wiki.optitrack.com/index.php?title=Data_Export:_CSV
Note that the file format changed significantly between version 1.1 and 1.2.
Version 1.1 support added by Josh Bard.
"""
################################################################
# Rhino IronPython does not include the csv module, so a very simple
# implementation of a csv file reader is included. This only supports the
# limited use of csv as generated by the Optitrack Motive software.
[docs]class CSVReader(object):
def __init__(self, stream):
self._stream = stream
return
def __iter__(self):
return self
[docs] def next(self):
# Read the next raw line from the input.
line = self._stream.next().rstrip()
# Make sure than empty lines are returned as empty lists.
if line == '':
return list()
# Note: following is a very format-specific hack. Currently, the only
# quoted fields are the ID fields, which don't really need them. This
# quick trick means that commas are not allowed in any body names.
# Remove all quoting marks:
unquoted = line.replace('"','')
# And then just use split to separate fields based on commas.
return unquoted.split(',')
################################################################
# define a utility object for describing the mapping from CSV columns to data objects
import collections
ColumnMapping = collections.namedtuple('ColumnMapping', ['setter', 'axis', 'column'])
################################################################
[docs]class Marker(object):
"""Representation of a single marker trajectory. Please note that the
RigidBodyMarker trajectories associated with RigidBody objects are *expected
marker locations*, not the real marker positions represented by Marker data.
"""
def __init__(self, label, ID):
self.label = label
self.ID = ID
self.positions = list() # list with one element per frame, either None or [x,y,z] float lists
self.times = list() # list with one element per frame with the capture time
self.quality = list() # list with one element per frame with None or the scalar quality measure
return
def _add_frame(self, t):
self.times.append(t)
self.positions.append(None)
self.quality.append(None)
def _set_position(self, frame, axis, value):
if value != '':
if self.positions[frame] is None:
self.positions[frame] = [0.0,0.0,0.0]
self.positions[frame][axis] = float(value)
def _set_quality(self, frame, axis, value):
if value != '':
self.quality[frame] = float(value)
################################################################
[docs]class RigidBody(object):
"""Representation of a single rigid body."""
def __init__(self, label, ID):
self.label = label
self.ID = ID
self.markers = dict() # set of Marker objects representing RigidBodyMarker data associated with this body, indexed by name
self.calMarkers= dict() # set of Marker objects defined in rb local frame
self.positions = list() # list with one element per frame, either None or [x,y,z] float lists
self.rotations = list() # list with one element per frame, either None or [x,y,z,w] float lists
self.times = list() # list with one element per frame with the capture time
return
def _add_frame(self, t):
self.times.append(t)
self.positions.append(None)
self.rotations.append(None)
def _get_or_add_marker(self, label, ID):
if label in self.markers:
return self.markers[label]
else:
marker = Marker(label, ID)
self.markers[label] = marker
return marker
def _get_or_add_cal_marker(self, label, ID):
if label in self.calMarkers:
return self.calMarkers[label]
else:
marker = Marker(label, ID)
self.calMarkers[label] = marker
return marker
def _set_position( self, frame, axis, value ):
if value != '':
if self.positions[frame] is None:
self.positions[frame] = [0.0,0.0,0.0]
self.positions[frame][axis] = float(value)
def _set_rotation( self, frame, axis, value ):
if value != '':
if self.rotations[frame] is None:
self.rotations[frame] = [0.0,0.0,0.0,0.0]
self.rotations[frame][axis] = float(value)
[docs] def num_total_frames(self):
return len(self.times)
[docs] def num_valid_frames(self):
count = 0
for pt in self.positions:
if pt is not None:
count = count + 1
return count
################################################################
[docs]class Take_V1_2(object):
"""Representation of a CSV v1.2 motion capture Take. Each CSV file represents
one Take. This format version is generated by Optitrack Motive 1.9.
"""
def __init__(self):
# user-accessible properties
self.frame_rate = 120.0
self.rotation_type = 'Quaternion'
self.units = 'Meters'
self.version = 1.2
# user-accessible data
self.rigid_bodies = dict() # dict of RigidBody objects, indexed by asset name string
self.markers = dict() # dict of Marker objects, indexed by asset name string
# raw header information is saved as follows:
self._raw_info = dict() # line 1: raw header fields, with values as unparsed strings
self._raw_types = list() # line 3: raw column types for all data columns (not including frame and time column)
self._raw_labels = list() # line 4: raw asset names for all data columns (not including frame and time column)
self._raw_fields = list() # line 6: raw field types for all data columns (not including frame and time column)
self._raw_axes = list() # line 7: raw axis designators for all data columns (not including frame and time column)
self._ignored_labels = set() # names of all ignored objects
self._column_map = list() # list of ColumnMap tuples defining where to store data column elements
return
[docs] def readCSV(self, path, verbose=False, raw_markers=False):
"""Load a CSV motion capture data file.
Reading the raw marker data adds significant time and memory usage, so it is disabled by default.
Args:
verbose: flag to enable debugging printout
raw_markers: flag to enable processing of raw marker data
"""
self.rigid_bodies = dict()
self._raw_info = dict()
self._ignored_labels = set()
self._column_map = list()
with open(path, 'r') as file_handle:
csv_stream = CSVReader( file_handle )
self._read_header(csv_stream, verbose, raw_markers)
self._read_data(csv_stream, verbose)
return self
# ================================================================
def _read_header(self, stream, verbose=False, raw_markers=False):
# Line 1 consists of a series of token, value pairs.
line1 = next(stream)
assert line1[0] == 'Format Version', "Unrecognized header cell: %s" % line1[0]
format = line1[1]
assert format == '1.22' or format == '1.21' or format == '1.2', "Unsupported format version: %s" % line1[1]
for columnidx in range(len(line1)/2):
self._raw_info[ line1[2*columnidx]] = line1[2*columnidx+1]
# make a few assumptions about data type
self.rotation_type = self._raw_info.get('Rotation Type')
assert self.rotation_type == 'Quaternion', 'Only the Quaternion rotation type is supported, found: %s.' % self.rotation_type
# pull a few values out, supplying reasonable defaults if they are missing
self.frame_rate = float(self._raw_info.get('Export Frame Rate', 120))
self.units = self._raw_info.get('Length Units', 'Meters')
# Line 2 is blank.
line2 = next(stream)
assert len(line2) == 0, 'Expected blank second header line, found %s.' % line2
# Line 3 designates the data type for each succeeding column.
line3 = next(stream)
self._raw_types = line3[2:]
# check for any unexpected types on line 3
all_types = set( self._raw_types )
supported_types = set(['Rigid Body', 'Rigid Body Marker', 'Marker'])
assert all_types.issubset(supported_types), 'Unsupported object type found in header line 3: %s' % all_types
# Line 4 designates the asset labels for each column (e.g. 'Rigid Body 1', or whatever name was assigned)
line4 = next(stream)
self._raw_labels = line4[2:]
# Line 5 designates the marker ID for each column
line5 = next(stream)
# Line 6 designates the data type for each column: Rotation, Position, Error Per Marker, Marker Quality
line6 = next(stream)
self._raw_fields = line6[2:]
# Line 7 designates the specific axis: Frame, Time, X, Y, Z, W, or blank
line7 = next(stream)
self._raw_axes = line7[2:]
# Process lines 3-7 at once, creating named objects to receive each frame of data for supported asset types.
for col,asset_type,label,ID,field,axis in zip( range(len(self._raw_types)), self._raw_types, self._raw_labels, \
line5[2:], self._raw_fields, self._raw_axes ):
if asset_type == 'Rigid Body':
if label in self.rigid_bodies:
body = self.rigid_bodies[label]
else:
body = RigidBody(label,ID)
self.rigid_bodies[label] = body
# create a column map entry for each rigid body axis
if field == 'Rotation':
axis_index = {'X':0, 'Y':1, 'Z':2, 'W': 3}[axis]
setter = body._set_rotation
self._column_map.append(ColumnMapping(setter, axis_index, col))
elif field == 'Position':
axis_index = {'X':0, 'Y':1, 'Z':2}[axis]
setter = body._set_position
self._column_map.append(ColumnMapping(setter, axis_index, col))
# ================================================================
elif asset_type == 'Rigid Body Marker':
# deconstruct the label to a body label and numeric index: body name_#
# for v1.22, expanding this to allow a colon instead: body name:#
if ':' in label:
separator = label.rindex(':')
else:
separator = label.rindex('_')
body_label = label[0:separator]
# find the associated rigid body, assumed to be already defined, and obtain the marker object
body = self.rigid_bodies[body_label]
marker = body._get_or_add_marker(label, ID)
# create a column map entry for each marker axis: Position X, Y, Z, or Marker Quality
if field == 'Position':
axis_index = {'X':0, 'Y':1, 'Z':2}[axis]
setter = marker._set_position
self._column_map.append(ColumnMapping(setter, axis_index, col))
elif field == 'Marker Quality':
setter = marker._set_quality
self._column_map.append(ColumnMapping(setter, 0, col))
# ================================================================
elif asset_type == 'Marker':
if raw_markers:
if label in self.markers:
marker = self.markers[label]
else:
marker = Marker(label,ID)
self.markers[label] = marker
# create a column map entry for each marker axis: Position X, Y, Z, or Marker Quality
if field == 'Position':
axis_index = {'X':0, 'Y':1, 'Z':2}[axis]
setter = marker._set_position
self._column_map.append(ColumnMapping(setter, axis_index, col))
elif field == 'Marker Quality':
setter = marker._set_quality
self._column_map.append(ColumnMapping(setter, 0, col))
# ================================================================
else:
if label not in self._ignored_labels:
if verbose: print "Ignoring object %s of type %s." % (label, asset_type)
self._ignored_labels.add(label)
# the actual frame data begins with line 8, one frame per line, starting with frame 0
return
# ================================================================
def _read_data(self, stream, verbose = False):
"""Process frame data rows from the CSV stream."""
# Note that the frame_num indices do not necessarily start from zero,
# but the setter functions assume that the array indices do. This
# implementation just ignores the original frame numbers, the frames are
# renumbered from zero.
for row_num, row in enumerate(stream):
frame_num = int(row[0])
frame_t = float(row[1])
values = row[2:]
# if verbose: print "Processing row_num %d, frame_num %d, time %f." % (row_num, frame_num, frame_t)
# add the new frame time to each object storing a trajectory
for body in self.rigid_bodies.values():
body._add_frame(frame_t)
# extend the Rigid Body Marker objects for this body
for marker in body.markers.values():
marker._add_frame(frame_t)
# extend the Marker objects for raw marker trajectories
for marker in self.markers.values():
marker._add_frame(frame_t)
# process the columns of interest
for mapping in self._column_map:
# each mapping is a namedtuple with a setter method, column index, and axis name
mapping.setter( row_num, mapping.axis, values[mapping.column] )
##################################################################################
[docs]class Take_V1_1(object):
"""Representation of a CSV v1.1 motion capture Take. Each CSV file represents
one Take. This format version is generated by Optitrack Motive 1.5.
"""
def __init__(self):
# user-accessible properties
self.frame_rate = 120.0
self.rotation_type = 'Quaternion'
self.units = 'Meters'
self.version = 1.1
self.frameCount = 0
self.rbCount = 0
# user-accessible data
self.rigid_bodies = dict() # dict of RigidBody objects, indexed by asset name string
self.raw_markers = dict() # dict of markeres not associated with rigid bodies
return
[docs] def readCSV(self, path, verbose=False):
"""Load a CSV motion capture data file."""
self.rigid_bodies = dict()
self._raw_info = dict()
self._ignored_labels = set()
self._column_map = list()
with open(path, 'r') as file_handle:
csv_stream = CSVReader(file_handle)
self._read_header(csv_stream,verbose)
self._read_data(csv_stream, verbose)
return self
# ================================================================
def _read_header(self,stream,verbose = False):
line = next(stream)
# Check first row for expected motive row labeling
assert line[0] == 'comment', "Unexpected row lable: %s" % line[0]
#===============================================================
## Read 'info' and extract relevent data
while line[0] != 'info':
line = next(stream)
# First 'info' row containts version number
assert line[1] == 'version', "Unrecognized header cell: %s" % line[0]
self.version = float(line[2])
assert self.version == 1.1, "Unsupported format version: %s" % line[2]
# Second 'info' row contains total frame count
line2 = next(stream)
assert line2[1] == 'framecount', 'expected framecount got %s' % line2[1]
self.frameCount = int(line2[2])
# Third 'info' row contains total rigid body (rb) count during take
line3 = next(stream)
assert line3[1] == 'rigidbodycount','expected rigidbodycount got %s' % line3[1]
self.rbCount = int(line3[2])
#==================================================================
## Next, initialize each rigid body (and its markers) in scene
for rb in range (self.rbCount):
rbLine = next(stream)
# rigid body info from row
label,ID,numMarkers = rbLine[1],int(rbLine[2]),int(rbLine[3])
# create rigid body
if label in self.rigid_bodies:
body = self.rigid_bodies[label]
else:
body = RigidBody(label,ID)
self.rigid_bodies[label] = body
body.numMarkers = numMarkers
## create rigid body markers
## Note: marker IDs undefined here cal-markers and rb-
## markers are not identical
for i in range (numMarkers):
fos = i*3
#initialize rigid body markers
marker_label = "%s-Marker %d" % (label,i+1)
marker = body._get_or_add_marker(marker_label,None)
#create calibration markers
cal_marker_label = "%s-Cal %d" % (label,i+1)
cal_marker = body._get_or_add_cal_marker(cal_marker_label,i+1)
cal_marker.positions = [[float(x) for x in rbLine[4+fos:7+fos]]] # Specific to V1.1 format
return
# ================================================================
def _read_data(self,stream,verbose = False):
for row_num, row in enumerate(stream):
## process all 'frame' data
if row[0] == 'frame':
frame_num = int(row[1]) # motive frame
frame_t = float(row[2]) # time stamp by frame
num_rbs = int(row[3]) # num rbs tracked in current frame
num_markers = int(row[4+num_rbs *11]) # specific to motive format
# segment frame data for rigid bodies and markers
rb_data = row[4:4+num_rbs*11]
marker_data = row[5 + num_rbs*11:]
# initialize an empty frame for each rb in the scene
[self.rigid_bodies[body]._add_frame(frame_t) for body in self.rigid_bodies]
# initialize an empty frame for each rb_marker in the scene
[[self.rigid_bodies[body].markers[marker]._add_frame(frame_t) \
for marker in self.rigid_bodies[body].markers] for body in self.rigid_bodies]
# initialize an empty frame for each raw marker in the scene
[self.raw_markers[marker]._add_frame(frame_t) for marker in self.raw_markers]
#### for each tracked rigid body in frame parse data
for i in range (num_rbs):
fos = i*11 # fixed offset per rigid body def in stream
ID = int(rb_data [fos]) # motive assigned int
position = [float(x) for x in rb_data[1+fos:4+fos]] #[x,y,z]
rotation = [float(x) for x in rb_data[4+fos:8+fos]] #[qx,qy,qz,qw]
# assign current frame data to correct rb based on ID
for body in self.rigid_bodies:
if ID == self.rigid_bodies[body].ID:
body = self.rigid_bodies[body]
body.positions[-1] = position
body.rotations[-1] = rotation
#### for each marker in the frame parse data
for i in range(num_markers):
fos = i*5 # fixed offset for marker definitions in V1.1
ID = marker_data[fos+3] # motive assigned ID
label = marker_data[fos+4]
position = [float(x) for x in marker_data[fos:fos+3]]
# deconstruct the marker label
separator = label.rindex('-')
body_label = label[0:separator]
# if rb-marker then update rb-marker frame
if body_label in self.rigid_bodies.keys():
body = self.rigid_bodies[body_label]
marker = body.markers[label]
marker.positions[frame_num] = position
if marker.ID == None:
marker.ID = ID
# else update raw-markers
else:
if label in self.raw_markers:
marker = self.raw_markers[label]
else:
marker = Marker(label,ID)
self.raw_markers[label] = marker
marker.positions.append(position)
return
################################################################
[docs]def load(path, verbose=False, **kwargs):
"""Read a Optitrack CSV motion capture take file. Several file formats are
supported; this function detects the format and returns a format-specific
object.
"""
# Read the first line of the file.
with open(path, 'r') as file:
first = file.next().rstrip()
# very ad hoc and permissive format detection
if first.startswith('comment,'):
return Take_V1_1().readCSV(path, verbose)
elif first.startswith('Format Version,1.2,'):
return Take_V1_2().readCSV(path, verbose, kwargs.get('raw_markers', False))
elif first.startswith('Format Version,1.22,'):
return Take_V1_2().readCSV(path, verbose, kwargs.get('raw_markers', False))
################################################################