commit
6de1bbd1b9
1 changed files with 404 additions and 0 deletions
-
404terminalTv.py
@ -0,0 +1,404 @@ |
|||
#!/usr/bin/env python3 |
|||
import os |
|||
import re |
|||
import subprocess |
|||
import sys |
|||
import xml.etree.ElementTree as ET |
|||
from datetime import datetime, timedelta |
|||
from urllib.request import urlopen, Request |
|||
import urwid |
|||
from urllib.error import URLError |
|||
|
|||
# Configuration |
|||
M3U_URL = "http://10.0.0.17:8409/iptv/channels.m3u" |
|||
XMLTV_URL = "http://10.0.0.17:8409/iptv/xmltv.xml" |
|||
MPV_COMMAND = ["mpv", "--really-quiet", "--no-terminal", "--force-window=immediate"] |
|||
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" |
|||
|
|||
# Color Palette |
|||
PALETTE = [ |
|||
('header', 'white', 'dark blue'), |
|||
('footer', 'white', 'dark blue'), |
|||
('channel', 'black', 'light gray'), |
|||
('channel_focus', 'white', 'dark blue'), |
|||
('program_now', 'white', 'dark green'), |
|||
('program_next', 'black', 'light gray'), |
|||
('error', 'white', 'dark red'), |
|||
('divider', 'light gray', ''), |
|||
('program_desc', 'light cyan', ''), # New color for descriptions |
|||
('time', 'yellow', ''), # New color for time displays |
|||
('title', 'bold', ''), # New color for titles |
|||
] |
|||
|
|||
# ASCII Art |
|||
BANNER = r""" |
|||
__________._____. ___. .__ __ _______________ ____ |
|||
\______ \__\_ |__\_ |__ |__|/ |_ \__ ___/\ \ / / |
|||
| | _/ || __ \| __ \| \ __\ | | \ Y / |
|||
| | \ || \_\ \ \_\ \ || | | | \ / |
|||
|______ /__||___ /___ /__||__| |____| \___/ |
|||
\/ \/ \/ |
|||
""" |
|||
|
|||
class ChannelButton(urwid.Button): |
|||
def __init__(self, channel, *args, **kwargs): |
|||
self.channel = channel |
|||
if channel.get('current_show'): |
|||
label = f"{channel['name']}\nNow: {channel['current_show']['title']}" |
|||
if channel.get('next_show'): |
|||
label += f"\nNext: {channel['next_show']['title']}" |
|||
else: |
|||
label = channel['name'] |
|||
super().__init__(label, *args, **kwargs) |
|||
|
|||
class FocusAwareListBox(urwid.ListBox): |
|||
"""ListBox that tracks focus changes""" |
|||
def __init__(self, body, on_focus_change=None): |
|||
super().__init__(body) |
|||
self.on_focus_change = on_focus_change |
|||
self._last_focus = None |
|||
|
|||
def change_focus(self, size, position, *args, **kwargs): |
|||
super().change_focus(size, position, *args, **kwargs) |
|||
current_focus = self.focus |
|||
if current_focus != self._last_focus and self.on_focus_change: |
|||
self.on_focus_change(current_focus) |
|||
self._last_focus = current_focus |
|||
|
|||
class IPTVPlayer: |
|||
def __init__(self): |
|||
self.channels = [] |
|||
self.programs = {} |
|||
self.current_channel = None |
|||
self.mpv_process = None |
|||
self.load_data() |
|||
self.setup_ui() |
|||
|
|||
def load_data(self): |
|||
print("\n[DEBUG] Loading IPTV data...") |
|||
|
|||
# Load M3U playlist |
|||
try: |
|||
req = Request(M3U_URL, headers={'User-Agent': USER_AGENT}) |
|||
with urlopen(req, timeout=10) as response: |
|||
m3u_data = response.read().decode('utf-8') |
|||
print(f"[DEBUG] Received M3U data (length: {len(m3u_data)} bytes)") |
|||
|
|||
# Parse M3U |
|||
lines = m3u_data.split('\n') |
|||
current_channel = None |
|||
|
|||
for line in lines: |
|||
line = line.strip() |
|||
if line.startswith('#EXTINF:'): |
|||
# Parse channel info |
|||
tvg_id = re.search(r'tvg-id="([^"]*)"', line) |
|||
tvg_name = re.search(r'tvg-name="([^"]*)"', line) |
|||
tvg_logo = re.search(r'tvg-logo="([^"]*)"', line) |
|||
group_title = re.search(r'group-title="([^"]*)"', line) |
|||
channel_title = line.split(',')[-1].strip() |
|||
|
|||
current_channel = { |
|||
'id': tvg_id.group(1) if tvg_id else "", |
|||
'name': tvg_name.group(1) if tvg_name else channel_title, |
|||
'logo': tvg_logo.group(1) if tvg_logo else "", |
|||
'group': group_title.group(1) if group_title else "Other", |
|||
'title': channel_title, |
|||
'url': None, |
|||
'current_show': None, |
|||
'next_show': None |
|||
} |
|||
elif line.startswith('http://'): |
|||
if current_channel: |
|||
current_channel['url'] = line |
|||
self.channels.append(current_channel) |
|||
current_channel = None |
|||
|
|||
if not self.channels: |
|||
print("[WARNING] No channels found in M3U file!") |
|||
self.add_error_channel("No channels found in playlist") |
|||
else: |
|||
print(f"[DEBUG] Successfully loaded {len(self.channels)} channels") |
|||
|
|||
except Exception as e: |
|||
print(f"[ERROR] Failed to load M3U: {str(e)}") |
|||
self.add_error_channel(f"Error loading playlist: {str(e)}") |
|||
|
|||
# Load XMLTV guide |
|||
if self.channels and not self.channels[0]['name'].startswith("Error:"): |
|||
try: |
|||
req = Request(XMLTV_URL, headers={'User-Agent': USER_AGENT}) |
|||
with urlopen(req, timeout=10) as response: |
|||
xml_data = response.read().decode('utf-8') |
|||
print(f"[DEBUG] Received XMLTV data (length: {len(xml_data)} bytes)") |
|||
|
|||
root = ET.fromstring(xml_data) |
|||
for programme in root.findall('programme'): |
|||
channel_id = programme.get('channel') |
|||
start = self.parse_xmltv_time(programme.get('start')) |
|||
stop = self.parse_xmltv_time(programme.get('stop')) |
|||
title_elem = programme.find('title') |
|||
desc_elem = programme.find('desc') |
|||
|
|||
if channel_id not in self.programs: |
|||
self.programs[channel_id] = [] |
|||
|
|||
self.programs[channel_id].append({ |
|||
'start': start, |
|||
'stop': stop, |
|||
'title': title_elem.text if title_elem is not None else "No Title", |
|||
'desc': desc_elem.text if desc_elem is not None else "" |
|||
}) |
|||
print(f"[DEBUG] Loaded EPG data for {len(self.programs)} channels") |
|||
|
|||
# Pre-load current and next shows for each channel |
|||
now = datetime.now() |
|||
for channel in self.channels: |
|||
if channel['id'] in self.programs: |
|||
shows = self.programs[channel['id']] |
|||
for i, show in enumerate(shows): |
|||
if show['start'] <= now < show['stop']: |
|||
channel['current_show'] = show |
|||
if i+1 < len(shows): |
|||
channel['next_show'] = shows[i+1] |
|||
break |
|||
elif show['start'] > now: |
|||
channel['next_show'] = show |
|||
break |
|||
|
|||
except Exception as e: |
|||
print(f"[WARNING] Failed to load EPG: {str(e)}") |
|||
|
|||
def add_error_channel(self, message): |
|||
self.channels.append({ |
|||
'id': "error_channel", |
|||
'name': f"Error: {message}", |
|||
'logo': "", |
|||
'group': "Error", |
|||
'url': "", |
|||
'title': "Check your playlist URL", |
|||
'current_show': None, |
|||
'next_show': None |
|||
}) |
|||
|
|||
def parse_xmltv_time(self, time_str): |
|||
try: |
|||
return datetime.strptime(time_str[:14], "%Y%m%d%H%M%S") |
|||
except: |
|||
return datetime.now() |
|||
|
|||
def setup_ui(self): |
|||
# Create channel list with current show info |
|||
channel_items = [] |
|||
for channel in self.channels: |
|||
# Create custom button that knows its channel |
|||
btn = ChannelButton(channel) |
|||
|
|||
# Connect the signal with the correct channel using a closure |
|||
def make_click_handler(c): |
|||
return lambda button: self.on_channel_select(c) |
|||
|
|||
urwid.connect_signal(btn, 'click', make_click_handler(channel)) |
|||
|
|||
# Apply different colors based on channel status |
|||
if channel.get('url'): |
|||
channel_item = urwid.AttrMap(btn, 'channel', 'channel_focus') |
|||
else: |
|||
channel_item = urwid.AttrMap(btn, 'error', 'error') |
|||
|
|||
channel_items.append(channel_item) |
|||
|
|||
# Create list box that tracks focus changes |
|||
self.channel_list = FocusAwareListBox( |
|||
urwid.SimpleFocusListWalker(channel_items), |
|||
on_focus_change=self.on_focus_change |
|||
) |
|||
|
|||
# Create program guide with scrollable content |
|||
self.program_walker = urwid.SimpleFocusListWalker([]) |
|||
self.program_listbox = urwid.ListBox(self.program_walker) |
|||
program_frame = urwid.Frame( |
|||
urwid.LineBox(self.program_listbox, title="Program Details"), |
|||
footer=urwid.AttrMap(urwid.Text("Q: Quit | ↑↓: Navigate | Enter: Play Channel | L: Reload"), 'footer') |
|||
) |
|||
|
|||
# Create header with ASCII art |
|||
header_text = BANNER + "\nBibbitTV Terminal Player" |
|||
header = urwid.AttrMap(urwid.Text(header_text, align='center'), 'header') |
|||
|
|||
# Create columns |
|||
columns = urwid.Columns([ |
|||
('weight', 1, urwid.LineBox(self.channel_list, title="Channels")), |
|||
('weight', 2, program_frame) |
|||
], dividechars=1) |
|||
|
|||
# Main layout with header |
|||
layout = urwid.Pile([ |
|||
('pack', header), |
|||
('pack', urwid.Divider()), |
|||
columns |
|||
]) |
|||
|
|||
# Overlay for centering |
|||
self.top = urwid.Overlay( |
|||
urwid.LineBox(layout, title=""), |
|||
urwid.SolidFill(' '), |
|||
align='center', |
|||
width=('relative', 85), |
|||
valign='middle', |
|||
height=('relative', 85) |
|||
) |
|||
|
|||
def on_focus_change(self, focused_widget): |
|||
"""Update program info when focusing a channel""" |
|||
if focused_widget: |
|||
# Get the button inside the AttrMap |
|||
button = focused_widget.base_widget |
|||
if hasattr(button, 'channel'): |
|||
self.on_channel_hover(button.channel) |
|||
|
|||
def on_channel_hover(self, channel): |
|||
"""Update program info""" |
|||
self.current_channel = channel |
|||
program_info = self.get_program_info(channel) |
|||
self.program_walker[:] = [urwid.Text(line) for line in program_info] |
|||
|
|||
def on_channel_select(self, channel): |
|||
"""Play channel when selected""" |
|||
if channel.get('url'): |
|||
self.play_channel(channel['url']) |
|||
|
|||
def get_program_info(self, channel): |
|||
info = [] |
|||
info.append([("header", f"📺 Channel: {channel['name']}")]) |
|||
|
|||
if channel.get('group'): |
|||
info.append([("title", f"Group: {channel['group']}")]) |
|||
|
|||
if channel.get('current_show'): |
|||
now = datetime.now() |
|||
remaining = (channel['current_show']['stop'] - now).seconds // 60 |
|||
start_time = channel['current_show']['start'].strftime("%H:%M") |
|||
end_time = channel['current_show']['stop'].strftime("%H:%M") |
|||
|
|||
# Current show section with colorful formatting |
|||
info.append([]) # Empty line |
|||
info.append([("program_now", "⏺ NOW PLAYING")]) |
|||
info.append([("title", f"Title: {channel['current_show']['title']}")]) |
|||
info.append([ |
|||
("time", f"Time: {start_time} - {end_time} "), |
|||
("", f"({remaining} minutes remaining)") |
|||
]) |
|||
|
|||
if channel['current_show']['desc']: |
|||
info.append([]) # Empty line |
|||
info.append([("program_desc", "📝 Description:")]) |
|||
# Split long description into multiple lines |
|||
desc = channel['current_show']['desc'] |
|||
max_line_length = 60 |
|||
for i in range(0, len(desc), max_line_length): |
|||
info.append([('program_desc', desc[i:i+max_line_length])]) |
|||
|
|||
# Next show section |
|||
if channel.get('next_show'): |
|||
next_start = channel['next_show']['start'].strftime("%H:%M") |
|||
next_end = channel['next_show']['stop'].strftime("%H:%M") |
|||
|
|||
info.append([]) # Empty line |
|||
info.append([("divider", "─" * 50)]) |
|||
info.append([]) # Empty line |
|||
info.append([("program_next", "⏭ UP NEXT")]) |
|||
info.append([("title", f"Title: {channel['next_show']['title']}")]) |
|||
info.append([("time", f"Time: {next_start} - {next_end}")]) |
|||
|
|||
if channel['next_show']['desc']: |
|||
info.append([]) # Empty line |
|||
info.append([("program_desc", "📝 Description:")]) |
|||
desc = channel['next_show']['desc'] |
|||
# Limit description to 5 lines to prevent overflow |
|||
max_lines = 5 |
|||
line_count = 0 |
|||
for i in range(0, len(desc), max_line_length): |
|||
if line_count >= max_lines: |
|||
info.append([('program_desc', '... (description truncated)')]) |
|||
break |
|||
info.append([('program_desc', desc[i:i+max_line_length])]) |
|||
line_count += 1 |
|||
else: |
|||
info.append([("", "\nNo current program information available")]) |
|||
|
|||
return info |
|||
|
|||
def play_channel(self, url): |
|||
if self.mpv_process: |
|||
self.mpv_process.terminate() |
|||
self.mpv_process.wait() |
|||
|
|||
try: |
|||
self.mpv_process = subprocess.Popen(MPV_COMMAND + [url]) |
|||
except Exception as e: |
|||
self.program_walker[:] = [urwid.Text(("error", f"Failed to play stream:\n{str(e)}"))] |
|||
|
|||
def reload_data(self): |
|||
self.channels = [] |
|||
self.programs = {} |
|||
self.load_data() |
|||
self.setup_ui() |
|||
|
|||
def run(self): |
|||
# Enable mouse support and cursor visibility |
|||
screen = urwid.raw_display.Screen() |
|||
screen.set_terminal_properties(colors=256) |
|||
screen.set_mouse_tracking() |
|||
|
|||
loop = urwid.MainLoop( |
|||
self.top, |
|||
palette=PALETTE, |
|||
screen=screen, |
|||
unhandled_input=self.handle_input, |
|||
handle_mouse=True |
|||
) |
|||
loop.run() |
|||
|
|||
def handle_input(self, key): |
|||
if key in ('q', 'Q'): |
|||
self.quit_player() |
|||
elif key in ('l', 'L'): |
|||
self.reload_data() |
|||
elif key == 'enter': |
|||
if self.current_channel and self.current_channel.get('url'): |
|||
self.play_channel(self.current_channel['url']) |
|||
|
|||
def quit_player(self): |
|||
if self.mpv_process: |
|||
self.mpv_process.terminate() |
|||
self.mpv_process.wait() |
|||
raise urwid.ExitMainLoop() |
|||
|
|||
def __del__(self): |
|||
if hasattr(self, 'mpv_process') and self.mpv_process: |
|||
self.mpv_process.terminate() |
|||
|
|||
def check_mpv_installed(): |
|||
try: |
|||
if os.name == 'nt': |
|||
subprocess.run(["where.exe", "mpv"], check=True, |
|||
stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
|||
else: |
|||
subprocess.run(["which", "mpv"], check=True, |
|||
stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
|||
return True |
|||
except subprocess.CalledProcessError: |
|||
return False |
|||
|
|||
def main(): |
|||
if not check_mpv_installed(): |
|||
print("Error: mpv player is required but not found. Please install mpv first.") |
|||
print("You can download it from: https://mpv.io/installation/") |
|||
sys.exit(1) |
|||
|
|||
player = IPTVPlayer() |
|||
player.run() |
|||
|
|||
if __name__ == "__main__": |
|||
main() |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue