"""network.py
OSC UDP messaging network support common to several scripts."""
import logging
import threading
import queue
import time
from pythonosc import osc_message_builder
from pythonosc import udp_client
from pythonosc import dispatcher
from pythonosc import osc_server
# UDP port numbers for servers
from .config import lighting_UDP_port, motor_UDP_port, director_UDP_port, motor_units, valve_UDP_port, valve_units
from .config import vision_UDP_port, client_UDP_port, camera_roi
# initialize logging for this module
log = logging.getLogger('network')
#================================================================
[docs]
class TheaterNetwork:
"""Manage the UDP sockets for sending data to the theater system."""
def __init__(self, args):
self.args = args
# Create the stepper motor motion networking outputs. These creates a separate socket for each motion server output for simplicity.
self.motion = []
for unit in range(len(motor_units)):
self.motion.append(udp_client.SimpleUDPClient(args.ip, motor_UDP_port + unit))
log.info("Opened motion client socket to send to %s:%d", args.ip, motor_UDP_port + unit)
# Create the valve server networking outputs. These creates a separate socket for each valve server output for simplicity.
self.valve = []
for unit in range(len(valve_units)):
self.valve.append(udp_client.SimpleUDPClient(args.ip, valve_UDP_port + unit))
log.info("Opened valve client socket to send to %s:%d", args.ip, valve_UDP_port + unit)
# Create a socket to communicate to the lighting server.
self.lights = udp_client.SimpleUDPClient(args.ip, lighting_UDP_port)
log.info("Opened lighting client socket to send to %s:%d", args.ip, lighting_UDP_port)
# Create a socket to communicate to the central controller.
self.director = udp_client.SimpleUDPClient(args.ip, director_UDP_port)
log.info("Opened director client socket to send to %s:%d", args.ip, director_UDP_port)
# Create a socket to communicate to the vision system.
self.vision = udp_client.SimpleUDPClient(args.ip, vision_UDP_port)
log.info("Opened vision client socket to send to %s:%d", args.ip, vision_UDP_port)
# create a dictionary of flags corresponding to the active vision regions, indexed by the
# region number (using a dictionary gracefully handles unexpected region indices)
self.motion_detected = {idx:False for idx in range(len(camera_roi))}
return
def close(self):
pass
def num_motion_units(self):
return len(self.motion)
def motion_server(self, unit):
return self.motion[unit]
[docs]
def motion_server_by_name(self, name):
"""Return an OSC client for communicating with a unit of the stepper motor control system."""
for idx, unit in enumerate(motor_units):
if unit.get('name') == name:
return self.motion[idx]
return None
def num_valve_units(self):
return len(self.valve)
def valve_server(self, unit):
return self.valve[unit]
[docs]
def valve_server_by_name(self, name):
"""Return an OSC client for communicating with a unit of the pneumatic valve control system."""
for idx, unit in enumerate(valve_units):
if unit.get('name') == name:
return self.valve[idx]
return None
[docs]
def lighting_server(self, unit=0):
"""Return an OSC client for communicating with the lighting system."""
return self.lights
#----------------------------------------------------------------
[docs]
def start_vision_client(self):
"""Create the vision client socket and start a server thread."""
# Initialize the OSC message dispatch system.
self.vision_dispatch = dispatcher.Dispatcher()
self.vision_dispatch.map("/motion", self.motion_message)
self.vision_dispatch.set_default_handler(self.vision_message)
self.vision_client = osc_server.OSCUDPServer((self.args.recv, client_UDP_port), self.vision_dispatch)
self.vision_thread = threading.Thread(target=self.vision_client.serve_forever)
self.vision_thread.daemon = True
self.vision_thread.start()
log.info("started vision client thread on port %s:%d", self.args.recv, client_UDP_port)
self.request_thread = threading.Thread(target=self.request_vision_data)
self.request_thread.daemon = True
self.request_thread.start()
log.info("started vision request thread")
return
def vision_message(self, msgaddr, *args):
log.info("received unhandled vision system message %s, %s", msgaddr, args)
def motion_message(self, msgaddr, *args):
# process /motion messages received via OSC over UDP
tracker = args[0]
value = args[1]
log.debug("received /motion message with %s %s", tracker, value)
self.motion_detected[tracker] = value
def request_vision_data(self):
# background thread to renew the vision data request
while True:
self.vision.send_message("/request/motion", [])
time.sleep(5)
#----------------------------------------------------------------
#================================================================