Pirate TV for the esp32
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

295 lines
10 KiB

#!/usr/bin/env python3
"""
Stream video to ESP32 Channel3 RF Broadcast
This script takes RGB24 frames from ffmpeg via stdin, dithers them to 16 colors,
packs them as 4bpp, and streams them to the ESP32 over TCP.
Usage:
ffmpeg -i video.mp4 -vf "scale=116:220" -f rawvideo -pix_fmt rgb24 - | python stream_video.py <ESP32_IP>
Options:
-p, --port Port number (default: 5000)
-f, --fps Target frame rate (default: 30)
--no-dither Disable Floyd-Steinberg dithering (faster but lower quality)
"""
import sys
import socket
import argparse
import time
import numpy as np
# Image dimensions (must match the ESP32 framebuffer and the ffmpeg scale filter)
WIDTH = 116
HEIGHT = 220
# One RGB24 frame on stdin: 3 bytes per pixel
FRAME_SIZE_RGB = WIDTH * HEIGHT * 3
# One packed frame on the wire: two 4-bit palette indices per byte
FRAME_SIZE_4BPP = WIDTH * HEIGHT // 2
# CGA-like 16 color palette (RGB values); index in this array is the 4bpp value sent
PALETTE = np.array([
[0, 0, 0], # 0: Black
[0, 0, 170], # 1: Blue
[0, 170, 0], # 2: Green
[0, 170, 170], # 3: Cyan
[170, 0, 0], # 4: Red
[170, 0, 170], # 5: Magenta
[170, 85, 0], # 6: Brown
[170, 170, 170], # 7: Light Gray
[85, 85, 85], # 8: Dark Gray
[85, 85, 255], # 9: Light Blue
[85, 255, 85], # 10: Light Green
[85, 255, 255], # 11: Light Cyan
[255, 85, 85], # 12: Light Red
[255, 85, 255], # 13: Light Magenta
[255, 255, 85], # 14: Yellow
[255, 255, 255], # 15: White
], dtype=np.float32)
# Luminance values for each palette color (ITU-R BT.601)
# Y = 0.299*R + 0.587*G + 0.114*B
# Written out term-by-term so each entry is auditable against PALETTE above.
PALETTE_LUMINANCE = np.array([
0.299*0 + 0.587*0 + 0.114*0, # 0: Black = 0
0.299*0 + 0.587*0 + 0.114*170, # 1: Blue = 19.4
0.299*0 + 0.587*170 + 0.114*0, # 2: Green = 99.8
0.299*0 + 0.587*170 + 0.114*170, # 3: Cyan = 119.2
0.299*170 + 0.587*0 + 0.114*0, # 4: Red = 50.8
0.299*170 + 0.587*0 + 0.114*170, # 5: Magenta = 70.2
0.299*170 + 0.587*85 + 0.114*0, # 6: Brown = 100.7
0.299*170 + 0.587*170 + 0.114*170, # 7: Light Gray = 170
0.299*85 + 0.587*85 + 0.114*85, # 8: Dark Gray = 85
0.299*85 + 0.587*85 + 0.114*255, # 9: Light Blue = 104.4
0.299*85 + 0.587*255 + 0.114*85, # 10: Light Green = 185.3
0.299*85 + 0.587*255 + 0.114*255, # 11: Light Cyan = 204.6
0.299*255 + 0.587*85 + 0.114*85, # 12: Light Red = 135.9
0.299*255 + 0.587*85 + 0.114*255, # 13: Light Magenta = 155.2
0.299*255 + 0.587*255 + 0.114*85, # 14: Yellow = 235.6
0.299*255 + 0.587*255 + 0.114*255, # 15: White = 255
], dtype=np.float32)
# Palette indices sorted by luminance (darkest to brightest); used by the
# grayscale mode to pick the palette entry whose luminance is closest.
GRAYSCALE_ORDER = np.argsort(PALETTE_LUMINANCE) # [0,1,4,5,8,2,6,9,3,12,13,7,10,11,14,15]
# Luminance values in ascending order, aligned with GRAYSCALE_ORDER
SORTED_LUMINANCE = PALETTE_LUMINANCE[GRAYSCALE_ORDER]
def find_nearest_grayscale_fast(img):
    """
    Convert an RGB image to grayscale and map each pixel to the palette
    entry with the closest luminance.

    This gives 16 distinct gray levels on a B&W TV (color information is
    discarded; only BT.601 luma is matched).

    Args:
        img: numpy array of shape (H, W, 3) with RGB values (float32)
    Returns:
        numpy array of shape (H, W) with palette indices 0-15 (uint8)
    """
    # BT.601 luma from the RGB channels
    gray = 0.299 * img[:, :, 0] + 0.587 * img[:, :, 1] + 0.114 * img[:, :, 2]
    # Broadcast (H, W, 1) against (1, 1, 16) to get |luma - level| for all 16 levels
    gray_expanded = gray[:, :, np.newaxis]
    lum_expanded = SORTED_LUMINANCE[np.newaxis, np.newaxis, :]
    distances = np.abs(gray_expanded - lum_expanded)
    # Position of the nearest level in the luminance-sorted list...
    nearest_idx = np.argmin(distances, axis=2)
    # ...mapped back to the actual palette index the ESP32 expects
    return GRAYSCALE_ORDER[nearest_idx].astype(np.uint8)
