#!/usr/bin/env python3
"""pb_instrument.py
Pausch Bridge visual light instrument for real-time performance and improvisation.
Sends image output in real time as OSC messages to a previewer and/or the bridge.
"""
#-------------------------------------------------------------------------------
# standard Python libraries
import os, sys, logging, datetime, time
import configparser, argparse, json
import math, itertools
import signal, tty, termios, select
# This uses the OpenCV computer vision library for image processing.
# installation: pip3 install opencv-python
# pypi description: https://pypi.org/project/opencv-python/
# API documentation: https://docs.opencv.org/master/
import cv2 as cv
# This uses the NumPy numerical library for image matrix manipulation.
# installation: pip3 install numpy
# pypi description: https://pypi.org/project/numpy
# API documentation: https://numpy.org/doc/stable/
import numpy as np
# This uses python-osc to send UDP packets with image data encoded using OSC.
# installation: pip3 install python-osc
# source code: https://github.com/attwad/python-osc
# pypi description: https://pypi.org/project/python-osc/
from pythonosc import udp_client
from pythonosc import osc_bundle_builder
from pythonosc import osc_message_builder
#--------------------------------------------------------------------------------
# Set up the module-level logger.  It is named after this script's file name
# with the directory and the extension removed.
scriptname = os.path.splitext(os.path.basename(__file__))[0]
log = logging.getLogger(scriptname)
#-------------------------------------------------------------------------------
def create_defaults():
    """Create a configparser object holding the default value of every
    configurable setting.

    The result has one section per subsystem (logging, video, network,
    colors, and each image generator).  configparser stores every value as a
    string, so the consumers convert to int, float, or JSON as needed.

    :return: a populated configparser.ConfigParser
    """
    defaults = {
        # numeric logging levels: 20 is INFO (log file), 30 is WARNING (console)
        'log': {
            'file_log_level': '20',
            'console_log_level': '30',
        },
        'video': {
            'frame_width': '228',
            'frame_height': '8',
            'frame_rate': '30',
            'codec': 'png',
        },
        'network': {
            'osc_preview_host': '127.0.0.1',
            'osc_preview_port': '23432',
            'osc_bridge_host': '',
            'osc_bridge_port': '23432',
        },
        # colors and palettes are stored as JSON text in the configuration file
        'colors': {
            'colors': '{"white":[255,255,255],"black":[0,0,0]}',
            'palettes': '{"mono": [[255,255,255],[0,0,0]], "Ukraine":[[0,91,187],[255,213,0]]}',
        },
        # PianoRoll parameters
        'pianoroll': {'images': '[]', 'tempo': '30'},
        # ConwayLife parameters
        'life': {
            'world_rows': '32',
            'world_cols': '228',
            'min_alive': '200',
            'tempo': '240',
            'random': '10',
        },
        # Diffusion parameters
        'diffusion': {'impulse_interval': '60', 'decay': '1.0'},
        # Instrument parameters
        'instrument': {'spatial_quantize': 'False'},
    }

    config = configparser.ConfigParser()
    for section, options in defaults.items():
        config[section] = options
    return config
#-------------------------------------------------------------------------------
# Enable logging at different levels to both a log file and the console.
def _configure_logging(args, config):
    """Attach a console handler and a file handler to the root logger.

    :param args: parsed command-line arguments; a true ``args.verbose`` lowers
                 the console threshold to at most DEBUG
    :param config: configuration mapping providing the numeric levels
                   ``config['log']['console_log_level']`` and
                   ``config['log']['file_log_level']``
    """
    root_logger = logging.getLogger()

    # Console output: stream messages to the terminal.
    console_handler = logging.StreamHandler()
    console_threshold = int(config['log']['console_log_level'])
    if args.verbose:
        console_threshold = min(logging.DEBUG, console_threshold)
    console_handler.setLevel(console_threshold)
    console_handler.setFormatter(logging.Formatter('%(levelname)s:%(name)s: %(message)s'))
    root_logger.addHandler(console_handler)

    # File output: a timestamped log named after the script, in the current directory.
    file_handler = logging.FileHandler(scriptname + '.log')
    file_handler.setLevel(int(config['log']['file_log_level']))
    file_format = logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s',
                                    datefmt="%Y-%m-%d %H:%M:%S")
    file_handler.setFormatter(file_format)
    root_logger.addHandler(file_handler)

    # The root logger must pass everything that either handler wants to see.
    root_logger.setLevel(min(console_handler.level, file_handler.level))
#================================================================
def keyframe_interpolator(keyframes, tempo=60, frame_rate=30.0):
    """Generator function producing the frames of a video sequence by linear
    interpolation between successive keyframes at a constant tempo.

    Each yielded frame is a weighted blend of the current keyframe and its
    successor.  If the source supplies only one keyframe, it is held for a
    single beat.  The sequence ends when the keyframe source is exhausted; an
    empty source produces an empty sequence.

    :param keyframes: iterable of images, typically a generator function
    :param tempo: keyframe rate in beats per minute
    :param frame_rate: output video frame rate in frames per second
    """
    seconds_per_keyframe = 60.0 / tempo
    # Fraction of a keyframe interval covered by one output frame.
    phase_step = 1.0 / (frame_rate * seconds_per_keyframe)

    source = iter(keyframes)
    exhausted = object()   # private sentinel marking the end of the keyframes

    # Fetch the first keyframe, or end the sequence immediately.
    current = next(source, exhausted)
    if current is exhausted:
        return

    # Fetch the second keyframe; with none available, repeat the first one.
    upcoming = next(source, current)

    phase = 0.0   # unit cross-fade phase, cycling over 0 to 1
    while True:
        # Blend the two keyframes according to the current phase.
        yield cv.addWeighted(current, (1.0 - phase), upcoming, phase, 0.0)

        phase += phase_step
        if phase > 1.0:
            # The successor has been reached: carry the remainder of the phase
            # over and promote the successor to be the current keyframe.
            phase -= 1.0
            current = upcoming
            upcoming = next(source, exhausted)
            if upcoming is exhausted:
                return
