""" ICY stream metadata reader. Connects to an internet radio stream with ICY metadata enabled and extracts the StreamTitle tags embedded in the audio data at regular intervals. """ import socket import ssl import re from typing import Optional from urllib.parse import urlparse def _connect(url: str, timeout: int): """Open a raw socket to the stream host, handling http and https.""" parsed = urlparse(url) scheme = parsed.scheme.lower() host = parsed.hostname port = parsed.port if port is None: port = 443 if scheme == 'https' else 80 path = parsed.path or '/' if parsed.query: path = f"{path}?{parsed.query}" sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) sock.connect((host, port)) if scheme == 'https': ctx = ssl.create_default_context() sock = ctx.wrap_socket(sock, server_hostname=host) request = ( f"GET {path} HTTP/1.0\r\n" f"Host: {host}\r\n" f"User-Agent: diora/1.0\r\n" f"Accept: */*\r\n" f"Icy-MetaData: 1\r\n" f"Connection: close\r\n" f"\r\n" ) sock.sendall(request.encode('utf-8')) return sock def _read_headers(sock) -> dict: """Read HTTP/ICY response headers and return them as a lowercase dict.""" raw = b'' while b'\r\n\r\n' not in raw: chunk = sock.recv(1) if not chunk: break raw += chunk headers = {} lines = raw.decode('utf-8', errors='replace').split('\r\n') for line in lines[1:]: if ':' in line: key, _, value = line.partition(':') headers[key.strip().lower()] = value.strip() return headers def _recv_exactly(sock, n: int) -> bytes: """Read exactly n bytes from socket.""" buf = b'' while len(buf) < n: chunk = sock.recv(n - len(buf)) if not chunk: raise ConnectionError("Stream closed before reading enough bytes") buf += chunk return buf def _extract_title(metadata: bytes) -> Optional[str]: """Extract StreamTitle value from ICY metadata bytes.""" text = metadata.decode('utf-8', errors='replace') match = re.search(r"StreamTitle='([^']*)'", text) if match: title = match.group(1).strip() return title if title else None return None 
def _parse_metaint(headers) -> Optional[int]:
    """Return the positive ``icy-metaint`` interval from *headers*, or None.

    None is returned when the header is absent, empty, not an integer, or not
    strictly positive. A non-positive interval would make the reader treat
    audio bytes as metadata length bytes.
    """
    value = headers.get('icy-metaint', '')
    if not value:
        return None
    try:
        metaint = int(value)
    except ValueError:
        return None
    return metaint if metaint > 0 else None


def _read_metadata_block(sock, metaint: int) -> bytes:
    """Skip one audio segment and return the metadata block that follows.

    Returns ``b''`` when the server sent a zero-length block.
    """
    # Discard audio data up to the next metadata block.
    _recv_exactly(sock, metaint)
    # The length byte counts 16-byte units.
    meta_len = _recv_exactly(sock, 1)[0] * 16
    if meta_len == 0:
        return b''
    return _recv_exactly(sock, meta_len)


def read_icy_title(url: str, timeout: int = 10) -> Optional[str]:
    """
    Connect to an ICY stream, read one metadata block, and return the StreamTitle.

    Args:
        url: Stream URL.
        timeout: Socket timeout in seconds.

    Returns:
        The title string, or None if it cannot be determined (no usable
        icy-metaint header, empty first metadata block, no title in it) or if
        any error occurs. This function never raises.
    """
    try:
        sock = _connect(url, timeout)
        try:
            metaint = _parse_metaint(_read_headers(sock))
            if metaint is None:
                return None
            metadata = _read_metadata_block(sock, metaint)
            if not metadata:
                return None
            return _extract_title(metadata)
        finally:
            sock.close()
    except Exception:
        # Best-effort lookup: every failure is reported as "no title".
        return None


def stream_icy_metadata(url: str, timeout: int = 15):
    """
    Generator that continuously yields StreamTitle values from an ICY stream.

    Connects to the stream, reads audio/metadata chunks in a loop, and yields
    each non-empty StreamTitle as it arrives. Stops (without raising) on any
    socket error, server disconnect, or missing/invalid icy-metaint header.
    The socket is closed when the generator finishes or is closed.

    Args:
        url: Stream URL.
        timeout: Socket timeout in seconds (defaults to the previous fixed
            value of 15).
    """
    try:
        sock = _connect(url, timeout)
    except Exception:
        return

    try:
        metaint = _parse_metaint(_read_headers(sock))
        if metaint is None:
            return
        while True:
            metadata = _read_metadata_block(sock, metaint)
            if not metadata:
                # Zero-length block: nothing to report for this segment.
                continue
            title = _extract_title(metadata)
            if title:
                yield title
    except Exception:
        return
    finally:
        try:
            sock.close()
        except Exception:
            pass