python/scripts

Motion Capture Processing Example

This is a command-line script for testing the Optitrack CSV reader.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/usr/bin/env python
"""\
test_optitrack_csv_reader.py : offline test for the Optitrack CSV file parser.

Copyright (c) 2016-2017, Garth Zeglin. All rights reserved. Licensed under the
terms of the BSD 3-clause license as included in LICENSE.
"""

import sys, os, argparse

# Make sure that the Python libraries also contained within this course package
# are on the load path.  This adds the parent folder to the load path, assuming that this
# script is still located within the scripts/ subfolder of the Python library tree.
sys.path.insert(0, os.path.dirname(os.path.abspath(os.path.dirname(__file__))))

import optitrack.csv_reader as csv
from optitrack.geometry import *

################################################################
# begin the script

if __name__=="__main__":

    # Process command line arguments.

    parser = argparse.ArgumentParser( description = """Parse an Optitrack v1.2 or v1.21 CSV file and report summary information.""")
    parser.add_argument( '-v', '--verbose', action='store_true', help='Enable more detailed output.' )
    parser.add_argument( 'csv', help = 'Filename of Optitrack CSV motion capture data to process.' )

    args = parser.parse_args()

    # Read the take from the CSV file, also requesting the raw markers.
    take = csv.Take().readCSV(args.csv, args.verbose, raw_markers=True)

    # Every print below is handed one pre-formatted string, so the output is the
    # same whether print is the Python 2 statement or the Python 3 function.
    print("Found rigid bodies: %s" % (list(take.rigid_bodies.keys()),))
    print("Found raw markers: %s" % (list(take.markers.keys()),))

    # Summarize each rigid body: frame counts, then the markers attached to it.
    for body in take.rigid_bodies.values():
        print("Body %s: %d valid frames out of %d." % (body.label, body.num_valid_frames(), body.num_total_frames()))

        for marker in body.markers.values():
            print("  Rigid Body Marker: %s" % marker.label)

        if args.verbose:
            # Dump the full position track, one entry per line.
            print("Position track:")
            for point in body.positions:
                print("%s" % (point,))

            # Dump the body X and Y axes for every rotation entry that is not None.
            for quat in body.rotations:
                if quat is not None:
                    xaxis, yaxis = quaternion_to_xaxis_yaxis(quat)
                    print("%s %s" % (xaxis, yaxis))

    # Finally list the raw markers found in the take.
    for marker in take.markers.values():
        print("raw marker: %s" % marker.label)

Simulated Motion Capture Streaming

This script may be useful for debugging the motion capture streaming receiver: it sends the contents of binary packet files as multicast mocap data packets.

from __future__ import print_function

import os, sys, socket, argparse, time

# Make sure that the Python libraries also contained within this course package
# are on the load path.  This adds the parent folder to the load path, assuming that this
# script is still located within the scripts/ subfolder of the Python library tree.

sys.path.insert(0, os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import optirx as rx

MULTICAST_ADDRESS =           "239.255.42.99"     # IANA, local network
PORT_DATA =                   1511                # Default multicast group

def gethostip():
    """Return the IP address that the local host name resolves to."""
    hostname = socket.gethostname()
    return socket.gethostbyname(hostname)

def make_data_sender_socket(ip_address=None):
    """Create a UDP socket for sending multicast data.

    ip_address: local address to bind the socket to.  When it is omitted (or
        otherwise false), the address returned by gethostip() is used.

    Returns the bound datagram socket.  The local port is left to the operating
    system (bind to port 0).  If configuring or binding the socket fails, the
    socket is closed before the exception propagates to the caller.
    """
    ip_address = ip_address or gethostip()
    datasock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
    try:
        datasock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        datasock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        datasock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20)
        datasock.bind((ip_address, 0))
    except Exception:
        # Do not leak the descriptor when setup fails part-way through.
        datasock.close()
        raise
    return datasock

def send_optitrack_packet(datasock, data, multicast_address=MULTICAST_ADDRESS, port=PORT_DATA):
    """Send one packet of data through datasock to (multicast_address, port)."""
    destination = (multicast_address, port)
    datasock.sendto(data, destination)

def main(paths):
    """Send the contents of each named file as one multicast packet.

    paths: iterable of file names.  Each file is read whole in binary mode and
        handed to send_optitrack_packet(), followed by a 0.1 second pause.

    The sender socket is closed when all files have been sent or when an
    error interrupts the loop.
    """
    datasock = make_data_sender_socket()
    try:
        for path in paths:
            with open(path, "rb") as packet_file:
                print("Sending %s" % path)
                data = packet_file.read()
            send_optitrack_packet(datasock, data)
            # Brief pause between packets.
            time.sleep(0.1)
    finally:
        datasock.close()
            
if __name__ == "__main__":
    # Build the command line interface, then send every named packet file.
    # The verbose flag is accepted but only the file names are passed on.
    parser = argparse.ArgumentParser(
        description="""Send Optitrack test data packets."""
    )
    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='Enable more detailed output.'
    )
    parser.add_argument(
        'filename',
        nargs='+',
        help='Names of binary packet files to send.'
    )
    args = parser.parse_args()
    main(args.filename)