#===============================================================================
def crossfade(source1, source2, duration):
    """Generator function producing a finite video sequence which fades from
    one source to another.

    The first frame is entirely source1 and the last is entirely source2.  A
    one-frame fade yields a single frame taken entirely from source2.  The
    sequence ends after ``duration`` frames, or earlier if either source runs
    out of frames.  One frame is consumed from each source per output frame.

    :param source1: iterable of images fading out
    :param source2: iterable of images fading in
    :param duration: cross-fade duration in frames; zero or negative yields
                     an empty sequence
    """
    if duration <= 0:
        return

    fading_out = iter(source1)
    fading_in = iter(source2)
    last = duration - 1

    for count in range(duration):
        # A single-frame fade has no ramp to divide over, so jump straight to
        # the destination instead of dividing by zero.
        phase = count / last if last > 0 else 1.0

        # A StopIteration escaping a generator body is converted to a
        # RuntimeError (PEP 479), so end the fade explicitly instead.
        try:
            frame1 = next(fading_out)
            frame2 = next(fading_in)
        except StopIteration:
            return

        yield cv.addWeighted(frame1, 1.0 - phase, frame2, phase, 0.0)
#===============================================================================
def cyclefade(source1, source2, period):
    """Generator function producing a video sequence which fades back and
    forth between two sources.

    Each cycle starts entirely on source1, reaches source2 at the half-way
    point, and returns to source1.  The sequence only ends when either input
    ends, so infinite sources produce an infinite sequence.  One frame is
    consumed from each source per output frame.

    :param source1: iterable of images
    :param source2: iterable of images
    :param period: cross-fade cycle duration in frames; a period of 1 or less
                   yields an empty sequence
    """
    if period <= 1:
        return

    first = iter(source1)
    second = iter(source2)
    step = 0   # position within the current cycle, 0 <= step < period

    while True:
        # A StopIteration escaping a generator body is converted to a
        # RuntimeError (PEP 479), so end the sequence explicitly instead.
        try:
            frame1 = next(first)
            frame2 = next(second)
        except StopIteration:
            return

        phase = step / period
        if phase < 0.5:
            w1 = 2 * (0.5 - phase)   # source1 fading out
            w2 = 2 * phase           # source2 fading in
        else:
            w1 = 2 * (phase - 0.5)   # source1 fading in
            w2 = 2 * (1.0 - phase)   # source2 fading out
        yield cv.addWeighted(frame1, w1, frame2, w2, 0.0)

        step = (step + 1) % period
