Source code for stage.network

"""network.py

OSC UDP messaging network support common to several scripts."""

import logging
import threading
import queue
import time

from pythonosc import osc_message_builder
from pythonosc import udp_client
from pythonosc import dispatcher
from pythonosc import osc_server

# UDP port numbers for servers
from .config import lighting_UDP_port, motor_UDP_port, director_UDP_port, motor_units, valve_UDP_port, valve_units
from .config import vision_UDP_port, client_UDP_port, camera_roi

# initialize logging for this module
log = logging.getLogger('network')

#================================================================
class TheaterNetwork:
    """Manage the UDP sockets for sending data to the theater system.

    Construction opens one OSC client per entry in ``motor_units`` and
    ``valve_units`` (on consecutive ports starting at ``motor_UDP_port`` and
    ``valve_UDP_port``), plus one client each for the lighting server, the
    director, and the vision system.  Every client sends to ``args.ip``.

    Receiving motion reports from the vision system is optional and is
    enabled by calling :meth:`start_vision_client`; call :meth:`close` to
    stop the background threads it starts.
    """

    def __init__(self, args):
        """Open all outgoing OSC client sockets.

        Args:
            args: object with an ``ip`` attribute (destination host for every
                client).  A ``recv`` attribute (local address to listen on)
                is additionally read by :meth:`start_vision_client`.
        """
        self.args = args

        # Create the stepper motor motion networking outputs.  This creates a
        # separate socket for each motion server output for simplicity.
        self.motion = []
        for offset, _unit in enumerate(motor_units):
            port = motor_UDP_port + offset
            self.motion.append(udp_client.SimpleUDPClient(args.ip, port))
            log.info("Opened motion client socket to send to %s:%d", args.ip, port)

        # Create the valve server networking outputs.  This creates a separate
        # socket for each valve server output for simplicity.
        self.valve = []
        for offset, _unit in enumerate(valve_units):
            port = valve_UDP_port + offset
            self.valve.append(udp_client.SimpleUDPClient(args.ip, port))
            log.info("Opened valve client socket to send to %s:%d", args.ip, port)

        # Create a socket to communicate to the lighting server.
        self.lights = udp_client.SimpleUDPClient(args.ip, lighting_UDP_port)
        log.info("Opened lighting client socket to send to %s:%d", args.ip, lighting_UDP_port)

        # Create a socket to communicate to the central controller.
        self.director = udp_client.SimpleUDPClient(args.ip, director_UDP_port)
        log.info("Opened director client socket to send to %s:%d", args.ip, director_UDP_port)

        # Create a socket to communicate to the vision system.
        self.vision = udp_client.SimpleUDPClient(args.ip, vision_UDP_port)
        log.info("Opened vision client socket to send to %s:%d", args.ip, vision_UDP_port)

        # Create a dictionary of flags corresponding to the active vision
        # regions, indexed by the region number (using a dictionary gracefully
        # handles unexpected region indices).
        self.motion_detected = {idx: False for idx in range(len(camera_roi))}

        # Signalled by close() to end the request_vision_data() loop.
        self._stop_requests = threading.Event()

    def close(self):
        """Stop the vision background threads and release the listening socket.

        Safe to call when :meth:`start_vision_client` was never called, and
        safe to call more than once.  The outgoing client sockets are left
        untouched.
        """
        # getattr keeps this usable on an instance that never started (or
        # never finished setting up) the vision client.
        stop = getattr(self, "_stop_requests", None)
        if stop is not None:
            stop.set()

        server = getattr(self, "vision_client", None)
        if server is None:
            return

        # shutdown() waits for serve_forever() to acknowledge, so only call it
        # while the serving thread is alive; otherwise just close the socket.
        thread = getattr(self, "vision_thread", None)
        if thread is not None and thread.is_alive():
            server.shutdown()
        server.server_close()
        self.vision_client = None

    def num_motion_units(self):
        """Return the number of motion server clients that were opened."""
        return len(self.motion)

    def motion_server(self, unit):
        """Return the OSC client for the motion server at index `unit`."""
        return self.motion[unit]

    def motion_server_by_name(self, name):
        """Return an OSC client for communicating with a unit of the stepper motor control system.

        The unit is found by comparing `name` with the ``'name'`` entry of each
        element of ``motor_units``; returns None if no unit matches.
        """
        for idx, unit in enumerate(motor_units):
            if unit.get('name') == name:
                return self.motion[idx]
        return None

    def num_valve_units(self):
        """Return the number of valve server clients that were opened."""
        return len(self.valve)

    def valve_server(self, unit):
        """Return the OSC client for the valve server at index `unit`."""
        return self.valve[unit]

    def valve_server_by_name(self, name):
        """Return an OSC client for communicating with a unit of the pneumatic valve control system.

        The unit is found by comparing `name` with the ``'name'`` entry of each
        element of ``valve_units``; returns None if no unit matches.
        """
        for idx, unit in enumerate(valve_units):
            if unit.get('name') == name:
                return self.valve[idx]
        return None

    def lighting_server(self, unit=0):
        """Return an OSC client for communicating with the lighting system.

        `unit` is accepted for symmetry with the other accessors but is
        ignored: there is a single lighting client.
        """
        return self.lights

    #----------------------------------------------------------------
    def start_vision_client(self):
        """Create the vision client socket and start a server thread.

        Starts two daemon threads: one serving incoming OSC messages on
        ``(args.recv, client_UDP_port)`` and one periodically renewing the
        motion data request (see :meth:`request_vision_data`).
        """
        # Initialize the OSC message dispatch system.
        self.vision_dispatch = dispatcher.Dispatcher()
        self.vision_dispatch.map("/motion", self.motion_message)
        self.vision_dispatch.set_default_handler(self.vision_message)

        listen_address = (self.args.recv, client_UDP_port)
        self.vision_client = osc_server.OSCUDPServer(listen_address, self.vision_dispatch)
        self.vision_thread = threading.Thread(target=self.vision_client.serve_forever)
        self.vision_thread.daemon = True
        self.vision_thread.start()
        log.info("started vision client thread on port %s:%d", self.args.recv, client_UDP_port)

        # Use a fresh event so a request loop stopped by an earlier close()
        # stays stopped while the new one runs.
        self._stop_requests = threading.Event()
        self.request_thread = threading.Thread(target=self.request_vision_data)
        self.request_thread.daemon = True
        self.request_thread.start()
        log.info("started vision request thread")

    def vision_message(self, msgaddr, *args):
        """Default OSC handler: log any vision message without a specific handler."""
        log.info("received unhandled vision system message %s, %s", msgaddr, args)

    def motion_message(self, msgaddr, *args):
        """Process /motion messages received via OSC over UDP.

        The first two OSC arguments are taken as the tracker (region) key and
        its value, and the value is stored in ``self.motion_detected``.
        Messages with fewer than two arguments are logged and ignored.
        """
        if len(args) < 2:
            # A short packet would otherwise raise IndexError in the handler.
            log.warning("ignoring malformed %s message with arguments %s", msgaddr, args)
            return

        tracker, value = args[:2]
        log.debug("received /motion message with %s %s", tracker, value)
        self.motion_detected[tracker] = value

    def request_vision_data(self):
        """Background thread body to renew the vision data request.

        Sends ``/request/motion`` to the vision system, then waits five
        seconds, repeating until :meth:`close` is called.
        """
        # Bind the event once so a later start_vision_client() that installs a
        # new event cannot revive this loop.
        stop = self._stop_requests
        while not stop.is_set():
            self.vision.send_message("/request/motion", [])
            # Same 5 second cadence as a plain sleep, but wakes early on close().
            stop.wait(5)

    #----------------------------------------------------------------
#---------------------------------------------------------------- #================================================================