from email.mime import base
import cv2 as cv
import numpy as np
import os
import pandas as pd
import random as rd
import yaml
from typing import NewType
# size (in pixels) of the array every PauschFrame allocates
bridge_width = 228
bridge_height = 8
# frames per second; effect times (sec) are multiplied by this to get frame indices
frame_rate = 30
# four-character codec code handed to cv.VideoWriter when saving
codec_code = cv.VideoWriter.fourcc(*'png ')
dtype = 'int16' # used so we can mask with -1, converted to uint8 that opencv expects before writing
# a color expressed as three integer channel values
RGB = NewType('RGB', tuple[int, int, int])
# (start, stop) pairs for the height, width and color axes of a frame
Indices = NewType(
    'Indices', tuple[tuple[int, int], tuple[int, int], tuple[int, int]])
def read_palette(filename):
    ''' load a color palette from a csv file
    :param filename: path to a csv file with a `colors` column of tuple-like strings
    :return list with one parsed tuple per row of the `colors` column '''
    palette_table = pd.read_csv(filename)
    return list(map(parse_tuple, palette_table.colors))
def parse_tuple(s, dtype=int):
    ''' turn a string such as "(1, 2, 3)" into a tuple of numbers
    :param s: comma separated values, optionally wrapped in parentheses
    :param dtype: [optional] callable applied to every value (default is integer)
    :return tuple of converted values '''
    # drop every parenthesis in a single pass, wherever it appears
    without_parens = s.translate(str.maketrans('', '', '()'))
    pieces = without_parens.split(',')
    return tuple(map(dtype, pieces))
def parse_field(data, field, optional=False, default=(0, 0), dtype=int):
    ''' read one tuple-valued field out of a parsed yaml dictionary
    :param data: data dictionary
    :param field: key to look up in the data dictionary
    :param optional: [optional] if True, a missing field yields `default` instead of failing
    :param default: [optional] value returned as-is when the optional field is missing
    :param dtype: [optional] what to cast tuple vals into (default is integer)
    :return parsed tuple, or `default` for a missing optional field '''
    # required fields, and optional ones that are present, get parsed
    if not optional or field in data:
        return parse_tuple(data[field], dtype)
    return default
def parse_sprite_yaml(data, curr_time):
    ''' parses color, position, etc from one sprite entry, one segment at a time
    :param data: dictionary for a single sprite; reads `bg_color` (optional),
        `sprite_color` and the list under `positions`
    :param curr_time: time (sec) at which the first position entry starts
    :return generator of keyword dictionaries, one per entry in `positions`,
        with keys base_rgb, highlight_rgb, pos, velocity, acceleration,
        start_time and end_time '''
    base_rgb = parse_field(data, 'bg_color', True, (-1, -1, -1), int)
    highlight_rgb = parse_field(data, 'sprite_color')
    for entry in data['positions']:
        end_time = curr_time + int(entry['duration'])
        # yield a fresh dict each time so collected results do not alias each other
        yield {
            'base_rgb': base_rgb,
            'highlight_rgb': highlight_rgb,
            'pos': parse_field(entry, 'start'),
            'velocity': parse_field(entry, 'velocity', True, dtype=float),
            'acceleration': parse_field(
                entry, 'acceleration', True, dtype=float),
            'start_time': curr_time,
            'end_time': end_time,
        }
        # the next segment starts where this one ended instead of overlapping it
        curr_time = end_time
