Camera Input

The source files can be browsed directly from camera_input/ or may be downloaded as camera_input.zip.

The example requires OpenCV, NumPy, and python-osc to be installed in your Python 3 environment (see Python 3 Installation).

motion_detector.py

 1#!/usr/bin/env python3
 2"""motion_detector.py : self-contained camera video processing example
 3"""
 4import argparse, time
 5import numpy as np
 6
 7# https://docs.opencv.org/modules/highgui/doc/highgui.html
 8# https://docs.opencv.org/modules/highgui/doc/reading_and_writing_images_and_video.html
 9# https://docs.opencv.org/modules/video/doc/motion_analysis_and_object_tracking.html
10import cv2 as cv
11
12from pythonosc import osc_message_builder
13from pythonosc import udp_client
14
15#================================================================
16if __name__ == "__main__":
17    parser = argparse.ArgumentParser( description = "Process a video stream from the default camera.")
18    parser.add_argument( '--camera', default=0, type=int, help="Select camera by number (default: %(default)s).")
19    parser.add_argument( '-s','--saturation', action='store_true', help='Use only saturation channel.')
20    parser.add_argument( '-b','--background', action='store_false', help='Disable background subtraction.')
21    parser.add_argument( '-q','--quiet', action='store_true', help="Run without viewer window or console output.")
22    parser.add_argument( '--debug', action='store_true', help='Enable debugging output to log file.' )
23    parser.add_argument( '--verbose', action='store_true', help='Enable even more detailed logging output.' )
24    parser.add_argument("--ip", default="127.0.0.1",  help="IP address of the OSC receiver (default: %(default)s).")
25    parser.add_argument("--port", type=int, default=22000,  help="OSC receiver UDP port (default: %(default)s).")
26    args = parser.parse_args()
27
28    capture = cv.VideoCapture(args.camera)
29    if not capture.isOpened():
30        print("Unable to open camera, exiting.")
31        exit(1)
32
33    if not args.quiet:
34        cv.namedWindow("motion detector")
35
36    if args.background:
37        bgs = cv.createBackgroundSubtractorMOG2()
38
39    client = udp_client.SimpleUDPClient(args.ip, args.port)
40
41    def repeat():
42        success, frame = capture.read()
43        if success:
44            if args.saturation:
45                # transform the image
46                frame = cv.cvtColor(frame, cv.COLOR_BGR2HSV)
47                # pick out saturation
48                frame = frame[:,:,1]
49
50            if args.background:
51                foreground_mask = bgs.apply(frame)
52                frame = foreground_mask
53
54            # reduce the lower half of the frame to a tiny 1D image
55            rows, cols = frame.shape[0:2]
56            motion = cv.resize(frame[rows//2:][:], (16, 1))
57
58            # replace the original lower half with a blocky 1D image
59            frame[rows//2:][:] = cv.resize(motion, (cols, rows//2), interpolation=cv.INTER_NEAREST)
60
61            # print ("Showing image...")
62            if not args.quiet:
63                cv.imshow("viewer", frame)
64
65            # send black and white values as an OSC message
66            if args.background:
67                msg = [int(i) for i in motion[:][0]]
68                # print("Sending", msg, type(msg[0]))
69                client.send_message("/motion", msg)
70
71        else:
72            if args.verbose:
73                print ("no image")
74
75    try:
76        while True:
77            repeat()
78
79            # Calling waitKey allows the window system event loop to run and update
80            # the display.  The waitKey argument is in milliseconds.
81            if cv.waitKey(33) & 0xFF == ord('q'):
82                break
83
84    except KeyboardInterrupt:
85        print ("User quit.")
86
87    capture.release()