Mocap Streaming Data Translation Test

This is probably moot; this script tests receiving mocap packets and resending them in a simpler format into Grasshopper.

from __future__ import print_function

import os, sys, socket

# Make sure that the Python libraries also contained within this course package
# are on the load path.  This adds the parent folder to the load path, assuming that this
# script is still located within the scripts/ subfolder of the Python library tree.

sys.path.insert(0, os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import optirx as rx

from optitrack.geometry import quaternion_to_xaxis_yaxis

sdk_version = (2, 9, 0, 0)  # the latest SDK version

# Arbitrary UDP port number at which to receive data in Grasshopper.
RHINO_PORT=35443

def make_udp_sender(ip_address=None):
    """Create a normal UDP socket for sending unicast data to Rhino.

    ip_address: local address to bind the socket to.  When it is omitted (or
        otherwise false), the address returned by rx.gethostip() is used.

    Returns the bound datagram socket.  The local port is left to the operating
    system (bind to port 0).  If binding fails, the socket is closed before the
    exception propagates to the caller.
    """
    ip_address = ip_address or rx.gethostip()
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sender.bind((ip_address, 0))
    except Exception:
        # Do not leak the descriptor when the bind fails.
        sender.close()
        raise
    return sender

def body_record(body):
    """Build the one-line ASCII record transmitted for a single body.

    The record holds nine space-separated numbers ending in a newline: the
    three values of body.position, then the three components of each of the
    two axes returned by quaternion_to_xaxis_yaxis(body.orientation).
    """
    fields = ["%f %f %f " % body.position]
    x_axis, y_axis = quaternion_to_xaxis_yaxis(body.orientation)
    fields.append("%f %f %f " % tuple(x_axis))
    fields.append("%f %f %f\n" % tuple(y_axis))
    return "".join(fields)
    
def main():
    """Receive mocap packets and forward rigid body records to Grasshopper.

    Loops forever: each packet read from the receiver socket is unpacked with
    rx.unpack() using sdk_version.  SenderData packets only report the NatNet
    version they carry.  For FrameOfData packets, one body_record() line per
    rigid body is built and the joined text is sent as a single UDP datagram
    to RHINO_PORT on this host.
    """
    # create a multicast UDP receiver socket
    receiver = rx.mkdatasock()

    # create a unicast UDP sender socket
    sender = make_udp_sender()

    # the hostname running Grasshopper, assumed to be the same machine
    host = rx.gethostip()

    while True:
        data = receiver.recv(rx.MAX_PACKETSIZE)
        packet = rx.unpack(data, version=sdk_version)
        if isinstance(packet, rx.SenderData):
            version = packet.natnet_version
            print("NatNet version received:", version)

        if isinstance(packet, rx.FrameOfData):
            print("Received frame data.")
            records = [body_record(body) for body in packet.rigid_bodies]
            msg = "".join(records)
            print("Sending", msg)
            # The records are plain ASCII text; sockets require bytes on Python 3.
            sender.sendto(msg.encode("ascii"), (host, RHINO_PORT))

# Run the receive-and-forward loop only when executed as a script.
if __name__ == "__main__":
    main()

Mocap Streaming Data Capture Viewer

This is somewhat moot; this script prints out the contents of a file of captured mocap data packets.

from __future__ import print_function

import os, sys, socket, argparse

# Make sure that the Python libraries also contained within this course package
# are on the load path.  This adds the parent folder to the load path, assuming that this
# script is still located within the scripts/ subfolder of the Python library tree.
sys.path.insert(0, os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import optirx as rx

def main(paths):
    """Print the unpacked contents of each captured NatNet packet file.

    paths: despite its name, this is the parsed command line namespace handed
        over by the script entry point.  It must provide `version` (a Motive
        version string such as '1.5.0') and `filename` (the packet file names).

    Raises NotImplementedError when the Motive version is not 1.5.x or 1.9.x.
    """
    # Use the argument that was passed in rather than a module-level global,
    # so the function also works when it is imported and called directly.
    options = paths

    # Motive (major, minor) version -> corresponding NatNet SDK identifier tuple.
    sdk_versions = {
        (1, 5): (2, 5, 0, 0),
        (1, 9): (2, 9, 0, 0),
    }
    motive_version = tuple(int(s) for s in options.version.split('.'))
    sdk_version = sdk_versions.get(motive_version[:2])
    if sdk_version is None:
        raise NotImplementedError("Unsupported Motive version: %s" % options.version)

    for path in options.filename:
        with open(path, "rb") as packet_file:
            print("Reading %s" % path)
            data = packet_file.read()
        packet = rx.unpack(data, sdk_version)
        print(packet)
            
if __name__ == "__main__":
    # Build the command line interface, then hand the parsed arguments to main().
    parser = argparse.ArgumentParser(
        description="""Parse and print a file of raw streaming Optitrack Motive NatNet packets."""
    )
    parser.add_argument(
        '--version',
        default='1.5.0',
        help='Specify the Motive version.  Default: 1.5.0'
    )
    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='Enable more detailed output.'
    )
    parser.add_argument(
        'filename',
        nargs='+',
        help='Names of binary packet files to read.'
    )
    args = parser.parse_args()
    main(args)