python/optitrack¶
This module contains Python code for managing Optitrack motion capture data.
Contents
optitrack.csv_reader¶
This is a generic Python module for parsing Optitrack CSV motion capture data. It is intended to remain compatible between command line C Python and the IronPython used for RhinoPython.
optitrack.csv_reader : a plain-Python parser for reading Optitrack CSV files in version 1.1, 1.2, or 1.21 format.
Copyright (c) 2016-2018, Garth Zeglin. All rights reserved. Licensed under the terms of the BSD 3-clause license as included in LICENSE.
This uses only Python modules common between CPython, IronPython, and RhinoPython for compatibility with both Rhino and offline testing.
Reference for the format: http://wiki.optitrack.com/index.php?title=Data_Export:_CSV
Note that the file format changed significantly between version 1.1 and 1.2.
Version 1.1 support added by Josh Bard.
-
class
optitrack.csv_reader.
ColumnMapping
(setter, axis, column)¶ -
axis
¶ Alias for field number 1
-
column
¶ Alias for field number 2
-
setter
¶ Alias for field number 0
-
-
class
optitrack.csv_reader.
Marker
(label, ID)[source]¶ Representation of a single marker trajectory. Please note that the RigidBodyMarker trajectories associated with RigidBody objects are expected marker locations, not the real marker positions represented by Marker data.
-
class
optitrack.csv_reader.
Take_V1_1
[source]¶ Representation of a CSV v1.1 motion capture Take. Each CSV file represents one Take. This format version is generated by Optitrack Motive 1.5.
optitrack.geometry¶
optitrack.geometry : plain-Python geometric utility functions.
This uses only Python modules common between CPython, IronPython, and RhinoPython for compatibility with both Rhino and offline testing.
Copyright (c) 2016, Garth Zeglin. All rights reserved. Licensed under the terms of the BSD 3-clause license as included in LICENSE.
optitrack.optiload¶
This file provides all the functions needed by a GhPython script to translate motion capture CSV take data into Grasshopper data trees.
# optiload.py : CSV v1.1 or v1.2 motion capture data loader for use within Grasshopper GhPython objects
# Copyright (c) 2016-2018, Garth Zeglin. All rights reserved. Licensed under the
# terms of the BSD 3-clause license.
# This provides the glue between GhPython scripts and the underlying CSV reader
# functions; it uses RhinoCommon and Grasshopper API to create Grasshopper data
# trees from the Python objects returned by the CSV loader.
# Please note that this converts data from default Optitrack coordinates with a XZ
# ground plane to default Rhino coordinates with XY ground plane.
# use RhinoCommon API
import Rhino
# Make sure that the Python libraries that are also contained within this course
# package are on the load path. This adds the python/ folder to the load path
# *after* the current folder. The path manipulation assumes that this module is
# still located within the Grasshopper/MocapDemo subfolder, and so the package
# modules are at ../../python.
# import sys, os
# sys.path.insert(1, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))), "python"))
# use itertools for subsampling sequences
import itertools
# import the Optitrack CSV file parser
import optitrack.csv_reader as csv
# import a quaternion conversion function
from optitrack.geometry import quaternion_to_xaxis_yaxis
# load the Grasshopper utility functions from the course packages
from ghutil import *
#================================================================
def load_csv_file(path):
take = csv.load(path)
return take
#================================================================
# Convert from default Optitrack coordinates with a XZ ground plane to default
# Rhino coordinates with XY ground plane.
# For Motive 2.0.1, the default coordinates have changed: Optitrack X is now lab -X, Optitrack Z is now lab +Y, and Optitrack Y is still lab +Z.
def rotated_point(pt):
if pt is None:
return None
else:
return Rhino.Geometry.Point3d( -pt[0], pt[2], pt[1])
def rotated_orientation(q):
if q is None:
return [0,0,0,1]
else:
return [-q[0], q[2], q[1], q[3]]
def plane_or_null(origin,x,y):
"""Utility function to create a Plane unless the origin is None, in which case it returns None."""
if origin is None:
return None
else:
return Rhino.Geometry.Plane(origin, x, y)
#================================================================
def all_Planes(take, stride=1):
"""Return a DataTree of trajectories containing Planes or None.
The tree has one branch for each rigid body; each branch contains a list of
objects, either Plane for a valid sample or None if the sample is missing.
Due to implicit ghpython conversions, the branches will end up described by
paths {0;0},{0;1},{0;2}, etc.
:param stride: the skip factor to apply for subsampling input (default=1, no subsampling)
"""
# Extract the origin position data and convert to Point3d objects within a
# Python list structure. The subsampling is handled within the
# comprehension. Note that missing data is returned as None. Each 'body'
# in the take.rigid_bodies dictionary is a RigidBody object. body.positions
# is a list with one element per frame, either None or [x,y,z].
origins = [ [rotated_point(pos) for pos in itertools.islice(body.positions, 0, len(body.positions), stride)] \
for body in take.rigid_bodies.values()]
# Similar to extract a tree of quaternion trajectories. The leaves are
# numbers, the dimensions are (num_bodies, num_frames, 4).
quats = [ [rotated_orientation(rot) for rot in itertools.islice(body.rotations, 0, len(body.rotations), stride)] \
for body in take.rigid_bodies.values()]
# Generate a tree of basis vector pairs (xaxis, yaxis). Dimensions are (num_bodies, num_frames, 2, 3)
basis_vectors = [[quaternion_to_xaxis_yaxis(rot) for rot in body] for body in quats]
# Extract the X axis basis elements into a tree of Vector3d objects with dimension (num_bodies, num_frames).
xaxes = [[Rhino.Geometry.Vector3d(*(basis[0])) for basis in body] for body in basis_vectors]
# Same for Y.
yaxes = [[Rhino.Geometry.Vector3d(*(basis[1])) for basis in body] for body in basis_vectors]
# Iterate over the 2D list structures, combining them into a 2D list of Plane objects.
planes = [[plane_or_null(origin, x, y) for origin,x,y in zip(os,xs,ys)] for os,xs,ys in zip(origins, xaxes, yaxes)]
# Recursively convert from a Python tree to a data tree.
return list_to_tree(planes)
#================================================================
def rigid_body_marker_names(take):
"""Return a two-level DataTree of the rigid body marker names, organized by
body. N.B., these are the reconstructed (expected) marker trajectories, not
the raw marker positions.
"""
names = list()
for body in take.rigid_bodies.values():
names.append(body.markers.keys())
return list_to_tree(names)
#================================================================
def rigid_body_marker_points(take, stride=1):
"""Return a two-level DataTree of the rigid body marker trajectories, organized by
body. N.B., these are the reconstructed (expected) marker trajectories, not
the raw marker positions.
:param stride: the skip factor to apply for subsampling input (default=1, no subsampling)
"""
points = [ [ [rotated_point(pos) for pos in itertools.islice(marker.positions, 0, len(marker.positions), stride)]
for marker in body.markers.values() ]
for body in take.rigid_bodies.values() ]
return list_to_tree(points)
#==================================================================
def rigid_body_definition_points(take, stride=1):
"""Return a two-level DataTree of the rigid body calibration, organized by
body. N.B., these are the marker locations defined relative to the rigid body frame.
:param stride: the skip factor to apply for subsampling input (default=1, no subsampling)
"""
points = [ [ [rotated_point(pos) for pos in itertools.islice(marker.positions, 0, len(marker.positions), stride)]
for marker in body.calMarkers.values() ]
for body in take.rigid_bodies.values() ]
return list_to_tree(points)
def raw_marker_points(take, stride=1):
"""Return a two-level DataTree of the markers not labeled with a rigid bodey in scene, organized by
body. N.B., these are the reconstructed (expected) marker trajectories, not
the raw marker positions.
:param stride: the skip factor to apply for subsampling input (default=1, no subsampling)
"""
points = [ [rotated_point(pos) for pos in itertools.islice(marker.positions, 0, len(marker.positions), stride)]
for marker in take.raw_markers.values() ]
return list_to_tree(points)
#==================================================================
optitrack.optirecv¶
Support for GhPython scripts to receive real-time streaming motion capture data and translate it into Grasshopper data trees.
# optirecv.py : motion capture data receiver for use within Grasshopper ghpython objects
# Copyright (c) 2016, Garth Zeglin. All rights reserved. Licensed under the
# terms of the BSD 3-clause license.
# use RhinoCommon API
import Rhino
# Make sure that the Python libraries that are also contained within this course
# package are on the load path. This adds the python/ folder to the load path
# *after* the current folder. The path manipulation assumes that this module is
# still located within the Grasshopper/MocapDemo subfolder, and so the package
# modules are at ../../python.
# import sys, os
# sys.path.insert(1, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))), "python"))
# import the Optitrack stream decoder
import optirx
# import a quaternion conversion function
from optitrack.geometry import quaternion_to_xaxis_yaxis
# load the Grasshopper utility functions from the course packages
from ghutil import *
# share the mocap coordinate conversion code with the CSV loader
from optitrack.optiload import rotated_point, rotated_orientation, plane_or_null
#================================================================
class OptitrackReceiver(object):
def __init__(self, version_string, ip_address=None):
# The version string should be of the form "2900" and should match the SDK version of the Motive software.
# E.g. Motive 1.9 == SDK 2.9.0.0 == "2900"
# Motive 1.8 == SDK 2.8.0.0 == "2800"
self.sdk_version = tuple(map(int,version_string)) # e.g. result is (2,9,0,0)
# create a multicast UDP receiver socket
self.receiver = optirx.mkdatasock(ip_address=ip_address)
# set non-blocking mode so the socket can be polled
self.receiver.setblocking(0)
# Keep track of the most recent results. These are stored as normal Python list structures, but
# already rotated into Rhino coordinate conventions.
self.positions = list() # list of Point3d objects
self.rotations = list() # list of [x,y,z,w] quaternions as Python list of numbers
self.bodynames = list() # list of name strings associated with the bodies
self.markers = list() # list of individual rigid body markers locations
return
#================================================================
def make_plane_list(self):
"""Return the received rigid body frames as a list of Plane or None (for missing data), one entry per rigid body stream."""
# convert each quaternion into a pair of X,Y basis vectors
basis_vectors = [quaternion_to_xaxis_yaxis(rot) for rot in self.rotations]
# Extract the X and Y axis basis elements into lists of Vector3d objects.
xaxes = [Rhino.Geometry.Vector3d(*(basis[0])) for basis in basis_vectors]
yaxes = [Rhino.Geometry.Vector3d(*(basis[1])) for basis in basis_vectors]
# Generate either Plane or None for each coordinate frame.
planes = [plane_or_null(origin, x, y) for origin,x,y in zip(self.positions, xaxes, yaxes)]
return planes
#================================================================
def get_markers(self):
return self.markers
#================================================================
def _markers_coincide(self, m1, m2):
"""For now, an exact match (could be fuzzy match)."""
return m1[0] == m2[0] and m1[1] == m2[1] and m1[2] == m2[2]
def _identify_rigid_bodies(self, sets, bodies):
"""Compare marker positions to associate a named marker set with a rigid body.
:param sets: dictionary of lists of marker coordinate triples
:param bodies: list of rigid bodies
:return: dictionary mapping body ID numbers to body name
Some of the relevant fields:
bodies[].markers is a list of marker coordinate triples
bodies[].id is an integer body identifier with the User Data field specified for the body in Motive
"""
# for now, do a simple direct comparison on a single marker on each body
mapping = dict()
for body in bodies:
marker1 = body.markers[0]
try:
for name,markerset in sets.items():
if name != 'all':
for marker in markerset:
if self._markers_coincide(marker1, marker):
mapping[body.id] = name
raise StopIteration
except StopIteration:
pass
return mapping
def _identify_labeled_markers(self, packet):
# If the markers are left with streaming ID 0 in Motive, they will be
# considered 'legacy labels' and the labeled marker ID will have
# the body ID in the upper 16 bits and the marker ID in the
# lower 16 bits. The following extracts the labeled markers
# into a hierarchical dictionary organized by body ID and marker ID.
if packet.labeled_markers is None:
return None
labeled_markers = dict()
for lmarker in packet.labeled_markers:
body_id = lmarker.id >> 16
marker_id = lmarker.id & 0xffff
if not body_id in labeled_markers:
labeled_markers[body_id] = dict()
# save the marker position; other possible fields: size occluded point_cloud_solved model_solved
labeled_markers[body_id][marker_id] = rotated_point(lmarker.position)
return labeled_markers
#================================================================
def poll(self):
"""Poll the mocap receiver port and return True if new data is available."""
try:
data = self.receiver.recv(optirx.MAX_PACKETSIZE)
except:
return False
packet = optirx.unpack(data, version=self.sdk_version)
self.packet = packet
if type(packet) is optirx.SenderData:
version = packet.natnet_version
print "NatNet version received:", version
elif type(packet) is optirx.FrameOfData:
nbodies = len(packet.rigid_bodies)
# print "Received frame data with %d rigid bodies." % nbodies
# print "Received FrameOfData with sets:", packet.sets
# There appears to be one marker set per rigid body plus 'all'.
# print "Received FrameOfData with names:", packet.sets.keys()
# print "First marker of first marker set:", packet.sets#.values()#[0][0]
# print "Received FrameOfData with rigid body IDs:", [body.id for body in packet.rigid_bodies]
# print "First marker of first rigid body:", packet.rigid_bodies[0].markers[0]
# print "First tracking flag of first rigid body:", packet.rigid_bodies[0].tracking_valid
# compare markers to associate the numbered rigid bodies with the named marker sets
# mapping = self._identify_rigid_bodies( packet.sets, packet.rigid_bodies)
# print "Body identification:", mapping
# organize the labeled markers by body
self.labeled_markers = self._identify_labeled_markers(packet)
if nbodies > 0:
# print packet.rigid_bodies[0]
# rotate the coordinates into Rhino conventions and save them in the object instance as Python lists
self.positions = [ rotated_point(body.position) for body in packet.rigid_bodies]
self.rotations = [ rotated_orientation(body.orientation) for body in packet.rigid_bodies]
self.bodynames = [ body.id for body in packet.rigid_bodies]
# self.markers = [rotated_point(point) for point in body.markers for body in packet.rigid_bodies]
# self.markers = [body.markers for body in packet.rigid_bodies]
captureSet = []
for body in packet.rigid_bodies:
markerSet = []
# this became moot in Motive 2.0.1 with NatNet streaming protocol 3.0.0:
if body.markers is not None:
for marker in body.markers:
markerSet.append(rotated_point(marker))
captureSet.append(markerSet)
self.markers = captureSet
# return a new data indication
return True
elif type(packet) is optirx.ModelDefs:
print "Received ModelDefs:", packet
else:
print "Received unhandled NatNet packet type:", packet
# else return a null result
return False
#================================================================
def frames_to_tree(frame_list):
"""Utility function to convert a list of list of Plane objects representing a trajectory segment into a GH data tree."""
# Transpose the frame list for output. As accumulated, it is a list of lists:
# [[body1_sample0, body2_sample0, body3_sample0, ...], [body1_sample1, body2_sample1, body3_sample1, ...], ...]
segment = zip(*frame_list)
# Convert a Python list-of-lists into a data tree. Segment is a list of trajectories:
# [[body1_sample0, body1_sample1, body1_sample2, ...], [body2_sample0, body2_sample1, body2_sample2, ...], ...]
return list_to_tree(segment)
#================================================================