164 lines
4.4 KiB
Python
164 lines
4.4 KiB
Python
|
|
"""
|
||
|
|
ICY stream metadata reader.
|
||
|
|
|
||
|
|
Connects to an internet radio stream with ICY metadata enabled and extracts
|
||
|
|
the StreamTitle tags embedded in the audio data at regular intervals.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import socket
|
||
|
|
import ssl
|
||
|
|
import re
|
||
|
|
from typing import Optional
|
||
|
|
from urllib.parse import urlparse
|
||
|
|
|
||
|
|
|
||
|
|
def _connect(url: str, timeout: int):
|
||
|
|
"""Open a raw socket to the stream host, handling http and https."""
|
||
|
|
parsed = urlparse(url)
|
||
|
|
scheme = parsed.scheme.lower()
|
||
|
|
host = parsed.hostname
|
||
|
|
port = parsed.port
|
||
|
|
|
||
|
|
if port is None:
|
||
|
|
port = 443 if scheme == 'https' else 80
|
||
|
|
|
||
|
|
path = parsed.path or '/'
|
||
|
|
if parsed.query:
|
||
|
|
path = f"{path}?{parsed.query}"
|
||
|
|
|
||
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
|
|
sock.settimeout(timeout)
|
||
|
|
sock.connect((host, port))
|
||
|
|
|
||
|
|
if scheme == 'https':
|
||
|
|
ctx = ssl.create_default_context()
|
||
|
|
sock = ctx.wrap_socket(sock, server_hostname=host)
|
||
|
|
|
||
|
|
request = (
|
||
|
|
f"GET {path} HTTP/1.0\r\n"
|
||
|
|
f"Host: {host}\r\n"
|
||
|
|
f"User-Agent: diora/1.0\r\n"
|
||
|
|
f"Accept: */*\r\n"
|
||
|
|
f"Icy-MetaData: 1\r\n"
|
||
|
|
f"Connection: close\r\n"
|
||
|
|
f"\r\n"
|
||
|
|
)
|
||
|
|
sock.sendall(request.encode('utf-8'))
|
||
|
|
return sock
|
||
|
|
|
||
|
|
|
||
|
|
def _read_headers(sock) -> dict:
|
||
|
|
"""Read HTTP/ICY response headers and return them as a lowercase dict."""
|
||
|
|
raw = b''
|
||
|
|
while b'\r\n\r\n' not in raw:
|
||
|
|
chunk = sock.recv(1)
|
||
|
|
if not chunk:
|
||
|
|
break
|
||
|
|
raw += chunk
|
||
|
|
|
||
|
|
headers = {}
|
||
|
|
lines = raw.decode('utf-8', errors='replace').split('\r\n')
|
||
|
|
for line in lines[1:]:
|
||
|
|
if ':' in line:
|
||
|
|
key, _, value = line.partition(':')
|
||
|
|
headers[key.strip().lower()] = value.strip()
|
||
|
|
return headers
|
||
|
|
|
||
|
|
|
||
|
|
def _recv_exactly(sock, n: int) -> bytes:
|
||
|
|
"""Read exactly n bytes from socket."""
|
||
|
|
buf = b''
|
||
|
|
while len(buf) < n:
|
||
|
|
chunk = sock.recv(n - len(buf))
|
||
|
|
if not chunk:
|
||
|
|
raise ConnectionError("Stream closed before reading enough bytes")
|
||
|
|
buf += chunk
|
||
|
|
return buf
|
||
|
|
|
||
|
|
|
||
|
|
def _extract_title(metadata: bytes) -> Optional[str]:
|
||
|
|
"""Extract StreamTitle value from ICY metadata bytes."""
|
||
|
|
text = metadata.decode('utf-8', errors='replace')
|
||
|
|
match = re.search(r"StreamTitle='([^']*)'", text)
|
||
|
|
if match:
|
||
|
|
title = match.group(1).strip()
|
||
|
|
return title if title else None
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def read_icy_title(url: str, timeout: int = 10) -> Optional[str]:
    """
    Fetch the current StreamTitle from an ICY stream.

    Opens the stream, skips the audio that precedes the first metadata block,
    reads that single block and returns its title. The connection is always
    closed before returning.

    Returns None when the server does not announce ``icy-metaint``, when the
    first metadata block is empty or carries no title, or when any error
    occurs along the way.
    """
    try:
        # The socket is closed when the with-block exits, on every path.
        with _connect(url, timeout) as conn:
            interval = _read_headers(conn).get('icy-metaint', '')
            if not interval:
                return None

            # Audio bytes before the first metadata block are thrown away.
            _recv_exactly(conn, int(interval))

            # One length byte follows; the block is that value times 16 bytes.
            block_size = _recv_exactly(conn, 1)[0] * 16
            if block_size == 0:
                return None

            return _extract_title(_recv_exactly(conn, block_size))
    except Exception:
        return None
|
||
|
|
|
||
|
|
|
||
|
|
def stream_icy_metadata(url: str, timeout: int = 15):
    """
    Generator that continuously yields StreamTitle values from an ICY stream.

    Connects to the stream, then alternates between discarding one interval
    of audio and reading the metadata block that follows it, yielding each
    non-empty StreamTitle as it arrives.

    Args:
        url: Stream URL.
        timeout: Socket timeout in seconds, used for connecting and for every
            read on the stream.

    Yields:
        Title strings, one per metadata block that carries a non-empty title.

    The generator ends silently (without raising) when the connection cannot
    be opened, when the server announces no usable ``icy-metaint``, or on any
    socket error or server disconnect. The socket is closed when the
    generator finishes or is closed by the consumer.
    """
    try:
        sock = _connect(url, timeout=timeout)
    except Exception:
        return

    try:
        headers = _read_headers(sock)
        metaint_str = headers.get('icy-metaint', '')
        if not metaint_str:
            return

        metaint = int(metaint_str)
        if metaint <= 0:
            # Without a positive interval every byte would be parsed as a
            # metadata length, i.e. audio would be misread as metadata.
            return

        while True:
            # Discard audio data up to the next metadata block.
            _recv_exactly(sock, metaint)

            # Read metadata length byte (actual length = byte_value * 16).
            meta_len = _recv_exactly(sock, 1)[0] * 16
            if meta_len == 0:
                # Empty block: nothing to report for this interval.
                continue

            title = _extract_title(_recv_exactly(sock, meta_len))
            if title:
                yield title
    except Exception:
        # Best effort by contract: any failure simply ends the stream.
        return
    finally:
        try:
            sock.close()
        except Exception:
            # A failure while closing must not mask how the generator ended.
            pass
|