commit 6de1bbd1b9ddd76ecb55034c68f8d8ad6c91737e Author: melancholytron Date: Sun Aug 17 14:12:02 2025 -0500 first commit diff --git a/terminalTv.py b/terminalTv.py new file mode 100644 index 0000000..ce9f7b6 --- /dev/null +++ b/terminalTv.py @@ -0,0 +1,404 @@ +#!/usr/bin/env python3 +import os +import re +import subprocess +import sys +import xml.etree.ElementTree as ET +from datetime import datetime, timedelta +from urllib.request import urlopen, Request +import urwid +from urllib.error import URLError + +# Configuration +M3U_URL = "http://10.0.0.17:8409/iptv/channels.m3u" +XMLTV_URL = "http://10.0.0.17:8409/iptv/xmltv.xml" +MPV_COMMAND = ["mpv", "--really-quiet", "--no-terminal", "--force-window=immediate"] +USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" + +# Color Palette +PALETTE = [ + ('header', 'white', 'dark blue'), + ('footer', 'white', 'dark blue'), + ('channel', 'black', 'light gray'), + ('channel_focus', 'white', 'dark blue'), + ('program_now', 'white', 'dark green'), + ('program_next', 'black', 'light gray'), + ('error', 'white', 'dark red'), + ('divider', 'light gray', ''), + ('program_desc', 'light cyan', ''), # New color for descriptions + ('time', 'yellow', ''), # New color for time displays + ('title', 'bold', ''), # New color for titles +] + +# ASCII Art +BANNER = r""" +__________._____. ___. .__ __ _______________ ____ +\______ \__\_ |__\_ |__ |__|/ |_ \__ ___/\ \ / / + | | _/ || __ \| __ \| \ __\ | | \ Y / + | | \ || \_\ \ \_\ \ || | | | \ / + |______ /__||___ /___ /__||__| |____| \___/ + \/ \/ \/ +""" + +class ChannelButton(urwid.Button): + def __init__(self, channel, *args, **kwargs): + self.channel = channel + if channel.get('current_show'): + label = f"{channel['name']}\nNow: {channel['current_show']['title']}" + if channel.get('next_show'): + label += f"\nNext: {channel['next_show']['title']}" + else: + label = channel['name'] + super().__init__(label, *args, **kwargs) + +class FocusAwareListBox(urwid.ListBox): + """ListBox that tracks focus changes""" + def __init__(self, body, on_focus_change=None): + super().__init__(body) + self.on_focus_change = on_focus_change + self._last_focus = None + + def change_focus(self, size, position, *args, **kwargs): + super().change_focus(size, position, *args, **kwargs) + current_focus = self.focus + if current_focus != self._last_focus and self.on_focus_change: + self.on_focus_change(current_focus) + self._last_focus = current_focus + +class IPTVPlayer: + def __init__(self): + self.channels = [] + self.programs = {} + self.current_channel = None + self.mpv_process = None + self.load_data() + self.setup_ui() + + def load_data(self): + print("\n[DEBUG] Loading IPTV data...") + + # Load M3U playlist + try: + req = Request(M3U_URL, headers={'User-Agent': USER_AGENT}) + with urlopen(req, timeout=10) as response: + m3u_data = response.read().decode('utf-8') + print(f"[DEBUG] Received M3U data (length: {len(m3u_data)} bytes)") + + # Parse M3U + lines = m3u_data.split('\n') + current_channel = None + + for line in lines: + line = line.strip() + if line.startswith('#EXTINF:'): + # Parse channel info + tvg_id = re.search(r'tvg-id="([^"]*)"', line) + tvg_name = re.search(r'tvg-name="([^"]*)"', line) + tvg_logo = re.search(r'tvg-logo="([^"]*)"', line) + group_title = re.search(r'group-title="([^"]*)"', line) + channel_title = line.split(',')[-1].strip() + + current_channel = { + 'id': tvg_id.group(1) if tvg_id else "", + 'name': tvg_name.group(1) if tvg_name else channel_title, + 'logo': tvg_logo.group(1) if tvg_logo else "", + 'group': group_title.group(1) if group_title else "Other", + 'title': channel_title, + 'url': None, + 'current_show': None, + 'next_show': None + } + elif line.startswith('http://'): + if current_channel: + current_channel['url'] = line + self.channels.append(current_channel) + current_channel = None + + if not self.channels: + print("[WARNING] No channels found in M3U file!") + self.add_error_channel("No channels found in playlist") + else: + print(f"[DEBUG] Successfully loaded {len(self.channels)} channels") + + except Exception as e: + print(f"[ERROR] Failed to load M3U: {str(e)}") + self.add_error_channel(f"Error loading playlist: {str(e)}") + + # Load XMLTV guide + if self.channels and not self.channels[0]['name'].startswith("Error:"): + try: + req = Request(XMLTV_URL, headers={'User-Agent': USER_AGENT}) + with urlopen(req, timeout=10) as response: + xml_data = response.read().decode('utf-8') + print(f"[DEBUG] Received XMLTV data (length: {len(xml_data)} bytes)") + + root = ET.fromstring(xml_data) + for programme in root.findall('programme'): + channel_id = programme.get('channel') + start = self.parse_xmltv_time(programme.get('start')) + stop = self.parse_xmltv_time(programme.get('stop')) + title_elem = programme.find('title') + desc_elem = programme.find('desc') + + if channel_id not in self.programs: + self.programs[channel_id] = [] + + self.programs[channel_id].append({ + 'start': start, + 'stop': stop, + 'title': title_elem.text if title_elem is not None else "No Title", + 'desc': desc_elem.text if desc_elem is not None else "" + }) + print(f"[DEBUG] Loaded EPG data for {len(self.programs)} channels") + + # Pre-load current and next shows for each channel + now = datetime.now() + for channel in self.channels: + if channel['id'] in self.programs: + shows = self.programs[channel['id']] + for i, show in enumerate(shows): + if show['start'] <= now < show['stop']: + channel['current_show'] = show + if i+1 < len(shows): + channel['next_show'] = shows[i+1] + break + elif show['start'] > now: + channel['next_show'] = show + break + + except Exception as e: + print(f"[WARNING] Failed to load EPG: {str(e)}") + + def add_error_channel(self, message): + self.channels.append({ + 'id': "error_channel", + 'name': f"Error: {message}", + 'logo': "", + 'group': "Error", + 'url': "", + 'title': "Check your playlist URL", + 'current_show': None, + 'next_show': None + }) + + def parse_xmltv_time(self, time_str): + try: + return datetime.strptime(time_str[:14], "%Y%m%d%H%M%S") + except: + return datetime.now() + + def setup_ui(self): + # Create channel list with current show info + channel_items = [] + for channel in self.channels: + # Create custom button that knows its channel + btn = ChannelButton(channel) + + # Connect the signal with the correct channel using a closure + def make_click_handler(c): + return lambda button: self.on_channel_select(c) + + urwid.connect_signal(btn, 'click', make_click_handler(channel)) + + # Apply different colors based on channel status + if channel.get('url'): + channel_item = urwid.AttrMap(btn, 'channel', 'channel_focus') + else: + channel_item = urwid.AttrMap(btn, 'error', 'error') + + channel_items.append(channel_item) + + # Create list box that tracks focus changes + self.channel_list = FocusAwareListBox( + urwid.SimpleFocusListWalker(channel_items), + on_focus_change=self.on_focus_change + ) + + # Create program guide with scrollable content + self.program_walker = urwid.SimpleFocusListWalker([]) + self.program_listbox = urwid.ListBox(self.program_walker) + program_frame = urwid.Frame( + urwid.LineBox(self.program_listbox, title="Program Details"), + footer=urwid.AttrMap(urwid.Text("Q: Quit | ↑↓: Navigate | Enter: Play Channel | L: Reload"), 'footer') + ) + + # Create header with ASCII art + header_text = BANNER + "\nBibbitTV Terminal Player" + header = urwid.AttrMap(urwid.Text(header_text, align='center'), 'header') + + # Create columns + columns = urwid.Columns([ + ('weight', 1, urwid.LineBox(self.channel_list, title="Channels")), + ('weight', 2, program_frame) + ], dividechars=1) + + # Main layout with header + layout = urwid.Pile([ + ('pack', header), + ('pack', urwid.Divider()), + columns + ]) + + # Overlay for centering + self.top = urwid.Overlay( + urwid.LineBox(layout, title=""), + urwid.SolidFill(' '), + align='center', + width=('relative', 85), + valign='middle', + height=('relative', 85) + ) + + def on_focus_change(self, focused_widget): + """Update program info when focusing a channel""" + if focused_widget: + # Get the button inside the AttrMap + button = focused_widget.base_widget + if hasattr(button, 'channel'): + self.on_channel_hover(button.channel) + + def on_channel_hover(self, channel): + """Update program info""" + self.current_channel = channel + program_info = self.get_program_info(channel) + self.program_walker[:] = [urwid.Text(line) for line in program_info] + + def on_channel_select(self, channel): + """Play channel when selected""" + if channel.get('url'): + self.play_channel(channel['url']) + + def get_program_info(self, channel): + info = [] + info.append([("header", f"📺 Channel: {channel['name']}")]) + + if channel.get('group'): + info.append([("title", f"Group: {channel['group']}")]) + + if channel.get('current_show'): + now = datetime.now() + remaining = (channel['current_show']['stop'] - now).seconds // 60 + start_time = channel['current_show']['start'].strftime("%H:%M") + end_time = channel['current_show']['stop'].strftime("%H:%M") + + # Current show section with colorful formatting + info.append([]) # Empty line + info.append([("program_now", "⏺ NOW PLAYING")]) + info.append([("title", f"Title: {channel['current_show']['title']}")]) + info.append([ + ("time", f"Time: {start_time} - {end_time} "), + ("", f"({remaining} minutes remaining)") + ]) + + if channel['current_show']['desc']: + info.append([]) # Empty line + info.append([("program_desc", "📝 Description:")]) + # Split long description into multiple lines + desc = channel['current_show']['desc'] + max_line_length = 60 + for i in range(0, len(desc), max_line_length): + info.append([('program_desc', desc[i:i+max_line_length])]) + + # Next show section + if channel.get('next_show'): + next_start = channel['next_show']['start'].strftime("%H:%M") + next_end = channel['next_show']['stop'].strftime("%H:%M") + + info.append([]) # Empty line + info.append([("divider", "─" * 50)]) + info.append([]) # Empty line + info.append([("program_next", "⏭ UP NEXT")]) + info.append([("title", f"Title: {channel['next_show']['title']}")]) + info.append([("time", f"Time: {next_start} - {next_end}")]) + + if channel['next_show']['desc']: + info.append([]) # Empty line + info.append([("program_desc", "📝 Description:")]) + desc = channel['next_show']['desc'] + # Limit description to 5 lines to prevent overflow + max_lines = 5 + line_count = 0 + for i in range(0, len(desc), max_line_length): + if line_count >= max_lines: + info.append([('program_desc', '... (description truncated)')]) + break + info.append([('program_desc', desc[i:i+max_line_length])]) + line_count += 1 + else: + info.append([("", "\nNo current program information available")]) + + return info + + def play_channel(self, url): + if self.mpv_process: + self.mpv_process.terminate() + self.mpv_process.wait() + + try: + self.mpv_process = subprocess.Popen(MPV_COMMAND + [url]) + except Exception as e: + self.program_walker[:] = [urwid.Text(("error", f"Failed to play stream:\n{str(e)}"))] + + def reload_data(self): + self.channels = [] + self.programs = {} + self.load_data() + self.setup_ui() + + def run(self): + # Enable mouse support and cursor visibility + screen = urwid.raw_display.Screen() + screen.set_terminal_properties(colors=256) + screen.set_mouse_tracking() + + loop = urwid.MainLoop( + self.top, + palette=PALETTE, + screen=screen, + unhandled_input=self.handle_input, + handle_mouse=True + ) + loop.run() + + def handle_input(self, key): + if key in ('q', 'Q'): + self.quit_player() + elif key in ('l', 'L'): + self.reload_data() + elif key == 'enter': + if self.current_channel and self.current_channel.get('url'): + self.play_channel(self.current_channel['url']) + + def quit_player(self): + if self.mpv_process: + self.mpv_process.terminate() + self.mpv_process.wait() + raise urwid.ExitMainLoop() + + def __del__(self): + if hasattr(self, 'mpv_process') and self.mpv_process: + self.mpv_process.terminate() + +def check_mpv_installed(): + try: + if os.name == 'nt': + subprocess.run(["where.exe", "mpv"], check=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + else: + subprocess.run(["which", "mpv"], check=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return True + except subprocess.CalledProcessError: + return False + +def main(): + if not check_mpv_installed(): + print("Error: mpv player is required but not found. Please install mpv first.") + print("You can download it from: https://mpv.io/installation/") + sys.exit(1) + + player = IPTVPlayer() + player.run() + +if __name__ == "__main__": + main() \ No newline at end of file