def find_nearest_colors_fast(img):
    """
    Find the nearest palette color for each pixel using vectorized operations.

    Nearness is squared Euclidean distance in RGB space (no perceptual
    weighting).

    Args:
        img: numpy array of shape (H, W, 3) with RGB values (float32)
    Returns:
        numpy array of shape (H, W) with palette indices 0-15 (uint8)
    """
    # Reshape for broadcasting: (H, W, 3) -> (H, W, 1, 3)
    img_expanded = img[:, :, np.newaxis, :]
    # PALETTE shape: (16, 3) -> (1, 1, 16, 3)
    palette_expanded = PALETTE[np.newaxis, np.newaxis, :, :]
    # Squared distance to all palette colors; result shape (H, W, 16)
    distances = np.sum((img_expanded - palette_expanded) ** 2, axis=3)
    # Index of the closest palette entry per pixel
    return np.argmin(distances, axis=2).astype(np.uint8)
def dither_frame_fast(frame):
    """
    Convert an RGB frame to 16-color indexed output using a row-vectorized
    approximation of Floyd-Steinberg dithering.

    Each row is quantized in a single vectorized step, so the classic 7/16
    "right neighbor" term cannot influence pixels within the same row (the
    row's indices are already chosen before the error is spread). Only the
    inter-row terms (3/16, 5/16, 1/16) actually shape the output; this
    trades a little dither quality for a large speedup over per-pixel FS.

    Args:
        frame: numpy array of shape (HEIGHT, WIDTH, 3) with RGB values
    Returns:
        numpy array of shape (HEIGHT, WIDTH) with palette indices 0-15
    """
    img = frame.astype(np.float32)
    output = np.zeros((HEIGHT, WIDTH), dtype=np.uint8)
    for y in range(HEIGHT):
        # Clamp accumulated error back into the valid RGB range
        row = np.clip(img[y], 0, 255)
        # Nearest palette color for the entire row at once
        row_expanded = row[:, np.newaxis, :]          # (W, 1, 3)
        palette_expanded = PALETTE[np.newaxis, :, :]  # (1, 16, 3)
        distances = np.sum((row_expanded - palette_expanded) ** 2, axis=2)  # (W, 16)
        indices = np.argmin(distances, axis=1).astype(np.uint8)
        output[y] = indices
        # Quantization error for the whole row
        chosen_colors = PALETTE[indices]  # (W, 3)
        errors = row - chosen_colors      # (W, 3)
        # Right pixel: 7/16. Kept for parity with the classic FS weights,
        # but note it is a no-op on the result: output[y] is already set and
        # img[y] is never read again. (The original guarded this with
        # `if y < HEIGHT:`, which was always true inside `range(HEIGHT)`.)
        img[y, 1:, :] += errors[:-1, :] * (7.0 / 16.0)
        if y + 1 < HEIGHT:
            # Bottom-left: 3/16
            img[y + 1, :-1, :] += errors[1:, :] * (3.0 / 16.0)
            # Bottom: 5/16
            img[y + 1, :, :] += errors * (5.0 / 16.0)
            # Bottom-right: 1/16
            img[y + 1, 1:, :] += errors[:-1, :] * (1.0 / 16.0)
    return output
def dither_frame_none(frame):
    """
    Convert an RGB frame to 16-color indexed output without dithering
    (fastest path; each pixel independently snaps to the nearest palette
    color).

    Args:
        frame: numpy array of shape (HEIGHT, WIDTH, 3) with RGB values
    Returns:
        numpy array of shape (HEIGHT, WIDTH) with palette indices 0-15
    """
    return find_nearest_colors_fast(frame.astype(np.float32))