class PauschFrame:
    ''' a single frame: a (bridge_height, bridge_width, 3) array of the module dtype

    The signed dtype lets -1 act as a "keep the existing value" marker in set_values. '''

    def __init__(self):
        self.frame = np.zeros((bridge_height, bridge_width, 3), dtype=dtype)

    def get_base_indices(self):
        ''' :return list of (start, stop) pairs covering the whole frame
            in height, width, color order '''
        return [(0, bridge_height), (0, bridge_width), (0, 3)]

    def get_top(self, indices: Indices = None) -> Indices:
        ''' restrict indices to the top half of their height range
        :param indices: [optional] region to halve, defaults to the whole frame
        :return indices whose height range ends at the midpoint '''
        indices = indices if indices is not None else self.get_base_indices()
        (height_start, height_stop), width, color = indices
        # integer midpoint so top and bottom halves tile the region exactly
        middle = (height_start + height_stop) // 2
        return (height_start, middle), width, color

    def get_bottom(self, indices: Indices = None) -> Indices:
        ''' restrict indices to the bottom half of their height range
        :param indices: [optional] region to halve, defaults to the whole frame
        :return indices whose height range starts at the midpoint '''
        indices = indices if indices is not None else self.get_base_indices()
        (height_start, height_stop), width, color = indices
        # must be an int: the value ends up as a slice bound in set_values
        middle = (height_start + height_stop) // 2
        return (middle, height_stop), width, color

    def get_region(self, start, end, indices: Indices = None) -> Indices:
        ''' restrict indices to the columns [start, end)
        :param start: first column of the region
        :param end: column one past the end of the region
        :param indices: [optional] region whose height/color ranges are kept
        :return indices with the width range replaced by (start, end) '''
        indices = indices if indices is not None else self.get_base_indices()
        height, _, color = indices
        return height, (start, end), color

    def set_values(self, indices: Indices, subframe: np.ndarray):
        ''' write subframe into the region described by indices
        :param indices: (start, stop) pairs for height, width and color
        :param subframe: values to write; anything that broadcasts against the
            region (full array or a single color tuple). Entries equal to -1
            leave the existing value untouched '''
        height, width, rgb = [slice(start, stop) for start, stop in indices]
        current = self.frame[height, width, rgb]
        # asarray makes the -1 mask elementwise for tuples as well as arrays
        subframe = np.asarray(subframe)
        self.frame[height, width, rgb] = np.where(
            subframe != -1, subframe, current)
