Source code for streaming.pb_instrument

#!/usr/bin/env python3
"""pb_instrument.py

Pausch Bridge visual light instrument for real-time performance and improvisation.
Sends image output in real time as OSC messages to a previewer and/or the bridge.
"""
#-------------------------------------------------------------------------------
# standard Python libraries
import os, sys, logging, datetime, time
import configparser, argparse, json
import math, itertools
import signal, tty, termios, select

# This uses the OpenCV computer vision library for image processing.
#   installation:      pip3 install opencv-python
#   pypi description:  https://pypi.org/project/opencv-python/
#   API documentation: https://docs.opencv.org/master/
import cv2 as cv

# This uses the NumPy numerical library for image matrix manipulation.
#   installation:      pip3 install numpy
#   pypi description:  https://pypi.org/project/numpy
#   API documentation: https://numpy.org/doc/stable/
import numpy as np

# This uses python-osc to send UDP packets with image data encoded using OSC.
#   installation:      pip3 install python-osc
#   source code:       https://github.com/attwad/python-osc
#   pypi description:  https://pypi.org/project/python-osc/
from pythonosc import udp_client
from pythonosc import osc_bundle_builder
from pythonosc import osc_message_builder

#--------------------------------------------------------------------------------
# set up logger for module
# The module logger is named after this script's file name, minus its extension.
_script_file = os.path.basename(__file__)
scriptname = os.path.splitext(_script_file)[0]
log = logging.getLogger(scriptname)

#-------------------------------------------------------------------------------
def create_defaults():
    """Build a configparser object holding the default value of every
    configurable setting.

    The sections cover several different objects: logging, video geometry,
    networking, color tables, and the parameters of each animation generator.
    Every value is stored as a string.

    :return: a new configparser.ConfigParser populated with the defaults
    """
    defaults = {
        'log': {
            'file_log_level': '20',       # log INFO to file
            'console_log_level': '30',    # log WARNING to console
        },
        'video': {
            'frame_width': '228',
            'frame_height': '8',
            'frame_rate': '30',
            'codec': 'png',
        },
        'network': {
            'osc_preview_host': '127.0.0.1',
            'osc_preview_port': '23432',
            'osc_bridge_host': '',
            'osc_bridge_port': '23432',
        },
        # colors are stored as JSON in the configuration file
        'colors': {
            'colors': '{"white":[255,255,255],"black":[0,0,0]}',
            'palettes': '{"mono": [[255,255,255],[0,0,0]], "Ukraine":[[0,91,187],[255,213,0]]}',
        },
        # PianoRoll parameters
        'pianoroll': {
            'images': '[]',
            'tempo': '30',
        },
        # ConwayLife parameters
        'life': {
            'world_rows': '32',
            'world_cols': '228',
            'min_alive': '200',
            'tempo': '240',
            'random': '10',
        },
        # Diffusion parameters
        'diffusion': {
            'impulse_interval': '60',
            'decay': '1.0',
        },
        # Instrument parameters
        'instrument': {
            'spatial_quantize': 'False',
        },
    }

    config = configparser.ConfigParser()
    config.read_dict(defaults)
    return config
#-------------------------------------------------------------------------------
def _configure_logging(args, config):
    """Attach a console handler and a file handler to the root logger.

    :param args: parsed command-line arguments; only ``args.verbose`` is read
    :param config: configuration mapping; the ``log`` section supplies the
                   numeric ``console_log_level`` and ``file_log_level``
    """
    root_logger = logging.getLogger()
    log_section = config['log']

    # Console output: the verbose flag can lower the threshold to DEBUG but
    # never raises it above the configured level.
    to_console = logging.StreamHandler()
    threshold = int(log_section['console_log_level'])
    if args.verbose:
        threshold = min(logging.DEBUG, threshold)
    to_console.setLevel(threshold)
    to_console.setFormatter(logging.Formatter('%(levelname)s:%(name)s: %(message)s'))
    root_logger.addHandler(to_console)

    # File output: written next to the working directory, named after the script.
    to_file = logging.FileHandler(scriptname + '.log')
    to_file.setLevel(int(log_section['file_log_level']))
    to_file.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s',
                                           datefmt="%Y-%m-%d %H:%M:%S"))
    root_logger.addHandler(to_file)

    # The root logger must pass everything that either handler wants to see.
    root_logger.setLevel(min(to_console.level, to_file.level))

#================================================================
def keyframe_interpolator(keyframes, tempo=60, frame_rate=30.0):
    """Generator producing the frames of a video sequence by linearly
    cross-fading between successive keyframes at a constant tempo.

    Each evaluation with next() yields one blended image.  The generator ends
    when the keyframe source is exhausted.  An empty source yields nothing; a
    source with a single keyframe holds that image for one beat.

    :param keyframes: iterable of images, typically a generator function
    :param tempo: keyframe rate in beats per minute
    :param frame_rate: output video frame rate in frames per second
    """
    seconds_per_beat = 60.0 / tempo
    phase_step = 1.0 / (frame_rate * seconds_per_beat)   # cross-fade phase per frame
    source = iter(keyframes)
    exhausted = object()                                  # marks the end of the source

    # Fetch the first keyframe, or end the sequence immediately.
    current = next(source, exhausted)
    if current is exhausted:
        return

    # Fetch the second keyframe; if there is none, fade the first into itself.
    upcoming = next(source, current)

    phase = 0.0     # unit phase of the cross-fade, cycling over 0 to 1
    while True:
        # Emit a new image blending the two keyframes at the current phase.
        yield cv.addWeighted(current, (1.0 - phase), upcoming, phase, 0.0)

        phase += phase_step

        # Once past the second keyframe, wrap the phase and pull the successor.
        if phase > 1.0:
            phase -= 1.0
            current = upcoming
            upcoming = next(source, exhausted)
            if upcoming is exhausted:
                return