#================================================================================
class PianoRoll:
    """Collection of images which can be iterated to emit each row as a video
    frame.  Implements the iterator protocol to return generator objects
    which produce a sequence of smoothly interpolated image frames at a
    constant tempo.  The semantics of PianoRoll are that of a collection;
    each iteration of the collection is independent and starts at the
    beginning.
    """

    def __init__(self, config):
        """Load the source images named in the configuration.

        ``config['pianoroll']['images']`` is a JSON list of image file paths.
        Paths which cannot be read are skipped with a warning.  If no image
        is loaded at all, one random image is generated so the collection is
        never empty.

        :param config: configuration mapping with 'video' and 'pianoroll' sections
        """
        self.config = config
        self.frame_width = int(config['video']['frame_width'])
        self.frame_height = int(config['video']['frame_height'])
        self.frame_rate = int(config['video']['frame_rate'])
        self.tempo = int(config['pianoroll']['tempo'])

        image_path_list = self.config['pianoroll']['images']
        log.debug("PianoRoll found image paths %s", image_path_list)
        try:
            image_paths = json.loads(image_path_list)
        except json.decoder.JSONDecodeError:
            log.warning("Unable to parse piano roll image file list from configuration file.")
            # Fall back to an empty list so the loop below has something to iterate.
            image_paths = []
        if isinstance(image_paths, str):
            # A single JSON string is treated as a one-element list rather
            # than being iterated character by character.
            image_paths = [image_paths]
        self.image_paths = image_paths

        self.images = []
        self.image_names = []
        for path in image_paths:
            bgr = cv.imread(path)
            if bgr is None:
                # cv.imread reports a missing or unreadable file by returning None.
                log.warning("Unable to load piano roll image %s", path)
                continue
            self.images.append(cv.cvtColor(bgr, cv.COLOR_BGR2RGB))
            self.image_names.append(os.path.basename(path))
            log.info("Loaded image %s", path)

        if not self.images:
            log.info("No piano roll images loaded, generating one random image.")
            img = np.random.randint(0, 256, size=(20, self.frame_width // 4, 3), dtype=np.uint8)
            self.images.append(img)
            self.image_names.append('random')

    def __iter__(self):
        """Return a frame generator which sequences the rows of the source images."""
        # Create an interpolator to produce image frames smoothly transitioning
        # between the keyframes generated from each image row at the slower tempo rate.
        return keyframe_interpolator(self._keyframes(), tempo=self.tempo, frame_rate=self.frame_rate)

    def _keyframes(self):
        """Generator function producing keyframes by expanding successive rows
        of the source images into full frames.

        The keyframes are float32 RGB images in which the source pixel values
        are multiplied by 1/255.  The sequence cycles through all the images
        repeatedly and never ends.
        """
        frame_size = (self.frame_width, self.frame_height)
        while True:
            if not self.images:
                # with no source images, just return RGB black frames
                yield np.zeros((self.frame_height, self.frame_width, 3), dtype=np.float32)
                continue

            # cycle through all the images, returning each row expanded to full frame
            for source in self.images:
                for r in range(source.shape[0]):
                    row = (source[r:r+1, :, :] * (1.0/255.0)).astype(np.float32)
                    yield cv.resize(row, dsize=frame_size, interpolation=cv.INTER_NEAREST)
#================================================================================
class Waves:
    """Iterable object to simulate 1D waves.  Implements the iterator
    protocol to generate image frames.  Note that all iterators returned use
    the same object state, so evaluating multiple iterations simultaneously
    may produce unexpected results.

    Three independent waves are simulated, one per RGB color channel.  Each
    wave is a chain of nodes coupled by springs, with a single driven node.
    """

    def __init__(self, config):
        """Initialize the simulation at rest.

        :param config: configuration mapping with a 'video' section
        """
        self.config = config
        self.frame_width = int(config['video']['frame_width'])
        self.frame_height = int(config['video']['frame_height'])
        self.frame_rate = int(config['video']['frame_rate'])

        # scalar constants
        self.W = 3                          # Number of waves (one for each color channel).
        self.N = self.frame_width // 4      # Number of sample points for each wave.

        self.k = 100.0*np.ones((self.W))    # Spring rate for node coupling for each wave.
        self.b = 0.7*np.ones((self.W))      # Node damping factor for each wave.

        # Modify the spring rates to be different for each model for different propagation velocity.
        self.k[0] *= 0.90                   # red travels a little slower
        self.k[1] *= 0.95                   # green travels slightly slower

        self.period = 3.0*np.ones((self.W))     # Period of the default excitation for each wave.
        self.duty = 0.3*np.ones((self.W))       # Duty cycle of the default excitation for each wave.
        self.magnitude = 1.0*np.ones((self.W))  # Magnitude of the default excitation for each wave.

        self.qE = np.zeros((self.W))                 # Position of the driven node for each wave.
        self.inputE = np.zeros((self.W), dtype=int)  # Index of the driven node for each wave.
        self.inputE += self.N // 2                   # Move the driven nodes to the center

        # q  is a (W,N) matrix of generalized wave coordinates (positions)
        # qd is a (W,N) matrix of generalized wave derivatives (velocities)
        self.q = np.zeros((self.W, self.N))
        self.qd = np.zeros((self.W, self.N))

        # elapsed integrator time
        self.t = 0.0

    def __iter__(self):
        return self

    def __next__(self):
        """Advance the simulation by one video frame interval and return the
        wave positions rendered as a float32 RGB frame."""
        # Run the wave simulator for one frame interval.
        frame_interval = 1.0 / self.frame_rate
        self._update_for_interval(frame_interval)

        # Render the wave position matrix as an RGB image: the magnitude of
        # each wave, clipped to unit range, fills one color channel.
        rows = np.clip(np.abs(self.q), 0.0, 1.0)
        frame = np.empty((self.frame_height, self.N, 3), dtype=np.float32)
        frame[:, :, 0] = rows[0, :]   # red
        frame[:, :, 1] = rows[1, :]   # green
        frame[:, :, 2] = rows[2, :]   # blue

        # Return the RGB image at the output frame size.
        return cv.resize(frame, dsize=(self.frame_width, self.frame_height), interpolation=cv.INTER_NEAREST)

    def _update_for_interval(self, interval):
        """Run the simulator for the given interval, which may include one or more integration steps."""
        while interval > 0.0:
            # limit the integration step to at most 4 milliseconds
            dt = min(interval, 0.004)
            interval -= dt
            self._step(dt)

    def _step(self, dt):
        """Advance all the waves by one forward Euler integration step.

        :param dt: integration time step in seconds
        """
        # Calculate the excitation.  Each wave model has a single node which is
        # driven with a rectangular signal of given period, duty cycle, and magnitude.
        if self.t < 0:
            self.qE = np.zeros((self.W))
        else:
            # phase ramps from [0,1) in a sawtooth
            phase = np.mod(self.t, self.period) / self.period
            # create a pulse train with the specified duty cycle
            self.qE = (phase < self.duty) * self.magnitude

        # Create an accumulator for forces on all nodes.  Every element is
        # assigned below, so it does not need to be zeroed.
        force = np.empty((self.W, self.N))

        # calculate coupling spring forces on all interior nodes
        # k (W, 1) * dQ (W, N-2)
        k_column = self.k.reshape((self.W, 1))
        force[:, 1:-1] = k_column * (0.5*(self.q[:, 2:] + self.q[:, 0:-2]) - self.q[:, 1:-1])

        # Calculate coupling spring forces on boundary nodes.  This assumes an
        # open boundary condition at each end.
        force[:, 0] = self.k * (self.q[:, 1] - self.q[:, 0])
        force[:, -1] = self.k * (self.q[:, -2] - self.q[:, -1])

        # calculate damping on all nodes
        force -= self.b.reshape((self.W, 1)) * self.qd

        # Apply the drive condition as an extra node coupling spring and damper.
        # Each wave is paired with its own driven node so that wave w uses its
        # own k[w], b[w], and qE[w].  Indexing with [:, inputE] alone would
        # broadcast the per-wave parameters across the wrong axis.
        waves = np.arange(self.W)
        driven = self.inputE
        force[waves, driven] += (2 * self.k * (self.qE - self.q[waves, driven])
                                 - 10 * self.b * self.qd[waves, driven])

        # calculate the node dynamics with forward Euler integration
        self.q += self.qd * dt
        self.qd += force * dt
        self.t += dt
#================================================================================
class ConwayLife:
    """Iterable object to simulate Conway's Life.  Implements the iterator
    protocol to return generator objects which produce interpolated image
    frames.  Typically simulates a binary world somewhat larger than the
    frame size.  Produces a smoothly interpolated image at the frame rate
    while the simulation runs at a slower tempo.  Note that all iterators
    returned use the same object state, so evaluating multiple iterations
    simultaneously may produce unexpected results.
    """

    def __init__(self, config):
        """Create the world and optionally seed it with random cells.

        ``config['life']['random']`` controls the initial seeding: about one
        cell in ``1 + int(100 / random)`` starts alive.  A value of zero or
        less, or a value above 100, starts with an empty world.

        :param config: configuration mapping with 'video' and 'life' sections
        """
        self.config = config
        self.frame_width = int(config['video']['frame_width'])
        self.frame_height = int(config['video']['frame_height'])
        self.frame_rate = int(config['video']['frame_rate'])

        self.world_rows = int(self.config['life']['world_rows'])
        self.world_cols = int(self.config['life']['world_cols'])
        self.min_alive = int(self.config['life']['min_alive'])
        self.tempo = float(self.config['life']['tempo'])

        random_percentage = float(self.config['life']['random'])
        if random_percentage > 0.0:
            self.random_rate = 1 + int(100.0 / random_percentage)
        else:
            # No random seeding requested; also avoids dividing by zero.
            self.random_rate = 0

        # Conway life state
        if self.random_rate > 1:
            self.random_reset()   # initialize self.world
        else:
            # np.bool was removed from NumPy; the builtin bool is the supported dtype.
            self.world = np.zeros((self.world_rows, self.world_cols), dtype=bool)

    #---------------------------------------------------------------------------
    # Control methods to affect the animation.

    def random_reset(self):
        """Flood the world with random cells."""
        shape = (self.world_rows, self.world_cols)
        if self.random_rate < 1:
            # Random seeding is disabled, so the reset clears the world.
            self.world = np.zeros(shape, dtype=bool)
        else:
            self.world = np.random.randint(0, self.random_rate, shape) == 0

    #---------------------------------------------------------------------------
    # Iterator protocol implementation.

    def __iter__(self):
        """Return a generator which wraps the Conway Life simulation in a
        keyframe interpolator.  The world advances one generation per
        keyframe, and the interpolator requests keyframes at the slower tempo
        rate while emitting frames at the video frame rate.
        """
        return keyframe_interpolator(self._keyframes(), self.tempo, self.frame_rate)

    def _keyframes(self):
        """Internal method to return an unwrapped, infinite generator of world images."""
        while True:
            yield self.__next__()

    def __next__(self):
        """Update the world model and return the next image.  This is generally
        called from the interpolator at the tempo rate, slower than the frame rate.

        :return: float32 RGB image of the frame size, white where cells are alive
        """
        self._update()
        self._spontaneous_generation()

        # Crop out a frame-sized result of the world model.
        mono = self.world[0:self.frame_height, 0:self.frame_width]

        # For now, just return as white or black in floating point RGB.
        color = np.ones((3), dtype=np.float32)
        return mono.reshape(self.frame_height, self.frame_width, 1) * color

    #---------------------------------------------------------------------------
    # Simulation model.

    def _update(self):
        """Compute one new generation for a game of Conway's Life."""
        rows, cols = self.world_rows, self.world_cols
        board = self.world

        # Create a padded board to simplify computing the wraparound of a toroidal
        # world.  This also converts to a numeric array so the neighbor calculation
        # will produce a count.
        padded = np.zeros((rows+2, cols+2), dtype=np.uint8)

        # Set the interior cells to the source board.
        padded[1:rows+1, 1:cols+1] = board

        # Copy the edge rows across to the opposite side to make a toroidal world.
        padded[0, :] = padded[-2, :]    # set top row from last board row
        padded[-1, :] = padded[1, :]    # set bottom row from first board row
        padded[:, 0] = padded[:, -2]    # set first column from last board column
        padded[:, -1] = padded[:, 1]    # set last column from first board column

        # Copy corners diagonally.
        padded[0, 0] = padded[-2, -2]
        padded[0, -1] = padded[-2, 1]
        padded[-1, -1] = padded[1, 1]
        padded[-1, 0] = padded[1, -2]

        # Compute a count of the neighbors for each interior cell.  The result is
        # the same size as the original board.  Each term of the sum is a
        # displaced submatrix of the padded board.
        neighbors = np.zeros((rows, cols), dtype=np.uint8)
        for row_offset in (0, 1, 2):
            for col_offset in (0, 1, 2):
                if row_offset == 1 and col_offset == 1:
                    continue   # a cell is not its own neighbor
                neighbors += padded[row_offset:row_offset+rows, col_offset:col_offset+cols]

        # Apply the Life rules:
        #  1. If the cell is alive, keep it alive if it has two or three neighbors.
        #  2. If the cell is not alive, make it alive if it has three neighbors.
        rule1 = np.logical_or(neighbors == 2, neighbors == 3)
        rule2 = neighbors == 3

        # Combine the two rules, applying rule1 if a cell was originally alive and
        # rule2 if it was originally not alive.  Replace the previous world.
        self.world = np.where(board, rule1, rule2)

    #---------------------------------------------------------------------------
    def _spontaneous_generation(self):
        """Create new spontaneously generated cells in a Conway Life simulation.  This
        helps a simulation to run entirely autonomously by making sure it never
        completely dies off.  min_alive specifies the lower threshold of living
        cells to try to guarantee, a good value is about 3% of the array size.
        """
        num_alive = int(np.sum(self.world))

        # Choose the number of random cells to write: a few every generation,
        # plus enough to make up any shortfall below min_alive.
        rcells = int(np.random.randint(0, 4)) + max(0, self.min_alive - num_alive)
        rows, cols = self.world.shape

        # Set the specified number of randomly chosen cells to alive.
        chosen_rows = np.random.randint(0, rows, size=rcells)
        chosen_cols = np.random.randint(0, cols, size=rcells)
        self.world[chosen_rows, chosen_cols] = True
#---------------------------------------------------------------------------
# Compute the interpolated Conway Life result, crop out the frame-size
# result, and blend it in on all channels. The interpolated result is a
# grayscale image scaled between 0 and 255.
# def alpha_blend(rgb, alpha):
# """Image operator to blend a grayscale alpha image into an RGB image"""
# if False:
# life_world = next(self.lifegen)
# composite = (1.0 - life_alpha[..., np.newaxis]/255.0) * self.frame + life_alpha[..., np.newaxis] * np.ones((1, 3))
# self.frame[:] = np.clip(composite, 0, 255).astype(np.uint8)
#--------------------------------------------------------------------------------
class Diffusion:
    """Iterable object which produces an animation of blurring impulses.  Note that
    all iterators returned use the same object state, so evaluating multiple iterations
    simultaneously may produce unexpected results.
    """

    def __init__(self, config, color_table):
        """
        :param config: configuration mapping with 'video' and 'diffusion' sections
        :param color_table: mapping of color names to RGB colors; the impulses
                            cycle through its values in order
        """
        self.config = config
        self.impulse_interval = int(config['diffusion']['impulse_interval'])
        self.decay = float(config['diffusion']['decay'])
        self.frame_rate = int(config['video']['frame_rate'])

        self.color_cycle = list(color_table.values())
        if not self.color_cycle:
            # An empty table would make the modulo color selection divide by
            # zero, so fall back to a single unit-range white.
            self.color_cycle = [np.ones(3, dtype=np.float32)]

        # Create working matrices.  These are stored in RGB order as float32.
        # (Note that OpenCV conventionally uses BGR order.)
        frame_width = int(config['video']['frame_width'])
        frame_height = int(config['video']['frame_height'])
        self.frame_shape = (frame_height, frame_width, 3)
        self.buf1 = np.zeros(self.frame_shape, dtype=np.float32)
        self.buf2 = np.zeros(self.frame_shape, dtype=np.float32)

        # frame counter for driving animation
        self.count = 0

    def __iter__(self):
        return self

    def __next__(self):
        """Iterator function to return the next image in the sequence.

        The returned array is the internal working buffer, which is modified
        in place by the following call; copy it if it must be retained.
        """
        # deliver impulses at a regular tempo
        if (self.count % self.impulse_interval) == 0:
            self._deposit_impulses()

        # apply diffusion and decay on every frame
        cv.blur(self.buf1, (3, 3), self.buf2)
        self.buf1[:] = self.decay * self.buf2[:]

        # return the buffer as a frame
        self.count += 1
        return self.buf1

    def _deposit_impulses(self):
        """Paint a pair of colored bars symmetrically about the frame center for the current cycle."""
        frame_width = self.frame_shape[1]
        half = frame_width // 2
        cycle = self.count // self.impulse_interval

        # The bars move outward by ten pixels per cycle and wrap back to the center.
        separation = (cycle * 10) % half if half > 0 else 0
        num_colors = len(self.color_cycle)
        color1 = self.color_cycle[cycle % num_colors]
        color2 = self.color_cycle[-cycle % num_colors]

        for center, color in ((half - separation, color1), (half + separation, color2)):
            # Clamp the bar to the frame.  A negative slice start would be
            # interpreted relative to the end of the row and select nothing.
            start = max(0, center - 8)
            stop = min(frame_width, center + 8)
            self.buf1[:, start:stop] = color
#--------------------------------------------------------------------------------
class Colors:
    """Color and palette tables.  Can produce several different image
    generators with static color fields or simple generated patterns.
    """

    def __init__(self, config):
        """Load the color and palette tables from the configuration.

        Both tables are stored in the configuration as JSON text with 8-bit
        integer RGB values.  If either cannot be parsed, a small built-in
        table is used instead.

        :param config: configuration mapping with 'video' and 'colors' sections
        """
        self.config = config
        self.frame_rate = int(config['video']['frame_rate'])
        frame_width = int(config['video']['frame_width'])
        frame_height = int(config['video']['frame_height'])
        self.frame_shape = (frame_height, frame_width, 3)

        # Initialize color tables from the configuration.  Colors are internally
        # stored as floating point RGB numpy arrays with a unit range (0.0, 1.0).

        # Parse the color dictionary as a JSON string
        try:
            self.color_table = json.loads(self.config['colors']['colors'])
            log.debug("Loaded color table: %s", self.color_table)
        except json.decoder.JSONDecodeError:
            log.warning("Unable to parse color table from configuration file.")
            self.color_table = {"white" : [255,255,255], "black" : [0,0,0]}

        # Parse the palette dictionary as a JSON string
        try:
            self.palette_table = json.loads(self.config['colors']['palettes'])
            log.debug("Loaded palette table: %s", self.palette_table)
        except json.decoder.JSONDecodeError:
            log.warning("Unable to parse palette table from configuration file.")
            self.palette_table = { "mono" : [[255,255,255],[0,0,0]] }

        # Replace all 8-bit integer color table entries with unit-scale floating point.
        self.color_table = {name: np.array(color, dtype=np.float32)/255.0
                            for name, color in self.color_table.items()}
        self.palette_table = {name: [np.array(color, dtype=np.float32)/255.0 for color in array]
                              for name, array in self.palette_table.items()}

    #----------------------------------------------------------------------------
    def color_by_name(self, name, duration=None):
        """Create a color field generator which produces a floating point frame
        of the named color.  The default is an infinite sequence, or a duration
        in frames can be specified.

        :param name: key into the color table
        :param duration: number of frames to produce, or None for no limit
        """
        rgb = self.color_table[name]
        frame = np.full(self.frame_shape, rgb, dtype=np.float32)
        yield from self.iterate_image(frame, duration)

    #----------------------------------------------------------------------------
    def palette_by_name(self, name, duration=None):
        """Create a palette color field generator.  The colors of the named
        palette are spread across the width of the frame in equal bands.  The
        default is an infinite sequence, or a duration in frames can be
        specified.

        :param name: key into the palette table
        :param duration: number of frames to produce, or None for no limit
        """
        palette = self.palette_table[name]

        # Create color table as RGB image with a single row.
        table = np.array(palette, dtype=np.float32).reshape((1, len(palette), 3))

        # Generate a frame by resizing the color table to fill the width and height
        height, width = self.frame_shape[0], self.frame_shape[1]
        frame = cv.resize(table, dsize=(width, height), interpolation=cv.INTER_NEAREST)
        yield from self.iterate_image(frame, duration)

    #----------------------------------------------------------------------------
    def rainbow(self, duration=None):
        """Create a rainbow field generator.  The default is an infinite
        sequence, or a duration in frames can be specified.

        :param duration: number of frames to produce, or None for no limit
        """
        # Generate a rainbow row of evenly distributed hue values.  The HSV
        # image is float32, for which OpenCV takes hue in degrees over
        # 0.0 to 360.0 with unit-range saturation and value.  (The 0 to 180
        # hue range only applies to 8-bit images.)
        width = self.frame_shape[1]
        hues = np.arange(width) * (360.0 / width)
        hsv = np.ones(self.frame_shape, dtype=np.float32)
        hsv[:, :, 0] = hues
        frame = cv.cvtColor(hsv, cv.COLOR_HSV2RGB)
        return self.iterate_image(frame, duration)

    #----------------------------------------------------------------------------
    def noise(self, duration=None, tempo=300):
        """Create a noise field generator.  The default is an infinite sequence,
        or a duration in frames can be specified.  Images are generated at a
        tempo slower than frame rate and interpolated.

        :param duration: number of frames to produce, or None for no limit
        :param tempo: rate of new random images in beats per minute
        """
        def noisegen():
            while True:
                yield np.random.rand(*self.frame_shape).astype(np.float32)

        frames = keyframe_interpolator(noisegen(), tempo=tempo, frame_rate=self.frame_rate)
        if duration is None:
            return frames
        # Limit the otherwise endless sequence to the requested frame count.
        return itertools.islice(frames, max(0, duration))

    #----------------------------------------------------------------------------
    def iterate_image(self, frame, duration=None):
        """Image sequence generator which produces a stream of the same image.
        The default is an infinite sequence, or a duration in frames can be
        specified.

        :param frame: the image to repeat; the same object is yielded each time
        :param duration: number of frames to produce, or None for no limit
        """
        if duration is None:
            yield from itertools.repeat(frame)
        else:
            yield from itertools.repeat(frame, duration)
#--------------------------------------------------------------------------------
class VideoWriter:
    """Video file writer to transcode image frames into a video file with the
    given path.  The default codec is lossless PNG image frames in an AVI
    container.
    """

    def __init__(self, config, path=None):
        """Open the output file.

        If the file cannot be opened, a warning is logged and the writer
        silently discards all frames written to it.

        :param config: configuration mapping with a 'video' section
        :param path: output file path; defaults to a name built from the
                     script name and the current date and time
        """
        self.config = config
        if path is None:
            path = scriptname + datetime.datetime.now().strftime("-%Y-%m-%d-%H-%M-%S.avi")
        self.path = path
        self.frame_width = int(config['video']['frame_width'])
        self.frame_height = int(config['video']['frame_height'])
        self.frame_rate = int(config['video']['frame_rate'])

        # default codec: PNG images, lossless, clean block edges
        # A fourcc code is exactly four characters, so short codec names such
        # as 'png' are padded with trailing spaces.
        self.codec = config['video']['codec'].ljust(4)

        codec_code = cv.VideoWriter.fourcc(*self.codec)
        log.info("Opening video writer for %s with fourcc 0x%x", path, codec_code)
        self._out = cv.VideoWriter(path, codec_code, self.frame_rate, (self.frame_width, self.frame_height))

        # N.B. cv.VideoWriter does not raise if the path is invalid, e.g. the
        # folder doesn't exist.  The constructor still returns an object, so
        # the failure has to be detected by asking whether it opened.
        if not self._out.isOpened():
            log.warning("Failed to open video writer for %s.", self.path)
            self._out.release()
            self._out = None

    def write(self, frame):
        """Write an RGB image to the video file."""
        if self._out is not None:
            # The OpenCV VideoWriter expects BGR format.
            bgr = cv.cvtColor(frame, cv.COLOR_RGB2BGR)
            self._out.write(bgr)

    def close(self):
        """Finalize the stream and close the output file."""
        if self._out is not None:
            self._out.release()
            self._out = None
            log.info("Closed video writer for %s", self.path)
#--------------------------------------------------------------------------------
class BridgeInstrument:
    """Real-time multi-function image generator for the Pausch Bridge.
    Implemented as an infinite iterator, i.e. responds to __next__() to produce
    image frames, responds to __iter__() to return self.
    """

    # Spellings of the spatial_quantize setting which turn quantization off.
    # Any other value turns it on.
    _FALSE_STRINGS = ('false', '0', 'no', 'off', '')

    def __init__(self, config):
        """
        :param config: configuration mapping with 'video' and 'instrument'
                       sections, plus the sections used by each component
                       image generator
        """
        # Cache configurable properties.
        self.config = config
        self.frame_rate = int(config['video']['frame_rate'])
        frame_width = int(config['video']['frame_width'])
        frame_height = int(config['video']['frame_height'])
        self.frame_shape = (frame_height, frame_width, 3)

        # Frame counter and other state variables.
        self.count = 0
        quantize_setting = str(self.config['instrument']['spatial_quantize'])
        self.quantize = quantize_setting.strip().lower() not in self._FALSE_STRINGS

        # The iterator for the current image stream.  Sometimes this holds a crossfade
        # generator transitioning to the next stream.
        self.image_iterator = None

        # The iterator for the following image stream, usually held temporarily
        # until the crossfade is complete.
        self.next_iterator = None

        # The most recently generated frame.
        self.frame = None

        # Generator objects which can produce component image streams.
        self.colors = Colors(config)
        self.diffusion = Diffusion(config, self.colors.color_table)
        self.life = ConwayLife(config)
        self.pianoroll = PianoRoll(config)
        self.waves = Waves(config)
        self.solid_white = self.colors.color_by_name('white')
        self.solid_black = self.colors.color_by_name('black')

        # Set initial animation: fade up from black to white over four seconds,
        # then hold white.
        self.image_iterator = itertools.chain(crossfade(self.solid_black, self.solid_white, 4*self.frame_rate),
                                              self.solid_white)

    #----------------------------------------------------------------------------
    # Manage the image generator graph.

    def reset_graph(self):
        """Safety function to reset the operator graph if the sequence ends."""
        log.debug("BridgeInstrument invoked reset_graph() safety event.")
        self.image_iterator = self.solid_black
        self.next_iterator = None

    def transition(self, next_iterable):
        """Start a half-second cross-fade transition to the next effect.

        :param next_iterable: iterable of images to fade in
        """
        if self.next_iterator is not None and self.frame is not None:
            # A transition is being interrupted, so replace it with a fade
            # starting from a freeze of the current output.
            log.debug("Transition interrupted.")
            outgoing = self.colors.iterate_image(self.frame)
        else:
            # Fade from the running stream.  This also covers an interruption
            # before any frame exists to freeze.
            outgoing = self.image_iterator

        self.next_iterator = iter(next_iterable)
        self.image_iterator = crossfade(outgoing, self.next_iterator, self.frame_rate // 2)

    #----------------------------------------------------------------------------
    # Iterator protocol.

    def __iter__(self):
        return self

    def __next__(self):
        """Compute the next frame of the animation."""
        # Evaluate the next frame using the current operator graph.
        while True:
            try:
                frame = next(self.image_iterator)
                break
            except StopIteration:
                if self.next_iterator is None:
                    # Nothing is queued up, so fall back to the safe default.
                    self.reset_graph()
                else:
                    # The cross-fade is complete; continue with the new stream alone.
                    self.image_iterator = self.next_iterator
                    self.next_iterator = None

        # optionally quantize image to the 57x2 fixture resolution
        if self.quantize:
            frame_height, frame_width, channel_count = self.frame_shape
            blocky = cv.resize(frame, dsize=(57, 2), interpolation=cv.INTER_NEAREST)
            frame = cv.resize(blocky, dsize=(frame_width, frame_height), interpolation=cv.INTER_NEAREST)

        # update frame count
        self.count += 1
        self.frame = frame
        return frame
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
[docs]class MainApp:
"""Main application object implementing the user input processing, frame timing, and networking I/O."""
def __init__(self, config, instrument):
    """Open the network outputs, start the frame timer, and take over the keyboard.

    :param config: configuration mapping with 'video' and 'network' sections
    :param instrument: the image generator which supplies the frames
    """
    self.config, self.instrument = config, instrument

    # Output stream; no video file is open initially.
    self.video_writer = None

    # Open network sockets.
    self._initialize_networking()

    # Prepare to regulate frame timing.  The frame interval and the frame
    # timer are both kept in integer nanoseconds.
    frames_per_second = int(config['video']['frame_rate'])
    self.frame_rate = frames_per_second
    self.frame_interval = int(1000000000 / frames_per_second)
    self.frame_timer = time.monotonic_ns()

    # Attach a handler to the keyboard interrupt (control-C).  The saved
    # terminal attributes are cleared before the handler is installed.
    self.initial_terminal_attributes = None
    signal.signal(signal.SIGINT, self.sigint_handler)

    # Prepare to receive keyboard characters as they are typed.
    self._enter_raw_input()
#-------------------------------------------------------------------------------
def _initialize_networking(self):
# Open a OSC UDP client to transmit image frames for preview.
preview_host = self.config['network']['osc_preview_host']
preview_port = int(self.config['network']['osc_preview_port'])
self.preview_xmitter = udp_client.UDPClient(preview_host, preview_port)
log.info("Opened UDP client for %s:%d", preview_host, preview_port)
# Optionally open a second OSC UDP client to transmit image frames for performance.
bridge_host = self.config['network']['osc_bridge_host']
if bridge_host != '':
bridge_port = int(self.config['network']['osc_bridge_port'])
self.bridge_xmitter = udp_client.UDPClient(bridge_host, bridge_port)
log.info("Opened UDP client for %s:%d", bridge_host, bridge_port)
else:
self.bridge_xmitter = None
#-------------------------------------------------------------------------------
def _frame_sleep(self):
# Wait for next frame time to transmit on a precise schedule. This
# will attempt to keep a precise average time.
self.frame_timer += self.frame_interval
now = time.monotonic_ns()
delay_ns = self.frame_timer - now
if delay_ns > 0.0:
time.sleep(1e-9 * delay_ns)
# print(f"Slept {delay_ns} ns.")
#-------------------------------------------------------------------------------
def _send_frame(self, frame):
frame_height, frame_width, channel_count = frame.shape
# Send several messages together as a bundle.
bundle = osc_bundle_builder.OscBundleBuilder(osc_bundle_builder.IMMEDIATELY)
metadata = osc_message_builder.OscMessageBuilder(address='/pbridge/frame/size')
metadata.add_arg(frame_height)
metadata.add_arg(frame_width)
metadata.add_arg(channel_count)
pixels = osc_message_builder.OscMessageBuilder(address='/pbridge/frame/data')
pixels.add_arg(frame.tobytes(), arg_type=pixels.ARG_TYPE_BLOB)
bundle.add_content(metadata.build())
bundle.add_content(pixels.build())
packet = bundle.build()
self.preview_xmitter.send(packet)
if self.bridge_xmitter is not None:
self.bridge_xmitter.send(packet)
#-------------------------------------------------------------------------------
def _enter_raw_input(self):
self.initial_terminal_attributes = termios.tcgetattr(sys.stdin.fileno())
log.debug("Saving terminal attributes and entering raw terminal mode.")
tty.setcbreak(sys.stdin.fileno())
print("Entered immediate terminal mode. Press control-C to exit.")
print("Active control keys include qwlbpdrun1.")
def _exit_raw_input(self):
if self.initial_terminal_attributes is not None:
log.debug("Restoring terminal attributes.")
termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, self.initial_terminal_attributes)
def _is_key_ready(self):
r, w, x = select.select([sys.stdin], [], [], 0)
return r == [sys.stdin]
[docs] def process_user_input(self):
"""Logic for interpreting keystrokes and triggering animation events,
intended to be customized for a particular show."""
while self._is_key_ready():
key = sys.stdin.read(1)
print("User typed:", key)
if key == 'q':
self.instrument.quantize = not self.instrument.quantize
elif key == 'v':
self._toggle_recording()
elif key == 'w':
self.instrument.transition(self.instrument.waves)
elif key == 'l':
self.instrument.transition(self.instrument.life)
elif key == 'b':
self.instrument.transition(self.instrument.solid_black)
elif key == 'p':
self.instrument.transition(self.instrument.pianoroll)
elif key == 'd':
self.instrument.transition(self.instrument.diffusion)
elif key == 'r':
self.instrument.transition(self.instrument.colors.rainbow())
elif key == 'u':
self.instrument.transition(self.instrument.colors.palette_by_name('Ukraine'))
elif key == 'n':
self.instrument.transition(self.instrument.colors.noise())
elif key == '1':
# define a composite sequence using itertools
noise = self.instrument.colors.noise(tempo=120)
black = self.instrument.colors.color_by_name('black')
loop = itertools.cycle(itertools.chain(crossfade(black, noise, 4*self.frame_rate),
crossfade(noise, black, 1*self.frame_rate)))
self.instrument.transition(loop)
#-------------------------------------------------------------------------------
[docs] def sigint_handler(self, signal_number, stack_frame): # pylint: disable=unused-argument
"""Handler for user keyboard interrupts (e.g. control-C), to be customized with
close or shutdown handlers."""
print("Keyboard interrupt caught, running close handlers...")
# restore state as needed
self._exit_raw_input()
sys.exit(0)
#-------------------------------------------------------------------------------
def _toggle_recording(self):
if self.video_writer is None:
self.video_writer = VideoWriter(self.config)
else:
self.video_writer.close()
self.video_writer = None
#-------------------------------------------------------------------------------
# Generate images in real time representing the bridge illumination.
[docs] def run(self):
"""Infinite event loop to interpret user input and stream out image frames."""
while True:
# Check for user input.
self.process_user_input()
# Compute the next output frame.
float_frame = next(self.instrument)
# convert the frame to 8-bit RGB for output
int_frame = np.clip(255 * float_frame, 0, 255).astype(np.uint8)
# Wait for the next transmit time.
self._frame_sleep()
# Sent out new frame.
self._send_frame(int_frame)
# Optionally save to disk.
if self.video_writer is not None:
self.video_writer.write(int_frame)
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def main(args):
    """Main script entry point.

    Reads configuration files, sets up logging, creates the main application
    objects, then enters the run loop.

    :param args: parsed command-line namespace providing ``configs``,
        ``path`` and ``bridge`` (and whatever ``_configure_logging`` reads).
    """
    # Start from the built-in defaults, then overlay any configuration files:
    # the default file named after the script, followed by the extra paths.
    config = create_defaults()
    default_config_path = scriptname + '.config'
    candidate_paths = [default_config_path] + args.configs
    files_read = config.read(candidate_paths)

    # Logging comes up before the configuration results are reported so the
    # report lands in the configured handlers.
    _configure_logging(args, config)

    if files_read:
        log.info("Read configuration from %s", files_read)
    else:
        log.info("Unable to read configuration from %s", default_config_path)

    # Optionally dump the merged configuration for later editing.
    if args.path is not None:
        log.info("Writing full configuration to %s", args.path)
        with open(args.path, "w") as output:
            config.write(output)

    # Bridge network output stays disabled unless requested on the command line.
    if args.bridge:
        log.info("Live bridge output requested.")
    else:
        config['network']['osc_bridge_host'] = ""

    # Build the image generator, hand it to the timing/networking object,
    # and enter the run loop to generate images in real time.
    instrument = BridgeInstrument(config)
    app = MainApp(config, instrument)
    app.run()
#-------------------------------------------------------------------------------
# Main script follows. This sequence is executed when the script is initiated from the command line.
if __name__ == "__main__":
    # Describe the command-line interface.
    parser = argparse.ArgumentParser(
        description="Real-time interactive bridge animation generator.",
    )
    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help="Enable more detailed output.",
    )
    parser.add_argument(
        '-s',
        dest='path',
        type=str,
        help="Save full configuration to given path.",
    )
    parser.add_argument(
        '--bridge',
        action='store_true',
        help="Enable live output to bridge controller.",
    )
    parser.add_argument(
        'configs',
        nargs="*",
        help="Paths of additional configuration files.",
    )

    # Parse the command line and hand control to the application entry point.
    args = parser.parse_args()
    main(args)