class PauschBridge:
    ''' a timeline of PauschFrame objects that effect methods draw onto

    Effect methods return self so calls can be chained, and two timelines can
    be concatenated with +. Times are given in seconds and converted to frame
    indices with the module-level frame_rate. '''

    def __init__(self, num_frames: int = 0):
        ''' :param num_frames: [optional] number of blank frames to start with '''
        self.frames = [PauschFrame() for _ in range(num_frames)]

    def __add__(self, other):
        ''' :return new PauschBridge holding this timeline's frames followed by other's '''
        pbl = PauschBridge()
        pbl.frames = self.frames + other.frames
        return pbl

    def _effect_params(self, start_time: int, end_time: int, slices: list[Indices] = None):
        ''' boilerplate parameters often needed for any effect methods
        :param start_time: time (sec) of effect start
        :param end_time: time (sec) of effect end
        :param slices: [optional] subset of frame on which the effect takes place
        :return tuple of start_frame index, end_frame index, and slices '''
        self.add_missing_frames(end_time)
        start_frame = start_time * frame_rate
        end_frame = end_time * frame_rate
        if slices is None:
            slices = [frame.get_base_indices()
                      for frame in self.frames[start_frame:end_frame]]
        return start_frame, end_frame, slices

    def _frames_between(self, start_time, end_time):
        ''' frames covering [start_time, end_time), padding the timeline if needed '''
        # pad up to the END of the window so the slice below is never cut short
        self.add_missing_frames(end_time)
        return self.frames[start_time * frame_rate:end_time * frame_rate]

    @staticmethod
    def _sprite_window(pos: tuple[int, int], size: int = 3, limit: tuple[int, int] = None):
        ''' row/column slices of a size x size square centred on pos, clipped to the frame
        :param pos: (row, column) centre, rounded to the nearest pixel
        :param size: [optional] edge length of the square in pixels
        :param limit: [optional] (rows, columns) of the frame, defaults to the bridge size
        :return (row slice, column slice), or (None, None) when the square lies
            entirely before the first row or column '''
        if limit is None:
            limit = (bridge_height, bridge_width)
        x, y = map(round, pos)
        half = size // 2
        min_x = max(x - half, 0)
        min_y = max(y - half, 0)
        max_x = min(x + half + 1, limit[0])
        max_y = min(y + half + 1, limit[1])
        # a negative stop would be read as "counted from the end" by a slice
        if max_x < 0 or max_y < 0:
            return None, None
        return slice(min_x, max_x), slice(min_y, max_y)

    def add_missing_frames(self, end_time: int):
        ''' if self.frames is not large enough to incorporate end_time, pad it
        :param end_time: time (sec) to fill self.frames up to'''
        missing = end_time * frame_rate - len(self.frames)
        if missing > 0:
            self.frames.extend(PauschFrame() for _ in range(missing))

    def set_values(self, indices: list[Indices], frames: list[np.matrix], start_time, end_time):
        ''' set frame values within the specified timeframe
        :param indices: subset of frame on which the effect takes place
        :param frames: frame list to update self.frames, should match size specified by indices
        :param start_time: time (sec) of effect start
        :param end_time: time (sec) of effect end '''
        start_frame = start_time * frame_rate
        end_frame = end_time * frame_rate
        # zip stops at the shortest input, so generators are only advanced as needed
        for inds, mat, frame in zip(indices, frames, range(start_frame, end_frame)):
            self.frames[frame].set_values(inds, mat)

    def get_top(self, duration, start_time=0):
        ''' gets list of indices specifying the top half of Pausch Bridge only
        :param duration: time (sec) of effect end
        :param start_time: [optional] time (sec) of effect start'''
        return [frame.get_top()
                for frame in self._frames_between(start_time, duration)]

    def get_bottom(self, duration, start_time=0):
        ''' gets list of indices specifying the bottom half of Pausch Bridge only
        :param duration: time (sec) of effect end
        :param start_time: [optional] time (sec) of effect start'''
        return [frame.get_bottom()
                for frame in self._frames_between(start_time, duration)]

    def get_region(self, duration, region_start, region_end, start_time=0):
        ''' gets list of indices specifying a horizontal region of Pausch Bridge only
        :param duration: time (sec) of effect end
        :param region_start: first column of the region
        :param region_end: column one past the end of the region
        :param start_time: [optional] time (sec) of effect start'''
        return [frame.get_region(region_start, region_end)
                for frame in self._frames_between(start_time, duration)]

    def solid_color(self, rgb: RGB, end_time: int, start_time: int = 0, slices: list[Indices] = None):
        ''' effect that displays a solid color on the bridge
        :param rgb: RGB values of the desired color
        :param end_time: time (sec) of effect end
        :param start_time: [optional] time (sec) of effect start, defaults to 0
        :param slices: [optional] list of the subset of the frame to display effect on, defaults to whole frame'''
        _, _, slices = self._effect_params(start_time, end_time, slices)
        self.set_values(slices, [rgb for _ in slices], start_time, end_time)
        return self

    def hue_shift(self, start_rgb: RGB, end_rgb: RGB, end_time: int, start_time: int = 0, slices: list[Indices] = None):
        ''' effect that displays a gradual (linear) shift from one color to another
        :param start_rgb: RGB values of the desired starting color
        :param end_rgb: RGB values of the desired ending color
        :param end_time: time (sec) of effect end
        :param start_time: [optional] time (sec) of effect start
        :param slices: [optional] list of the subset of the frame to display effect on, defaults to whole frame'''
        start_frame, end_frame, slices = self._effect_params(
            start_time, end_time, slices)
        num_frames = end_frame - start_frame
        # one evenly spaced ramp per channel; zipping them gives one color per frame
        channels = [np.linspace(start, end, num_frames)
                    for start, end in zip(start_rgb, end_rgb)]
        self.set_values(slices, zip(*channels), start_time, end_time)
        return self

    def sprite_from_file(self, filename: str, end_time: int, start_time: int = 0):
        ''' effect that moves a sprite based on data given from filename
        :param filename: path to file
        :param end_time: time (sec) of effect end; currently unused, each
            segment's length comes from the `duration` entries in the file
        :param start_time: time (sec) of effect start
        :raises FileNotFoundError: if filename does not exist'''
        if not os.path.exists(filename):
            raise FileNotFoundError(
                'filename {} does not exist!'.format(filename))
        # read the file the caller asked for
        # NOTE(review): FullLoader is not meant for untrusted input; consider
        # yaml.safe_load if these files can come from outside the project
        with open(filename, 'r') as f:
            data = yaml.load(f, Loader=yaml.FullLoader)
        # each separate entry represents a different sprite; an empty file loads as None
        for sprite_data in data or []:
            for params in parse_sprite_yaml(sprite_data, start_time):
                self.sprite(**params)
                start_time = params['end_time']
        return self

    def sprite(self, highlight_rgb: RGB, start_time: int, end_time: int, pos: tuple[int, int], velocity: tuple[int, int], acceleration: tuple[int, int], base_rgb: RGB = (-1, -1, -1), slices: list[Indices] = None):
        ''' effect that displays a small sprite moving across the bridge
        :param highlight_rgb: RGB values of the desired sprite color
        :param start_time: time (sec) of effect start
        :param end_time: time (sec) of effect end
        :param pos: starting (row, column) position of small sprite
        :param velocity: velocity of small sprite (2-d tuple), added to the position every frame
        :param acceleration: acceleration of small sprite (2-d tuple), added to the velocity every frame
        :param base_rgb: [optional] RGB values of the desired base color. If not specified, will not overwrite base color
        :param slices: [optional] list of the subset of the frame to display effect on, defaults to whole frame.
            Generated frames are always full bridge size, so a smaller region will not match them'''
        def gen_sprite_movement(num_frames):
            ''' generator frame function for the moving sprite'''
            curr_pos, curr_vel = pos, velocity
            for _ in range(num_frames):
                frame = np.full((bridge_height, bridge_width, 3),
                                base_rgb, dtype=dtype)
                rows, cols = self._sprite_window(curr_pos)
                if rows is not None:
                    frame[rows, cols] = highlight_rgb
                curr_vel = [v + a for v, a in zip(curr_vel, acceleration)]
                curr_pos = [p + v for p, v in zip(curr_pos, curr_vel)]
                yield frame

        start_frame, end_frame, slices = self._effect_params(
            start_time, end_time, slices)
        self.set_values(slices, gen_sprite_movement(
            end_frame - start_frame), start_time, end_time)
        return self

    def sparkle(self, highlight_rgb: RGB, end_time: int, start_time: int = 0, base_rgb: RGB = (-1, -1, -1), slices: list[Indices] = None):
        ''' effect that displays sparkles of a desired color on a solid background color
        :param highlight_rgb: RGB values of the desired sparkle color
        :param end_time: time (sec) of effect end
        :param start_time: [optional] time (sec) of effect start
        :param base_rgb: [optional] RGB values of the desired base color. If not specified, will not overwrite base color
        :param slices: [optional] list of the subset of the frame to display effect on, defaults to whole frame.
            Generated frames are always full bridge size, so a smaller region will not match them'''
        def gen_sparkles(num_frames):
            ''' generator frame function for the sparkles'''
            sparkles = {}  # (row, col) -> number of frames the sparkle stays lit
            for frame_i in range(num_frames):
                # gen 15 sparkle every 3 frames
                if frame_i % 3 == 0:
                    for _ in range(15):
                        inds = (rd.randrange(bridge_height),
                                rd.randrange(bridge_width))
                        sparkles[inds] = rd.randrange(3, 7)
                frame = np.full((bridge_height, bridge_width, 3),
                                base_rgb, dtype=dtype)
                for row, col in sparkles:
                    frame[row, col, :] = highlight_rgb
                # age every sparkle and drop the ones that just burnt out
                sparkles = {cell: life - 1
                            for cell, life in sparkles.items() if life > 1}
                yield frame

        start_frame, end_frame, slices = self._effect_params(
            start_time, end_time, slices)
        self.set_values(slices, gen_sparkles(
            end_frame - start_frame), start_time, end_time)
        return self

    def wave(self, highlight_rgb: RGB, end_time: int, start_time: int = 0, base_rgb: RGB = (-1, -1, -1), slices: list[Indices] = None, width: float = 0.1, speed: int = 30) -> 'PauschBridge':
        ''' effect that displays a wave of desired color & width on a base color
        :param highlight_rgb: RGB values of the desired wave color
        :param end_time: time (sec) of effect end
        :param start_time: [optional] time (sec) of effect start
        :param base_rgb: [optional] RGB values of the desired base color. If not specified, will overlay wave on top of existing color in frames
        :param slices: [optional] list of the subset of the frame to display effect on, defaults to whole frame
        :param width: desired width of wave in relation to bridge width, i.e. 0.5 means half the bridge width
        :param speed: desired speed of wave in pixels / second '''
        def gen_wave(num_frames, wave_width):
            ''' generator frame function for the wave'''
            dims = tuple(end - start for start, end in slices[0])
            region_width = dims[1]
            frame = np.full(dims, base_rgb, dtype=dtype)
            # keep a fractional position so speeds below frame_rate still advance
            step = speed / frame_rate
            wave_pos = -1.0
            for _ in range(num_frames):
                wave_pos += step
                # clamp at 0: a negative stop would be read as "from the end"
                wave_end = max(int(wave_pos), 0)
                wave_start = max(wave_end - wave_width, 0)
                frame[:, :wave_start, :] = base_rgb
                frame[:, wave_start:wave_end, :] = highlight_rgb
                if wave_start >= region_width:  # the wave has left the region, start over
                    wave_pos = -1.0
                yield frame

        start_frame, end_frame, slices = self._effect_params(
            start_time, end_time, slices)
        wave_width = int(width * bridge_width)  # in pixels
        self.set_values(slices, gen_wave(
            end_frame - start_frame, wave_width), start_time, end_time)
        return self

    def color_block(self, palette: list[RGB], end_time: int, start_time: int = 0, slices: list[Indices] = None, width: int = 4, speed: int = 30):
        ''' effect that displays blocks of random palette colors scrolling along the bridge
        :param palette: list of RGB values to randomly pick from
        :param end_time: time (sec) of effect end
        :param start_time: [optional] time (sec) of effect start
        :param slices: [optional] list of the subset of the frame to display effect on, defaults to whole frame
        :param width: width of each color block in pixels
        :param speed: number of frames between shifts; the blocks move by one block width every `speed` frames '''
        def pick_color(exclude=None):
            ''' random palette color that differs from exclude whenever the palette allows it'''
            candidates = [p for p in palette if tuple(p) != exclude]
            # a single-color palette leaves nothing to choose from, so reuse it
            return tuple(rd.choice(candidates or palette))

        def gen_color_block(num_frames):
            ''' generator frame function for the scrolling blocks'''
            dims = tuple(end - start for start, end in slices[0])
            # generate the starting frame first
            frame = np.zeros(dims, dtype=dtype)
            prev_color = None
            for pos in range(0, dims[1], width):
                prev_color = pick_color(prev_color)
                frame[:, pos:pos + width] = prev_color
            for frame_index in range(num_frames):
                if frame_index % speed == 0:  # time to move colors down
                    frame[:, :-width, :] = frame[:, width:, :]
                    # plain ints so the comparison against palette tuples is exact
                    last_color = tuple(int(c) for c in frame[-1, -1, :])
                    frame[:, -width:, :] = pick_color(last_color)
                yield frame

        start_frame, end_frame, slices = self._effect_params(
            start_time, end_time, slices)
        self.set_values(slices, gen_color_block(
            end_frame - start_frame), start_time, end_time)
        return self

    def save(self, basename):
        ''' save frame output to .avi file
        :param basename: base filename (without extension) '''
        filename = basename + '.avi'
        out = cv.VideoWriter(filename, codec_code,
                             frame_rate, (bridge_width, bridge_height))
        try:
            for frame in self.frames:
                # frames are kept in a signed dtype for masking; the writer gets uint8
                out.write(np.uint8(frame.frame))
        finally:
            # release the writer even if a frame fails to encode
            out.release()