#===============================================================================
def crossfade(source1, source2, duration):
    """Generator producing a finite video sequence which fades from one
    source to another.

    Yields one blended image per frame and ends once the fade is complete.
    If either source runs out before the fade completes, the sequence ends
    early instead of raising an error.  A non-positive duration yields
    nothing.

    :param source1: iterable of images fading out
    :param source2: iterable of images fading in
    :param duration: cross-fade duration in frames
    """
    if duration <= 0:
        return

    # With a single frame there is no ramp to compute; emit the fade target
    # rather than dividing by zero.
    last = duration - 1

    # zip() pulls from range() first, so no source frame is consumed once the
    # fade is over, and it stops cleanly when either source is exhausted.  A
    # bare next() here would surface as RuntimeError under PEP 479.
    for count, frame1, frame2 in zip(range(duration), iter(source1), iter(source2)):
        phase = count / last if last > 0 else 1.0
        yield cv.addWeighted(frame1, 1.0 - phase, frame2, phase, 0.0)
#===============================================================================
def cyclefade(source1, source2, period):
    """Generator producing a video sequence which fades back and forth
    between two sources.

    Yields one blended image per frame.  It never ends by itself, so infinite
    sources produce an infinite sequence; it ends cleanly as soon as either
    source is exhausted.  A period of one frame or less yields nothing.

    :param source1: iterable of images
    :param source2: iterable of images
    :param period: cross-fade cycle duration in frames
    """
    if period <= 1:
        return

    # zip() ends the sequence when either input ends.  A bare next() inside a
    # generator would instead surface as RuntimeError under PEP 479.
    for count, (frame1, frame2) in enumerate(zip(iter(source1), iter(source2))):
        phase = (count % period) / period
        if phase < 0.5:
            w1 = 2 * (0.5 - phase)    # fading out
            w2 = 2 * phase            # fading in
        else:
            w1 = 2 * (phase - 0.5)    # fading in
            w2 = 2 * (1.0 - phase)    # fading out
        yield cv.addWeighted(frame1, w1, frame2, w2, 0.0)
