diora-web/radio/icy.py
2026-03-16 19:19:22 +01:00

163 lines
4.4 KiB
Python

"""
ICY stream metadata reader.
Connects to an internet radio stream with ICY metadata enabled and extracts
the StreamTitle tags embedded in the audio data at regular intervals.
"""
import socket
import ssl
import re
from typing import Optional
from urllib.parse import urlparse
def _connect(url: str, timeout: int):
"""Open a raw socket to the stream host, handling http and https."""
parsed = urlparse(url)
scheme = parsed.scheme.lower()
host = parsed.hostname
port = parsed.port
if port is None:
port = 443 if scheme == 'https' else 80
path = parsed.path or '/'
if parsed.query:
path = f"{path}?{parsed.query}"
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
sock.connect((host, port))
if scheme == 'https':
ctx = ssl.create_default_context()
sock = ctx.wrap_socket(sock, server_hostname=host)
request = (
f"GET {path} HTTP/1.0\r\n"
f"Host: {host}\r\n"
f"User-Agent: diora/1.0\r\n"
f"Accept: */*\r\n"
f"Icy-MetaData: 1\r\n"
f"Connection: close\r\n"
f"\r\n"
)
sock.sendall(request.encode('utf-8'))
return sock
def _read_headers(sock) -> dict:
"""Read HTTP/ICY response headers and return them as a lowercase dict."""
raw = b''
while b'\r\n\r\n' not in raw:
chunk = sock.recv(1)
if not chunk:
break
raw += chunk
headers = {}
lines = raw.decode('utf-8', errors='replace').split('\r\n')
for line in lines[1:]:
if ':' in line:
key, _, value = line.partition(':')
headers[key.strip().lower()] = value.strip()
return headers
def _recv_exactly(sock, n: int) -> bytes:
"""Read exactly n bytes from socket."""
buf = b''
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("Stream closed before reading enough bytes")
buf += chunk
return buf
def _extract_title(metadata: bytes) -> Optional[str]:
"""Extract StreamTitle value from ICY metadata bytes."""
text = metadata.decode('utf-8', errors='replace')
match = re.search(r"StreamTitle='([^']*)'", text)
if match:
title = match.group(1).strip()
return title if title else None
return None
def read_icy_title(url: str, timeout: int = 10) -> Optional[str]:
"""
Connect to an ICY stream, read one metadata block, and return the StreamTitle.
Returns the title string or None if it cannot be determined or an error occurs.
"""
try:
sock = _connect(url, timeout)
try:
headers = _read_headers(sock)
metaint_str = headers.get('icy-metaint', '')
if not metaint_str:
return None
metaint = int(metaint_str)
# Discard audio data up to the first metadata block
_recv_exactly(sock, metaint)
# Read metadata length byte (actual length = byte_value * 16)
meta_len_byte = _recv_exactly(sock, 1)[0]
meta_len = meta_len_byte * 16
if meta_len == 0:
return None
metadata = _recv_exactly(sock, meta_len)
return _extract_title(metadata)
finally:
sock.close()
except Exception:
return None
def stream_icy_metadata(url: str):
"""
Generator that continuously yields StreamTitle values from an ICY stream.
Connects to the stream, reads audio/metadata chunks in a loop, and yields
each non-empty StreamTitle as it arrives. Stops on any socket error or
server disconnect.
"""
try:
sock = _connect(url, timeout=15)
except Exception:
return
try:
headers = _read_headers(sock)
metaint_str = headers.get('icy-metaint', '')
if not metaint_str:
return
metaint = int(metaint_str)
while True:
# Discard audio data
_recv_exactly(sock, metaint)
# Read metadata length byte
meta_len_byte = _recv_exactly(sock, 1)[0]
meta_len = meta_len_byte * 16
if meta_len > 0:
metadata = _recv_exactly(sock, meta_len)
title = _extract_title(metadata)
if title:
yield title
except Exception:
return
finally:
try:
sock.close()
except Exception:
pass