def full_day_simulation():
    ''' render a night -> sunrise -> day -> sunset -> night sequence to full_day_simulation.avi '''
    # NOTE(review): channel values look like OpenCV's BGR order despite the names - confirm
    black = (0, 0, 0)
    dark_red = (14, 1, 134)
    yellow = (0, 228, 236)
    sky_blue = (255, 208, 65)
    cloud_grey = (237, 237, 237)
    white = (255, 255, 255)

    def fade(start_color, end_color, seconds):
        ''' stand-alone timeline shifting between two colors'''
        return PauschBridge().hue_shift(start_color, end_color, seconds)

    # sunrise
    pbl = fade(black, dark_red, 30)
    for start_color, end_color, seconds in ((dark_red, yellow, 28),
                                            (yellow, sky_blue, 2)):
        pbl += fade(start_color, end_color, seconds)

    # daytime: blue sky with a grey wave across the top half
    daytime = PauschBridge().solid_color(sky_blue, 60)
    top_half = pbl.get_top(60)
    daytime.wave(cloud_grey, 60, slices=top_half)
    pbl += daytime

    # sunset
    for start_color, end_color, seconds in ((sky_blue, yellow, 2),
                                            (yellow, dark_red, 28),
                                            (dark_red, black, 30)):
        pbl += fade(start_color, end_color, seconds)

    # night sky
    pbl += PauschBridge().sparkle(white, 60, base_rgb=black)
    pbl.save('full_day_simulation')
def region_select_test():
    ''' render a blue background with a black-to-white fade limited to columns 40-80, saved as test_region.avi '''
    sky_blue = (255, 208, 65)
    black = (0, 0, 0)
    white = (255, 255, 255)
    pbl = PauschBridge()
    # background first, so the region indices are taken from an already padded timeline
    pbl.solid_color(sky_blue, 5)
    region = pbl.get_region(5, 40, 80)
    pbl.hue_shift(black, white, 5, slices=region)
    pbl.save('test_region')
# script entry point
if __name__ == '__main__':
    # NOTE(review): spare_test is not defined anywhere in the visible source -
    # confirm it exists, otherwise running this file as a script raises NameError
    spare_test()
    # full_day_simulation()