#================================================================================
class PianoRoll:
    """Collection of images which can be iterated to emit each row as a video frame.

    Implements the iterator protocol to return generator objects which produce
    a sequence of smoothly interpolated image frames at a constant tempo.  The
    semantics of PianoRoll are that of a collection; each iteration of the
    collection is independent and starts at the beginning.
    """

    def __init__(self, config):
        """Load the piano roll images named in the configuration.

        :param config: configuration mapping; reads the ``video`` section
                       (frame_width, frame_height, frame_rate) and the
                       ``pianoroll`` section (images as a JSON list of paths,
                       tempo in beats per minute)
        """
        self.config = config
        self.frame_width = int(config['video']['frame_width'])
        self.frame_height = int(config['video']['frame_height'])
        self.frame_rate = int(config['video']['frame_rate'])
        self.tempo = int(config['pianoroll']['tempo'])

        image_path_list = self.config['pianoroll']['images']
        log.debug("PianoRoll found image paths %s", image_path_list)
        try:
            image_paths = json.loads(image_path_list)
        except json.decoder.JSONDecodeError:
            log.warning("Unable to parse piano roll image file list from configuration file.")
            # Fall back to an empty list so the loader below still runs and
            # the random placeholder image is generated.
            image_paths = []
        self.image_paths = image_paths

        self.images = []
        self.image_names = []
        for path in image_paths:
            # cv.imread() signals an unreadable or missing file by returning
            # None rather than raising, so check before converting.
            bgr = cv.imread(path)
            if bgr is None:
                log.warning("Unable to read piano roll image %s, skipping.", path)
                continue
            self.images.append(cv.cvtColor(bgr, cv.COLOR_BGR2RGB))
            self.image_names.append(os.path.basename(path))
            log.info("Loaded image %s", path)

        if not self.images:
            log.info("No piano roll images loaded, generating one random image.")
            img = np.random.randint(0, 256, size=(20, self.frame_width // 4, 3), dtype=np.uint8)
            self.images.append(img)
            self.image_names.append('random')

    def __iter__(self):
        """Return a frame generator which sequences the rows of the source images."""
        # Create an interpolator to produce image frames smoothly transitioning
        # between the keyframes generated from each image row at the slower tempo rate.
        return keyframe_interpolator(self._keyframes(), tempo=self.tempo, frame_rate=self.frame_rate)

    def _keyframes(self):
        """Generator producing keyframes by expanding successive rows of the
        source images into full frames.

        The keyframes are float32 images; source pixel values are scaled by
        1/255.  The generator cycles through all images repeatedly and never
        exits by itself.
        """
        while True:
            # if no source images were provided, just return RGB black frames
            if not self.images:
                yield np.zeros((self.frame_height, self.frame_width, 3), dtype=np.float32)
            else:
                # cycle through all the images, returning each row expanded to full frame
                for source in self.images:
                    for r in range(source.shape[0]):
                        row = (source[r:r+1, :, :] * (1.0/255.0)).astype(np.float32)
                        yield cv.resize(row, dsize=(self.frame_width, self.frame_height),
                                        interpolation=cv.INTER_NEAREST)
#================================================================================
class Waves:
    """Iterable object to simulate 1D waves.

    Implements the iterator protocol to generate image frames, with one wave
    per color channel.  Note that all iterators returned use the same object
    state, so evaluating multiple iterations simultaneously may produce
    unexpected results.
    """

    def __init__(self, config):
        """Set up the wave models in their rest state.

        :param config: configuration mapping; reads frame_width, frame_height
                       and frame_rate from the ``video`` section
        """
        self.config = config
        self.frame_width = int(config['video']['frame_width'])
        self.frame_height = int(config['video']['frame_height'])
        self.frame_rate = int(config['video']['frame_rate'])

        # scalar constants
        self.W = 3                          # Number of waves (one for each color channel).
        self.N = self.frame_width // 4      # Number of sample points for each wave.

        self.k = 100.0*np.ones((self.W))    # Spring rate for node coupling for each wave.
        self.b = 0.7*np.ones((self.W))      # Node damping factor for each wave.

        # Modify the spring rates to be different for each model for different
        # propagation velocity.
        self.k[0] *= 0.90   # red travels a little slower
        self.k[1] *= 0.95   # green travels slightly slower

        self.period = 3.0*np.ones((self.W))     # Period of the default excitation for each wave.
        self.duty = 0.3*np.ones((self.W))       # Duty cycle of the default excitation for each wave.
        self.magnitude = 1.0*np.ones((self.W))  # Magnitude of the default excitation for each wave.

        self.qE = np.zeros((self.W))                  # Position of the driven node for each wave.
        self.inputE = np.zeros((self.W), dtype=int)   # Index of the driven node for each wave.
        self.inputE += self.N // 2                    # Move the driven nodes to the center

        # q  is a (W,N) matrix of generalized wave coordinates (positions)
        # qd is a (W,N) matrix of generalized wave derivatives (velocities)
        self.q = np.zeros((self.W, self.N))
        self.qd = np.zeros((self.W, self.N))

        # elapsed integrator time
        self.t = 0.0

    def __iter__(self):
        return self

    def __next__(self):
        """Advance the simulation by one video frame and return it as a
        float32 RGB image at the output frame size."""
        # Run the wave simulator for one frame interval.
        frame_interval = 1.0 / self.frame_rate
        self._update_for_interval(frame_interval)

        # Render the wave position matrix as an RGB image: the magnitude of
        # each wave, clipped to unit range, fills one color channel.
        rows = np.clip(np.abs(self.q), 0.0, 1.0)
        frame = np.empty((self.frame_height, self.N, 3), dtype=np.float32)
        frame[:, :, 0] = rows[0, :]   # red
        frame[:, :, 1] = rows[1, :]   # green
        frame[:, :, 2] = rows[2, :]   # blue

        # Return the RGB image at the output frame size.
        return cv.resize(frame, dsize=(self.frame_width, self.frame_height),
                         interpolation=cv.INTER_NEAREST)

    def _update_for_interval(self, interval):
        """Run the simulator for the given interval in seconds, which may
        include one or more integration steps of at most 4 ms each."""
        while interval > 0.0:
            dt = min(interval, 0.004)
            interval -= dt
            self._step(dt)

    def _step(self, dt):
        """Advance every wave model by one forward Euler step of dt seconds."""
        # Calculate the excitation.  Each wave model has a single node which is
        # driven with a rectangular signal of given period, duty cycle, and magnitude.
        if self.t < 0:
            self.qE = np.zeros((self.W))
        else:
            # phase ramps from [0,1) in a sawtooth
            phase = np.mod(self.t, self.period) / self.period
            # create a pulse train with the specified duty cycle
            self.qE = (phase < self.duty) * self.magnitude

        # create an accumulator for forces on all nodes; every element is
        # assigned below before it is read
        force = np.empty((self.W, self.N))

        # calculate coupling spring forces on all interior nodes
        # k (W, 1) * dQ (W, N-2)
        force[:, 1:-1] = self.k.reshape((self.W, 1)) * (0.5*(self.q[:, 2:] + self.q[:, 0:-2]) - self.q[:, 1:-1])

        # Calculate coupling spring forces on boundary nodes.  This assumes an
        # open boundary condition at each end.
        force[:, 0] = self.k * (self.q[:, 1] - self.q[:, 0])
        force[:, -1] = self.k * (self.q[:, -2] - self.q[:, -1])

        # calculate damping on all nodes
        force -= self.b.reshape((self.W, 1)) * self.qd

        # Apply the drive condition as an extra node coupling spring and damper.
        # Pair each wave with its own driven node: indexing force[:, inputE]
        # would select a (W, W) block with duplicate columns, so that only the
        # last wave's drive parameters would take effect on every wave.
        wave = np.arange(self.W)
        driven_q = self.q[wave, self.inputE]
        driven_qd = self.qd[wave, self.inputE]
        force[wave, self.inputE] += 2 * self.k * (self.qE - driven_q) - 10 * self.b * driven_qd

        # calculate the node dynamics with forward Euler integration
        self.q += self.qd * dt
        self.qd += force * dt
        self.t += dt

        # for testing, just force the driven nodes to the target position
        # self.q[:, self.inputE] = self.qE
#================================================================================
class ConwayLife:
    """Iterable object to simulate Conway's Life.

    Implements the iterator protocol to return generator objects which produce
    interpolated image frames.  Typically simulates a binary world somewhat
    larger than the frame size.  Produces a smoothly interpolated image at the
    frame rate while the simulation runs at a slower tempo.

    Note that all iterators returned use the same object state, so evaluating
    multiple iterations simultaneously may produce unexpected results.
    """

    def __init__(self, config):
        """Create the world and optionally seed it with random cells.

        :param config: configuration mapping; reads the ``video`` section
                       (frame_width, frame_height, frame_rate) and the
                       ``life`` section (world_rows, world_cols, min_alive,
                       tempo, random)
        """
        self.config = config
        self.frame_width = int(config['video']['frame_width'])
        self.frame_height = int(config['video']['frame_height'])
        self.frame_rate = int(config['video']['frame_rate'])

        self.world_rows = int(self.config['life']['world_rows'])
        self.world_cols = int(self.config['life']['world_cols'])
        self.min_alive = int(self.config['life']['min_alive'])
        self.tempo = float(self.config['life']['tempo'])

        # The random setting is a percentage; it is converted to the upper
        # bound of a random integer draw in which zero marks a living cell.
        # A non-positive percentage requests an empty world.
        random_percentage = float(self.config['life']['random'])
        if random_percentage > 0.0:
            self.random_rate = 1 + int(100.0 / random_percentage)
        else:
            self.random_rate = 0

        # Conway life state
        if self.random_rate > 1:
            self.random_reset()     # initialize self.world
        else:
            # The builtin bool is used as dtype: the np.bool alias is absent
            # from NumPy 1.24 through 1.26.
            self.world = np.zeros((self.world_rows, self.world_cols), dtype=bool)

    #---------------------------------------------------------------------------
    # Control methods to affect the animation.

    def random_reset(self):
        """Flood the world with random cells."""
        shape = (self.world_rows, self.world_cols)
        if self.random_rate < 1:
            # random generation is disabled, so reset to an empty world
            self.world = np.zeros(shape, dtype=bool)
        else:
            self.world = np.random.randint(0, self.random_rate, shape) == 0

    #---------------------------------------------------------------------------
    # Iterator protocol implementation.

    def __iter__(self):
        """Return a generator which wraps the Conway Life simulation in a
        keyframe interpolator.  The world updates once per keyframe, so the
        interpolator requests new generations at the slower tempo rate.
        """
        return keyframe_interpolator(self._keyframes(), self.tempo, self.frame_rate)

    # Internal method to return an unwrapped generator function.
    def _keyframes(self):
        while True:
            yield self.__next__()

    # The __next__ method updates the world model and returns the next image.  This is
    # generally called from the interpolator at the tempo rate slower than the frame rate.
    def __next__(self):
        self._update()
        self._spontaneous_generation()

        # Crop out a frame-sized result of the world model.
        mono = self.world[0:self.frame_height, 0:self.frame_width]

        # For now, just return as white or black in floating point RGB.
        color = np.ones((3), dtype=np.float32)
        return mono.reshape(self.frame_height, self.frame_width, 1) * color

    #---------------------------------------------------------------------------
    # Simulation model.

    def _update(self):
        """Compute one new generation for a game of Conway's Life."""
        rows, cols = self.world_rows, self.world_cols
        board = self.world

        # Create a padded board to simplify computing the wraparound of a toroidal
        # world.  This also converts to a numeric array so the neighbor calculation
        # will produce a count.
        padded = np.zeros((rows+2, cols+2), dtype=np.uint8)

        # Set the interior cells to the source board.
        padded[1:rows+1, 1:cols+1] = board

        # Copy the edge rows across to the opposite side to make a toroidal world.
        padded[0, :] = padded[-2, :]    # set top row from last board row
        padded[-1, :] = padded[1, :]    # set bottom row from first board row
        padded[:, 0] = padded[:, -2]    # set first column from last board column
        padded[:, -1] = padded[:, 1]    # set last column from first board column

        # Copy corners diagonally.
        padded[0, 0] = padded[-2, -2]
        padded[0, -1] = padded[-2, 1]
        padded[-1, -1] = padded[1, 1]
        padded[-1, 0] = padded[1, -2]

        # Compute a count of the neighbors for each interior cell.  The result is
        # the same size as the original board.  Each source sum element is a
        # displaced submatrix.
        neighbors = (padded[0:-2, 0:-2] + padded[0:-2, 1:-1] + padded[0:-2, 2:]
                     + padded[1:-1, 0:-2] + padded[1:-1, 2:]
                     + padded[2:, 0:-2] + padded[2:, 1:-1] + padded[2:, 2:])

        # Apply the Life rules:
        #   1. If the cell is alive, keep it alive if it has two or three neighbors.
        #   2. If the cell is not alive, make it alive if it has three neighbors.
        rule1 = np.logical_or(neighbors == 2, neighbors == 3)
        rule2 = neighbors == 3

        # Combine the two rules, applying rule1 if a cell was originally alive and
        # rule2 if it was originally not alive.  Replace the previous world.
        self.world = np.where(board, rule1, rule2)

    #---------------------------------------------------------------------------
    def _spontaneous_generation(self):
        """Create new spontaneously generated cells in a Conway Life simulation.

        This helps a simulation to run entirely autonomously by making sure it
        never completely dies off.  min_alive specifies the lower threshold of
        living cells to try to guarantee, a good value is about 3% of the
        array size.
        """
        num_alive = np.sum(self.world)

        # Choose the number of random cells to write.
        rcells = int(np.random.randint(0, 4) + max(0, self.min_alive - num_alive))
        rows, cols = self.world.shape

        # Set the specified number of randomly chosen cells to alive.
        for _ in range(rcells):
            self.world[np.random.randint(0, rows), np.random.randint(0, cols)] = True

#---------------------------------------------------------------------------
# Compute the interpolated Conway Life result, crop out the frame-size
# result, and blend it in on all channels.  The interpolated result is a
# grayscale image scaled between 0 and 255.

# def alpha_blend(rgb, alpha):
#     """Image operator to blend a grayscale alpha image into an RGB image"""

# if False:
#     life_world = next(self.lifegen)
#     composite = (1.0 - life_alpha[..., np.newaxis]/255.0) * self.frame + life_alpha[..., np.newaxis] * np.ones((1, 3))
#     self.frame[:] = np.clip(composite, 0, 255).astype(np.uint8)

#--------------------------------------------------------------------------------
class Diffusion:
    """Iterable object which produces an animation of blurring impulses.

    Note that all iterators returned use the same object state, so evaluating
    multiple iterations simultaneously may produce unexpected results.
    """

    # Half the width in pixels of each color impulse painted into the frame.
    IMPULSE_HALF_WIDTH = 8

    def __init__(self, config, color_table):
        """Create the working buffers and the color cycle.

        :param config: configuration mapping; reads the ``diffusion`` section
                       (impulse_interval in frames, decay factor) and the
                       ``video`` section (frame_width, frame_height, frame_rate)
        :param color_table: mapping of color names to RGB values; the values
                            are used in order as the impulse colors
        """
        self.config = config
        self.impulse_interval = int(config['diffusion']['impulse_interval'])
        self.decay = float(config['diffusion']['decay'])
        self.frame_rate = int(config['video']['frame_rate'])
        self.color_cycle = list(color_table.values())

        # Create working matrices.  These are stored in RGB order as float32.
        # (Note that OpenCV conventionally uses BGR order.)
        frame_width = int(config['video']['frame_width'])
        frame_height = int(config['video']['frame_height'])
        self.frame_shape = (frame_height, frame_width, 3)
        self.buf1 = np.zeros(self.frame_shape, dtype=np.float32)
        self.buf2 = np.zeros(self.frame_shape, dtype=np.float32)

        # frame counter for driving animation
        self.count = 0

    def __iter__(self):
        return self

    def _add_impulses(self, cycle):
        """Paint one pair of color bars into the buffer, placed symmetrically
        about the frame center.

        :param cycle: impulse counter, which selects both the separation of
                      the bars and their colors
        """
        if not self.color_cycle:
            # no colors are available, so there is nothing to paint
            return

        frame_width = self.frame_shape[1]
        half = self.IMPULSE_HALF_WIDTH
        num_colors = len(self.color_cycle)

        separation = (cycle * 10) % (frame_width//2)
        centers = (frame_width//2 - separation, frame_width//2 + separation)
        colors = (self.color_cycle[cycle % num_colors], self.color_cycle[-cycle % num_colors])

        for center, color in zip(centers, colors):
            # Clamp the bar to the frame.  A negative slice start would be
            # interpreted relative to the right edge and select nothing.
            start = max(0, center - half)
            stop = min(frame_width, center + half)
            self.buf1[:, start:stop] = color

    def __next__(self):
        """Iterator function to return the next image in the sequence.

        The returned array is the internal working buffer, which is modified
        in place on the following call.
        """
        # deliver impulses at a regular tempo
        if (self.count % self.impulse_interval) == 0:
            self._add_impulses(self.count // self.impulse_interval)

        # apply diffusion and decay on every frame
        cv.blur(self.buf1, (3, 3), self.buf2)
        self.buf1[:] = self.decay * self.buf2[:]

        # return the buffer as a frame
        self.count += 1
        return self.buf1
#--------------------------------------------------------------------------------
class Colors:
    """Color and palette tables.  Can produce several different image
    generators with static color fields or simple generated patterns."""

    def __init__(self, config):
        """Load the color and palette tables from the configuration.

        :param config: configuration mapping; reads the ``video`` section
                       (frame_width, frame_height, frame_rate) and the
                       ``colors`` section (colors and palettes as JSON text)
        """
        self.config = config
        self.frame_rate = int(config['video']['frame_rate'])
        frame_width = int(config['video']['frame_width'])
        frame_height = int(config['video']['frame_height'])
        self.frame_shape = (frame_height, frame_width, 3)

        # Initialize color tables from the configuration.  Colors are internally
        # stored as floating point RGB numpy arrays with a unit range (0.0, 1.0).

        # Parse the color dictionary as a JSON string
        try:
            self.color_table = json.loads(self.config['colors']['colors'])
            log.debug("Loaded color table: %s", self.color_table)
        except json.decoder.JSONDecodeError:
            log.warning("Unable to parse color table from configuration file.")
            self.color_table = {"white": [255, 255, 255], "black": [0, 0, 0]}

        # Parse the palette dictionary as a JSON string
        try:
            self.palette_table = json.loads(self.config['colors']['palettes'])
            log.debug("Loaded palette table: %s", self.palette_table)
        except json.decoder.JSONDecodeError:
            log.warning("Unable to parse palette table from configuration file.")
            self.palette_table = {"mono": [[255, 255, 255], [0, 0, 0]]}

        # Replace all 8-bit integer color table entries with unit-scale floating point.
        self.color_table = {name: np.array(color, dtype=np.float32)/255.0
                            for name, color in self.color_table.items()}
        self.palette_table = {name: [np.array(color, dtype=np.float32)/255.0 for color in array]
                              for name, array in self.palette_table.items()}

    #----------------------------------------------------------------------------
    def color_by_name(self, name, duration=None):
        """Create a color field generator which produces a floating point
        frame of the named color.

        :param name: key of the color in the color table
        :param duration: number of frames to produce, or None for an infinite sequence
        """
        rgb = self.color_table[name]
        frame = np.full(self.frame_shape, rgb, dtype=np.float32)
        yield from self.iterate_image(frame, duration)

    #----------------------------------------------------------------------------
    def palette_by_name(self, name, duration=None):
        """Create a palette color field generator, with the palette colors
        laid out side by side across the frame.

        :param name: key of the palette in the palette table
        :param duration: number of frames to produce, or None for an infinite sequence
        """
        palette = self.palette_table[name]
        num_colors = len(palette)

        # Create color table as RGB image with a single row.
        table = np.empty((1, num_colors, 3), dtype=np.float32)
        table[:] = palette

        # Generate a frame by resizing the color table to fill the width and height
        height, width = self.frame_shape[0], self.frame_shape[1]
        frame = cv.resize(table, dsize=(width, height), interpolation=cv.INTER_NEAREST)
        yield from self.iterate_image(frame, duration)

    #----------------------------------------------------------------------------
    def rainbow(self, duration=None):
        """Create a rainbow field generator.

        :param duration: number of frames to produce, or None for an infinite sequence
        """
        # Generate a rainbow row of evenly distributed hue values with full
        # saturation and value.  For 32-bit floating point images OpenCV
        # expresses hue in degrees over 0.0 to 360.0; the halved 0 to 180 range
        # applies only to 8-bit images.
        width = self.frame_shape[1]
        hues = np.arange(width) * (360.0 / width)
        hsv = np.ones(self.frame_shape, dtype=np.float32)
        hsv[:, :, 0] = hues
        frame = cv.cvtColor(hsv, cv.COLOR_HSV2RGB)
        return self.iterate_image(frame, duration)

    #----------------------------------------------------------------------------
    def noise(self, duration=None, tempo=300):
        """Create a noise field generator.  Images are generated at a tempo
        slower than the frame rate and interpolated.

        :param duration: number of frames to produce, or None for an infinite sequence
        :param tempo: rate of new noise images in beats per minute
        """
        def noisegen():
            while True:
                yield np.random.rand(*self.frame_shape).astype(np.float32)

        frames = keyframe_interpolator(noisegen(), tempo=tempo, frame_rate=self.frame_rate)
        if duration is None:
            return frames
        # Truncate the otherwise infinite sequence to the requested length.
        return itertools.islice(frames, duration)

    #----------------------------------------------------------------------------
    def iterate_image(self, frame, duration=None):
        """Image sequence generator which produces a stream of the same image.

        :param frame: the image to repeat; the same object is yielded each time
        :param duration: number of frames to produce, or None for an infinite sequence
        """
        if duration is None:
            while True:
                yield frame
        else:
            for _ in range(duration):
                yield frame
#--------------------------------------------------------------------------------
class VideoWriter:
    """Video file writer to transcode image frames into a video file with the
    given path.  The default codec is lossless PNG image frames in an AVI
    container.
    """

    def __init__(self, config, path=None):
        """Open the output file.

        :param config: configuration mapping; reads frame_width, frame_height,
                       frame_rate and codec from the ``video`` section
        :param path: output file path; defaults to the script name with a
                     timestamp and an .avi extension
        """
        self.config = config
        if path is None:
            path = scriptname + datetime.datetime.now().strftime("-%Y-%m-%d-%H-%M-%S.avi")
        self.path = path

        self.frame_width = int(config['video']['frame_width'])
        self.frame_height = int(config['video']['frame_height'])
        self.frame_rate = int(config['video']['frame_rate'])

        # default codec: PNG images, lossless, clean block edges.  A fourcc is
        # exactly four characters, so shorter names such as 'png' are padded
        # with trailing spaces.
        self.codec = config['video']['codec'].ljust(4)

        # Open the actual writer.  cv.VideoWriter does not raise if the path is
        # invalid, e.g. the folder doesn't exist, so the open state is checked
        # explicitly below.
        codec_code = cv.VideoWriter.fourcc(*self.codec)
        log.info("Opening video writer for %s with fourcc 0x%x", path, codec_code)

        self._out = cv.VideoWriter(path, codec_code, self.frame_rate,
                                   (self.frame_width, self.frame_height))
        if not self._out.isOpened():
            log.warning("Failed to open video writer for %s.", self.path)
            self._out.release()
            self._out = None

    def write(self, frame):
        """Write an RGB image to the video file.  Does nothing if the writer
        failed to open or has been closed."""
        if self._out is not None:
            # The OpenCV VideoWriter expects BGR format.
            bgr = cv.cvtColor(frame, cv.COLOR_RGB2BGR)
            self._out.write(bgr)

    def close(self):
        """Finalize the stream and close the output file."""
        if self._out is not None:
            self._out.release()
            self._out = None
            log.info("Closed video writer for %s", self.path)
#--------------------------------------------------------------------------------
class BridgeInstrument:
    """Real-time multi-function image generator for the Pausch Bridge.

    Implemented as an infinite iterator, i.e. responds to __next__() to
    produce image frames, responds to __iter__() to return self.
    """

    def __init__(self, config):
        """Create the component generators and start the initial animation.

        :param config: configuration mapping; reads the ``video`` and
                       ``instrument`` sections directly and is passed on to
                       each component generator
        """
        # Cache configurable properties.
        self.config = config
        self.frame_rate = int(config['video']['frame_rate'])
        frame_width = int(config['video']['frame_width'])
        frame_height = int(config['video']['frame_height'])
        self.frame_shape = (frame_height, frame_width, 3)

        # Frame counter and other state variables.
        self.count = 0

        # The configuration stores booleans as text.  Explicit negative
        # spellings disable quantization regardless of letter case; any other
        # value enables it.
        flag = str(self.config['instrument']['spatial_quantize']).strip().lower()
        self.quantize = flag not in ('false', '0', 'no', 'off', '')

        # The iterator for the current image stream.  Sometimes this holds a crossfade
        # generator transitioning to the next stream.
        self.image_iterator = None

        # The iterator for the following image stream, usually held temporarily
        # until the crossfade is complete.
        self.next_iterator = None

        # The most recently generated frame.
        self.frame = None

        # Generator objects which can produce component image streams.
        self.colors = Colors(config)
        self.diffusion = Diffusion(config, self.colors.color_table)
        self.life = ConwayLife(config)
        self.pianoroll = PianoRoll(config)
        self.waves = Waves(config)
        self.solid_white = self.colors.color_by_name('white')
        self.solid_black = self.colors.color_by_name('black')

        # Set initial animation: a slow fade from black to white, then solid white.
        self.image_iterator = itertools.chain(
            crossfade(self.solid_black, self.solid_white, 4*self.frame_rate),
            self.solid_white)

    #----------------------------------------------------------------------------
    # Manage the image generator graph.

    def reset_graph(self):
        """Safety function to reset the operator graph if the sequence ends."""
        log.debug("BridgeInstrument invoked reset_graph() safety event.")
        self.image_iterator = self.solid_black
        self.next_iterator = None

    def transition(self, next_iterable):
        """Start a half-second crossfade to the next effect.

        :param next_iterable: iterable of images to transition to
        """
        fade_frames = self.frame_rate // 2
        interrupted = self.next_iterator is not None
        self.next_iterator = iter(next_iterable)

        if interrupted and self.frame is not None:
            # A transition is being interrupted, so replace it starting from
            # the current output.  This requires that a frame has already been
            # produced to fade from.
            log.debug("Transition interrupted.")
            fade_from = self.colors.iterate_image(self.frame)
        else:
            fade_from = self.image_iterator

        self.image_iterator = crossfade(fade_from, self.next_iterator, fade_frames)

    #----------------------------------------------------------------------------
    # Iterator protocol.
    def __iter__(self):
        return self

    # Compute the next frame of the animation.
    def __next__(self):
        # Evaluate the next frame using the current operator graph.  When the
        # current stream ends, switch to the pending stream if there is one, or
        # else fall back to the safety reset, and try again.
        while True:
            try:
                frame = next(self.image_iterator)
                break
            except StopIteration:
                if self.next_iterator is None:
                    self.reset_graph()
                else:
                    self.image_iterator = self.next_iterator
                    self.next_iterator = None

        # optionally quantize image to the 57x2 fixture resolution
        if self.quantize:
            frame_height, frame_width, _ = self.frame_shape
            blocky = cv.resize(frame, dsize=(57, 2), interpolation=cv.INTER_NEAREST)
            frame = cv.resize(blocky, dsize=(frame_width, frame_height), interpolation=cv.INTER_NEAREST)

        # update frame count
        self.count += 1
        self.frame = frame
        return frame
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
class MainApp:
    """Main application object implementing the user input processing, frame
    timing, and networking I/O."""

    def __init__(self, config, instrument):
        """Open the network outputs, start the frame clock, and put the
        terminal into immediate input mode.

        :param config: configuration mapping; reads the ``video`` and
                       ``network`` sections
        :param instrument: the image generator driven by this application
        """
        self.config = config
        self.instrument = instrument

        # Output stream.
        self.video_writer = None

        # Open network sockets.
        self._initialize_networking()

        # Prepare to regulate frame timing, in integer nanoseconds.
        self.frame_rate = int(config['video']['frame_rate'])
        self.frame_interval = int(1000000000 / self.frame_rate)
        self.frame_timer = time.monotonic_ns()

        # Attach a handler to the keyboard interrupt (control-C).
        self.initial_terminal_attributes = None
        signal.signal(signal.SIGINT, self.sigint_handler)

        # Prepare to receive keyboard characters as they are typed.
        self._enter_raw_input()

    #-------------------------------------------------------------------------------
    def _initialize_networking(self):
        """Create the OSC UDP clients named in the ``network`` section."""
        network = self.config['network']

        # Open a OSC UDP client to transmit image frames for preview.
        host = network['osc_preview_host']
        port = int(network['osc_preview_port'])
        self.preview_xmitter = udp_client.UDPClient(host, port)
        log.info("Opened UDP client for %s:%d", host, port)

        # Optionally open a second OSC UDP client to transmit image frames for
        # performance; an empty host name disables it.
        bridge_host = network['osc_bridge_host']
        if bridge_host == '':
            self.bridge_xmitter = None
        else:
            bridge_port = int(network['osc_bridge_port'])
            self.bridge_xmitter = udp_client.UDPClient(bridge_host, bridge_port)
            log.info("Opened UDP client for %s:%d", bridge_host, bridge_port)

    #-------------------------------------------------------------------------------
    def _frame_sleep(self):
        """Wait for the next frame time to transmit on a precise schedule."""
        # The deadline advances by a fixed interval on every call rather than
        # from the current time, which keeps a precise average frame rate.
        self.frame_timer += self.frame_interval
        remaining_ns = self.frame_timer - time.monotonic_ns()
        if remaining_ns > 0.0:
            time.sleep(1e-9 * remaining_ns)

    #-------------------------------------------------------------------------------
    def _send_frame(self, frame):
        """Transmit one image as an OSC bundle to the previewer and, if
        configured, to the bridge.

        :param frame: image array with three dimensions (rows, columns, channels)
        """
        frame_height, frame_width, channel_count = frame.shape

        # The first message describes the image dimensions.
        size_msg = osc_message_builder.OscMessageBuilder(address='/pbridge/frame/size')
        for dimension in (frame_height, frame_width, channel_count):
            size_msg.add_arg(dimension)

        # The second message carries the raw image bytes as a blob.
        data_msg = osc_message_builder.OscMessageBuilder(address='/pbridge/frame/data')
        data_msg.add_arg(frame.tobytes(), arg_type=data_msg.ARG_TYPE_BLOB)

        # Send the messages together as a bundle.
        bundle = osc_bundle_builder.OscBundleBuilder(osc_bundle_builder.IMMEDIATELY)
        bundle.add_content(size_msg.build())
        bundle.add_content(data_msg.build())
        packet = bundle.build()

        self.preview_xmitter.send(packet)
        if self.bridge_xmitter is not None:
            self.bridge_xmitter.send(packet)

    #-------------------------------------------------------------------------------
    def _enter_raw_input(self):
        """Save the terminal settings and switch standard input to cbreak
        mode so that single keystrokes can be read."""
        stdin_fd = sys.stdin.fileno()
        self.initial_terminal_attributes = termios.tcgetattr(stdin_fd)
        log.debug("Saving terminal attributes and entering raw terminal mode.")
        tty.setcbreak(stdin_fd)
        print("Entered immediate terminal mode.  Press control-C to exit.")
        print("Active control keys include qwlbpdrun1.")

    def _exit_raw_input(self):
        """Restore the terminal settings saved by _enter_raw_input(), if any."""
        saved = self.initial_terminal_attributes
        if saved is not None:
            log.debug("Restoring terminal attributes.")
            termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, saved)

    def _is_key_ready(self):
        """Return True if standard input can be read without blocking."""
        # a zero timeout makes this a poll
        readable, _, _ = select.select([sys.stdin], [], [], 0)
        return readable == [sys.stdin]
[docs] def process_user_input(self): """Logic for interpreting keystrokes and triggering animation events, intended to be customized for a particular show.""" while self._is_key_ready(): key = sys.stdin.read(1) print("User typed:", key) if key == 'q': self.instrument.quantize = not self.instrument.quantize elif key == 'v': self._toggle_recording() elif key == 'w': self.instrument.transition(self.instrument.waves) elif key == 'l': self.instrument.transition(self.instrument.life) elif key == 'b': self.instrument.transition(self.instrument.solid_black) elif key == 'p': self.instrument.transition(self.instrument.pianoroll) elif key == 'd': self.instrument.transition(self.instrument.diffusion) elif key == 'r': self.instrument.transition(self.instrument.colors.rainbow()) elif key == 'u': self.instrument.transition(self.instrument.colors.palette_by_name('Ukraine')) elif key == 'n': self.instrument.transition(self.instrument.colors.noise()) elif key == '1': # define a composite sequence using itertools noise = self.instrument.colors.noise(tempo=120) black = self.instrument.colors.color_by_name('black') loop = itertools.cycle(itertools.chain(crossfade(black, noise, 4*self.frame_rate), crossfade(noise, black, 1*self.frame_rate))) self.instrument.transition(loop)
#-------------------------------------------------------------------------------
[docs] def sigint_handler(self, signal_number, stack_frame): # pylint: disable=unused-argument """Handler for user keyboard interrupts (e.g. control-C), to be customized with close or shutdown handlers.""" print("Keyboard interrupt caught, running close handlers...") # restore state as needed self._exit_raw_input() sys.exit(0)
#------------------------------------------------------------------------------- def _toggle_recording(self): if self.video_writer is None: self.video_writer = VideoWriter(self.config) else: self.video_writer.close() self.video_writer = None #------------------------------------------------------------------------------- # Generate images in real time representing the bridge illumination.
[docs] def run(self): """Infinite event loop to interpret user input and stream out image frames.""" while True: # Check for user input. self.process_user_input() # Compute the next output frame. float_frame = next(self.instrument) # convert the frame to 8-bit RGB for output int_frame = np.clip(255 * float_frame, 0, 255).astype(np.uint8) # Wait for the next transmit time. self._frame_sleep() # Sent out new frame. self._send_frame(int_frame) # Optionally save to disk. if self.video_writer is not None: self.video_writer.write(int_frame)
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
def main(args):
    """Main script entry point.

    Builds the default configuration, overlays any configuration files,
    sets up logging, creates the main application objects, then enters the
    run loop.  ``args`` is the parsed command-line namespace; its
    ``configs``, ``path`` and ``bridge`` attributes are read here.
    """
    # Start from the built-in defaults, then overlay the per-script
    # configuration file followed by any files named on the command line.
    config = create_defaults()
    default_config_path = scriptname + '.config'
    candidate_paths = [default_config_path] + args.configs
    files_read = config.read(candidate_paths)

    # Set up logging to both console and a file.
    _configure_logging(args, config)

    # Report on the configuration file results now that logging works.
    if files_read:
        log.info("Read configuration from %s", files_read)
    else:
        log.info("Unable to read configuration from %s", default_config_path)

    # Optionally dump the complete effective configuration.
    if args.path is not None:
        log.info("Writing full configuration to %s", args.path)
        with open(args.path, "w") as output:
            config.write(output)

    # Bridge network output stays disabled unless specifically enabled at
    # the command line.
    if args.bridge:
        log.info("Live bridge output requested.")
    else:
        config['network']['osc_bridge_host'] = ""

    # Create the image generator, hand it to the timing and networking
    # object, and generate images in real time.
    instrument = BridgeInstrument(config)
    app = MainApp(config, instrument)
    app.run()
#-------------------------------------------------------------------------------
# Main script follows.  This sequence is executed when the script is initiated
# from the command line.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="""Real-time interactive bridge animation generator.""")

    # Each entry is (names-or-flags, keyword options) for one argument.
    argument_table = (
        (('-v', '--verbose'),
         dict(action='store_true', help="Enable more detailed output.")),
        (('-s',),
         dict(dest='path', type=str, help="Save full configuration to given path.")),
        (('--bridge',),
         dict(action='store_true', help="Enable live output to bridge controller.")),
        (('configs',),
         dict(nargs="*", help="Paths of additional configuration files.")),
    )
    for names, options in argument_table:
        parser.add_argument(*names, **options)

    args = parser.parse_args()
    main(args)