def pack_4bpp_fast(indexed_frame):
    """
    Pack an indexed frame (values 0-15) into 4bpp format using vectorized
    operations. Two pixels per byte: high nibble = first pixel, low nibble =
    second pixel, in row-major order.

    Assumes the frame holds an even number of pixels (WIDTH * HEIGHT is even
    for 116x220); an odd trailing pixel would be dropped by the pairing.

    Args:
        indexed_frame: numpy array of shape (HEIGHT, WIDTH) with values 0-15
    Returns:
        bytes object of length FRAME_SIZE_4BPP
    """
    flat = indexed_frame.flatten()
    # Even positions become the high nibble, odd positions the low nibble
    high_nibbles = flat[0::2].astype(np.uint8) << 4
    low_nibbles = flat[1::2].astype(np.uint8)
    packed = high_nibbles | low_nibbles
    return packed.tobytes()
def main():
    """
    Parse arguments, connect to the ESP32 over TCP, then read raw RGB24
    frames from stdin, quantize them to the 16-color palette, pack them to
    4bpp, and stream them until stdin ends or the user interrupts.
    """
    parser = argparse.ArgumentParser(
        description='Stream video to ESP32 Channel3 RF Broadcast',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
Basic usage (with PAR correction for wide TV pixels):
ffmpeg -f lavfi -i color=black:293x220 -i video.mp4 -filter_complex "[1:v]scale=293:220:force_original_aspect_ratio=decrease[vid];[0:v][vid]overlay=(W-w)/2:(H-h)/2,scale=116:220" -f rawvideo -pix_fmt rgb24 -shortest - | python stream_video.py 192.168.1.100
Stream webcam:
ffmpeg -f dshow -i video="Your Webcam" -vf "scale=116:220" -f rawvideo -pix_fmt rgb24 - | python stream_video.py 192.168.1.100
Fast mode (no dithering):
ffmpeg -i video.mp4 -vf "scale=116:220" -f rawvideo -pix_fmt rgb24 - | python stream_video.py 192.168.1.100 --no-dither -f 60
"""
    )
    parser.add_argument('host', help='ESP32 IP address')
    parser.add_argument('-p', '--port', type=int, default=5000, help='Port number (default: 5000)')
    parser.add_argument('-f', '--fps', type=float, default=30, help='Target frame rate (default: 30)')
    parser.add_argument('--no-dither', action='store_true', help='Disable dithering (faster)')
    parser.add_argument('--grayscale', '--bw', action='store_true', help='Grayscale mode for B&W TVs (16 distinct gray levels)')
    args = parser.parse_args()
    # Guard against --fps 0 (or negative), which would divide by zero below
    if args.fps <= 0:
        parser.error('fps must be a positive number')
    # Calculate frame timing
    frame_interval = 1.0 / args.fps
    print(f"Connecting to {args.host}:{args.port}...")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # Disable Nagle's algorithm
        sock.connect((args.host, args.port))
        print(f"Connected! Streaming at {args.fps} fps target...")
        print("Press Ctrl+C to stop")
    except Exception as e:
        print(f"Connection failed: {e}")
        sys.exit(1)
    frame_count = 0
    start_time = time.time()
    # Select processing function based on mode
    if args.grayscale:
        # Grayscale mode: map by luminance for 16 distinct gray levels on B&W TV
        dither_func = lambda f: find_nearest_grayscale_fast(f.astype(np.float32))
        print("Grayscale mode: mapping to 16 luminance levels")
    elif args.no_dither:
        dither_func = dither_frame_none
    else:
        dither_func = dither_frame_fast
    try:
        while True:
            frame_start = time.time()
            # Read one RGB24 frame from stdin; a short read means the pipe closed
            raw_data = sys.stdin.buffer.read(FRAME_SIZE_RGB)
            if len(raw_data) < FRAME_SIZE_RGB:
                print(f"\nEnd of stream after {frame_count} frames")
                break
            # Convert to numpy array
            frame = np.frombuffer(raw_data, dtype=np.uint8).reshape((HEIGHT, WIDTH, 3))
            # Dither to 16 colors
            indexed = dither_func(frame)
            # Pack to 4bpp
            packed = pack_4bpp_fast(indexed)
            # Send to ESP32
            try:
                sock.sendall(packed)
            except Exception as e:
                print(f"\nSend error: {e}")
                break
            frame_count += 1
            # Frame rate limiting: sleep away whatever time is left in the slot
            elapsed = time.time() - frame_start
            if elapsed < frame_interval:
                time.sleep(frame_interval - elapsed)
            # Progress indicator (once per 30 frames to avoid console spam)
            if frame_count % 30 == 0:
                actual_fps = frame_count / (time.time() - start_time)
                print(f"\rFrames: {frame_count}, FPS: {actual_fps:.1f} ", end='', flush=True)
    except KeyboardInterrupt:
        print(f"\nStopped after {frame_count} frames")
    finally:
        sock.close()
        elapsed = time.time() - start_time
        if elapsed > 0:
            print(f"Average FPS: {frame_count / elapsed:.1f}")
if __name__ == '__main__':
    main()