diora-web/radio/views.py
marwin 85776390f6
All checks were successful
Build and push Docker image / build (push) Successful in 14s
Test / test (push) Successful in 16s
Open HTTP streams in minimal standalone player tab
Instead of trying an HTTPS upgrade (which fails for IP-based streams):
- playStation() detects http:// URL on https:// page, opens /radio/stream-player/
  with url, name, vol as query params, then returns — main stream is already
  stopped by the stopPlayback(false) call at the top of playStation()
- New view stream_player renders a standalone minimal player page
- Template: auto-plays on load, correct volume from URL param, volume changes
  synced back to localStorage so main window picks it up next time,
  live track metadata via SSE, tab title updates on track change,
  close-tab button

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-21 17:50:12 +01:00

592 lines
20 KiB
Python

import json
import socket
import ssl as ssl_module
import time
import urllib.parse
from datetime import datetime
from django.core.serializers.json import DjangoJSONEncoder
import requests
from django.db.models import Count
from django.conf import settings
from django.http import (
HttpResponse,
HttpResponseBadRequest,
JsonResponse,
StreamingHttpResponse,
)
from django.shortcuts import render
from django.utils import timezone
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from .icy import stream_icy_metadata
from . import lastfm as lastfm_module
from .models import SavedStation, StationPlay, TrackHistory, FocusSession, FeaturedStation
# ---------------------------------------------------------------------------
# Index / player
# ---------------------------------------------------------------------------
def index(request):
    """Render the main player page (``radio/player.html``).

    Authenticated users get their saved stations (each annotated with a
    ``play_count``), their 50 most recent track-history rows, their podcast
    feeds and, when a profile exists, the focus-station and encrypted
    background settings. Anonymous visitors get empty values for all of
    those. Active featured stations are loaded for everyone.

    Several values are additionally passed pre-serialised as JSON strings
    (``*_json`` keys and ``initial_podcast_feeds``).
    """
    def to_json(value):
        return json.dumps(value, cls=DjangoJSONEncoder)

    user = request.user
    stations = []
    recent_tracks = []

    if user.is_authenticated:
        # Count of StationPlay rows per station URL for this user.
        plays_per_url = {}
        grouped_plays = (
            StationPlay.objects
            .filter(user=user)
            .values('station_url')
            .annotate(count=Count('id'))
        )
        for row in grouped_plays:
            plays_per_url[row['station_url']] = row['count']

        stations = list(
            user.saved_stations.values(
                'id', 'name', 'url', 'bitrate', 'country', 'tags', 'favicon_url', 'is_favorite'
            )
        )
        for station in stations:
            station['play_count'] = plays_per_url.get(station['url'], 0)

        recent_tracks = list(
            user.track_history.values(
                'id', 'station_name', 'track', 'played_at', 'scrobbled'
            )[:50]
        )
        # Datetimes become ISO strings so the template can emit them as JSON.
        for item in recent_tracks:
            item['played_at'] = item['played_at'].isoformat()

    featured_stations = list(
        FeaturedStation.objects.filter(active=True).values(
            'id', 'name', 'url', 'description', 'favicon_url', 'tags'
        )
    )

    podcast_feeds = []
    if user.is_authenticated:
        # The imported name is not referenced below; the import is kept as-is.
        from podcasts.models import PodcastFeed  # noqa: F401
        podcast_feeds = list(
            user.podcast_feeds.values('id', 'title', 'artwork_url', 'rss_url')
        )

    # None serialises to JSON null, which the front end reads as
    # "never configured, use default".
    focus_station = None
    encrypted_bg = {}
    profile = getattr(user, 'profile', None) if user.is_authenticated else None
    if profile:
        if profile.focus_station_url or profile.focus_station_name:
            focus_station = {
                'url': profile.focus_station_url,
                'name': profile.focus_station_name,
            }
        if profile.background_encrypted:
            encrypted_bg = {
                'iv': profile.background_iv,
                'ciphertext': profile.background_encrypted,
                'mime': profile.background_mime,
            }

    # An empty string (not "{}") is passed when no background is stored.
    encrypted_bg_json = to_json(encrypted_bg) if encrypted_bg else ''

    return render(request, 'radio/player.html', {
        'saved_stations': stations,
        'saved_stations_json': to_json(stations),
        'history': recent_tracks,
        'amazon_enabled': settings.AMAZON_AFFILIATE_ENABLED,
        'featured_stations': featured_stations,
        'featured_stations_json': to_json(featured_stations),
        'initial_podcast_feeds': to_json(podcast_feeds),
        'focus_station': focus_station,
        'focus_station_json': to_json(focus_station),
        'encrypted_bg': encrypted_bg,
        'encrypted_bg_json': encrypted_bg_json,
    })
# ---------------------------------------------------------------------------
# SSE metadata stream
# ---------------------------------------------------------------------------
@csrf_exempt
def sse_metadata(request):
    """Stream track-title changes for a station as Server-Sent Events.

    Reads the stream address from the ``url`` query parameter and answers
    400 when it is missing or blank. Otherwise returns a
    ``text/event-stream`` response that emits ``{"track": ...}`` each time
    the title yielded by ``stream_icy_metadata`` differs from the previous
    one, and a single ``{"error": "stream ended"}`` event if the metadata
    reader raises.
    """
    stream_url = request.GET.get('url', '').strip()
    if stream_url == '':
        return HttpResponseBadRequest('url parameter required')

    # NOTE(review): stream_url is client-supplied and handed directly to
    # stream_icy_metadata — confirm that helper restricts schemes/hosts.
    def event_stream():
        previous = None
        try:
            for current in stream_icy_metadata(stream_url):
                if current == previous:
                    # Same title as last time: nothing new to report.
                    continue
                previous = current
                yield 'data: ' + json.dumps({'track': current}) + '\n\n'
        except Exception:
            yield 'data: ' + json.dumps({'error': 'stream ended'}) + '\n\n'

    sse_response = StreamingHttpResponse(
        event_stream(),
        content_type='text/event-stream',
    )
    sse_response['Cache-Control'] = 'no-cache'
    sse_response['X-Accel-Buffering'] = 'no'
    return sse_response
# ---------------------------------------------------------------------------
# Record track
# ---------------------------------------------------------------------------
@csrf_exempt
@require_http_methods(['POST'])
def record_track(request):
    """Store a played track in the history and optionally scrobble it.

    Expects a JSON object with ``station_name`` and ``track`` (both
    non-empty strings) and an optional ``scrobble`` flag. The entry is
    attached to the logged-in user, or to the session key for anonymous
    visitors. A scrobble is attempted only for authenticated users whose
    profile has Last.fm configured and scrobbling enabled; a failed
    scrobble is logged and does not fail the request.

    Returns ``{"ok": true, "scrobbled": <bool>}``, or a 400 JSON error
    when the body is not a JSON object or a required field is missing.
    """
    try:
        body = json.loads(request.body)
    except (json.JSONDecodeError, ValueError):
        return JsonResponse({'error': 'invalid JSON'}, status=400)
    # Valid JSON that is not an object (e.g. a list or number) has no .get().
    if not isinstance(body, dict):
        return JsonResponse({'error': 'JSON object required'}, status=400)

    # Non-string values (null, numbers, ...) are treated as missing rather
    # than crashing on .strip().
    station_name = body.get('station_name')
    station_name = station_name.strip() if isinstance(station_name, str) else ''
    track = body.get('track')
    track = track.strip() if isinstance(track, str) else ''
    do_scrobble = body.get('scrobble', False)
    if not station_name or not track:
        return JsonResponse({'error': 'station_name and track required'}, status=400)

    # Ensure session exists for anonymous users
    if not request.session.session_key:
        request.session.create()

    history_entry = TrackHistory(
        station_name=station_name,
        track=track,
    )
    if request.user.is_authenticated:
        history_entry.user = request.user
    else:
        history_entry.session_key = request.session.session_key

    # Attempt scrobble before saving (so we can mark it)
    scrobbled = False
    if (
        do_scrobble
        and request.user.is_authenticated
        and hasattr(request.user, 'profile')
        and request.user.profile.has_lastfm()
        and request.user.profile.lastfm_scrobble
    ):
        try:
            artist, title = lastfm_module.parse_track(track)
            if not artist:
                # No artist could be parsed: fall back to the station name.
                artist = station_name
            lastfm_module.scrobble(
                session_key=request.user.profile.lastfm_session_key,
                artist=artist,
                title=title,
                timestamp=int(time.time()),
            )
            scrobbled = True
        except Exception:
            # Scrobble failure is non-fatal, but leave a trace for debugging.
            import logging
            logging.getLogger(__name__).warning(
                'Last.fm scrobble failed', exc_info=True
            )

    history_entry.scrobbled = scrobbled
    history_entry.save()
    return JsonResponse({'ok': True, 'scrobbled': scrobbled})
# ---------------------------------------------------------------------------
# Affiliate links
# ---------------------------------------------------------------------------
def affiliate_links(request):
    """Return an Amazon search link and iTunes details for a track.

    When ``settings.AMAZON_AFFILIATE_ENABLED`` is false, answers with a
    null ``amazon_url`` and empty ``itunes_data`` without looking at the
    request. Otherwise the ``track`` query parameter is required (400 if
    blank). The iTunes lookup is best-effort: any failure leaves
    ``itunes_data`` as an empty dict.
    """
    if not settings.AMAZON_AFFILIATE_ENABLED:
        return JsonResponse({'amazon_url': None, 'itunes_data': {}})

    track = request.GET.get('track', '').strip()
    if not track:
        return JsonResponse({'error': 'track parameter required'}, status=400)

    quoted_track = urllib.parse.quote(track)

    itunes_data = {}
    try:
        lookup_url = (
            'https://itunes.apple.com/search?term='
            + quoted_track
            + '&media=music&limit=1'
        )
        lookup = requests.get(lookup_url, timeout=5)
        lookup.raise_for_status()
        matches = lookup.json().get('results', [])
        if matches:
            top = matches[0]
            itunes_data = {
                'name': top.get('trackName', ''),
                'artist': top.get('artistName', ''),
                'album': top.get('collectionName', ''),
                'artwork': top.get('artworkUrl100', ''),
            }
    except Exception:
        # Lookup problems are ignored; the Amazon link is still returned.
        pass

    amazon_url = (
        f"https://www.amazon.com/s?k={quoted_track}"
        f"&i=digital-music&tag={settings.AMAZON_AFFILIATE_TAG}"
    )
    return JsonResponse({'amazon_url': amazon_url, 'itunes_data': itunes_data})
# ---------------------------------------------------------------------------
# Save station
# ---------------------------------------------------------------------------
@csrf_exempt
@require_http_methods(['POST'])
def save_station(request):
    """Create or update one of the current user's saved stations.

    Expects a JSON object with non-empty string ``name`` and ``url`` and
    optional ``bitrate``, ``country``, ``tags`` and ``favicon_url``. The
    station is keyed on (user, url): a new row is created if none exists,
    otherwise the name is overwritten and each optional field is updated
    only when present in the body.

    Returns ``{"ok": true, "id": ..., "created": <bool>}``; 401 when not
    logged in; 400 when the body is not a JSON object or a required field
    is missing.
    """
    # NOTE(review): csrf_exempt on a mutating endpoint — if authentication
    # is cookie-based, confirm CSRF is mitigated some other way.
    if not request.user.is_authenticated:
        return JsonResponse({'error': 'authentication required'}, status=401)

    try:
        body = json.loads(request.body)
    except (json.JSONDecodeError, ValueError):
        return JsonResponse({'error': 'invalid JSON'}, status=400)
    # Valid JSON that is not an object (e.g. a list or number) has no .get().
    if not isinstance(body, dict):
        return JsonResponse({'error': 'JSON object required'}, status=400)

    # Non-string values (null, numbers, ...) count as missing instead of
    # raising on .strip().
    name = body.get('name')
    name = name.strip() if isinstance(name, str) else ''
    url = body.get('url')
    url = url.strip() if isinstance(url, str) else ''
    if not name or not url:
        return JsonResponse({'error': 'name and url required'}, status=400)

    optional_fields = ('bitrate', 'country', 'tags', 'favicon_url')
    defaults = {'name': name}
    for field in optional_fields:
        defaults[field] = body.get(field, '')

    station, created = SavedStation.objects.get_or_create(
        user=request.user,
        url=url,
        defaults=defaults,
    )
    if not created:
        # Update mutable fields in case station details changed; fields
        # absent from the body keep their stored value.
        station.name = name
        for field in optional_fields:
            setattr(station, field, body.get(field, getattr(station, field)))
        station.save()

    return JsonResponse({'ok': True, 'id': station.id, 'created': created})
# ---------------------------------------------------------------------------
# Remove station
# ---------------------------------------------------------------------------
@csrf_exempt
@require_http_methods(['POST'])
def remove_station(request, pk):
    """Delete the saved station ``pk`` if it belongs to the current user.

    Returns ``{"ok": true}`` on success, 401 when not logged in, and 404
    when no station with that primary key exists for this user.
    """
    user = request.user
    if not user.is_authenticated:
        return JsonResponse({'error': 'authentication required'}, status=401)

    try:
        # Filtering on user as well as pk keeps other users' rows out of reach.
        SavedStation.objects.get(pk=pk, user=user).delete()
    except SavedStation.DoesNotExist:
        return JsonResponse({'error': 'not found'}, status=404)
    return JsonResponse({'ok': True})
# ---------------------------------------------------------------------------
# Toggle favorite
# ---------------------------------------------------------------------------
@csrf_exempt
@require_http_methods(['POST'])
def toggle_favorite(request, pk):
    """Flip the ``is_favorite`` flag of the current user's station ``pk``.

    Returns ``{"ok": true, "is_favorite": <new value>}``, 401 when not
    logged in, and 404 when the station does not exist for this user.
    """
    user = request.user
    if not user.is_authenticated:
        return JsonResponse({'error': 'authentication required'}, status=401)

    try:
        station = SavedStation.objects.get(pk=pk, user=user)
        station.is_favorite = not station.is_favorite
        # Only the toggled column is written.
        station.save(update_fields=['is_favorite'])
    except SavedStation.DoesNotExist:
        return JsonResponse({'error': 'not found'}, status=404)
    return JsonResponse({'ok': True, 'is_favorite': station.is_favorite})
# ---------------------------------------------------------------------------
# Station play tracking
# ---------------------------------------------------------------------------
@csrf_exempt
@require_http_methods(['POST'])
def start_play(request):
    """Record that the current user started playing a station.

    Expects a JSON object with non-empty string ``station_name`` and
    ``station_url``. Creates a ``StationPlay`` row and returns
    ``{"ok": true, "play_id": ...}``.

    Returns 401 when not logged in and 400 when the body is not a JSON
    object or a required field is missing.
    """
    if not request.user.is_authenticated:
        return JsonResponse({'error': 'authentication required'}, status=401)

    try:
        body = json.loads(request.body)
    except (json.JSONDecodeError, ValueError):
        return JsonResponse({'error': 'invalid JSON'}, status=400)
    # Valid JSON that is not an object (e.g. a list or number) has no .get().
    if not isinstance(body, dict):
        return JsonResponse({'error': 'JSON object required'}, status=400)

    # Non-string values (null, numbers, ...) count as missing instead of
    # raising on .strip().
    station_name = body.get('station_name')
    station_name = station_name.strip() if isinstance(station_name, str) else ''
    station_url = body.get('station_url')
    station_url = station_url.strip() if isinstance(station_url, str) else ''
    if not station_name or not station_url:
        return JsonResponse({'error': 'station_name and station_url required'}, status=400)

    play = StationPlay(
        user=request.user,
        station_name=station_name,
        station_url=station_url,
    )
    play.save()
    return JsonResponse({'ok': True, 'play_id': play.id})
@csrf_exempt
@require_http_methods(['POST'])
def stop_play(request):
    """Mark one of the current user's station plays as ended.

    Expects a JSON object with ``play_id``. Sets ``ended_at`` on the
    matching ``StationPlay`` to the current time and returns
    ``{"ok": true}``.

    Checks run in this order: body is a JSON object (400), ``play_id``
    present (400), user authenticated (401), play exists for this user
    (404). A ``play_id`` the ORM cannot interpret as a key yields 400.
    """
    try:
        body = json.loads(request.body)
    except (json.JSONDecodeError, ValueError):
        return JsonResponse({'error': 'invalid JSON'}, status=400)
    # Valid JSON that is not an object (e.g. a list or number) has no .get().
    if not isinstance(body, dict):
        return JsonResponse({'error': 'JSON object required'}, status=400)

    play_id = body.get('play_id')
    if play_id is None:
        return JsonResponse({'error': 'play_id required'}, status=400)

    if not request.user.is_authenticated:
        return JsonResponse({'error': 'authentication required'}, status=401)

    try:
        play = StationPlay.objects.get(id=play_id, user=request.user)
    except StationPlay.DoesNotExist:
        return JsonResponse({'error': 'not found'}, status=404)
    except (TypeError, ValueError, OverflowError):
        # play_id was e.g. "abc", a list or an out-of-range number: the
        # lookup itself rejects it, which is a client error, not a 500.
        return JsonResponse({'error': 'invalid play_id'}, status=400)

    play.ended_at = timezone.now()
    play.save(update_fields=['ended_at'])
    return JsonResponse({'ok': True})
# ---------------------------------------------------------------------------
# Recommendations
# ---------------------------------------------------------------------------
def _time_context_label(hour, is_weekend):
    """Return a label such as ``'weekday morning'`` for an hour of the day.

    Hours 5-11 are "morning", 12-16 "afternoon", 17-21 "evening" and
    everything else "night"; the prefix is "weekend" when ``is_weekend``
    is truthy and "weekday" otherwise.
    """
    day_kind = 'weekend' if is_weekend else 'weekday'

    # Inclusive (first, last, label) bands; anything outside them is night.
    bands = (
        (5, 11, 'morning'),
        (12, 16, 'afternoon'),
        (17, 21, 'evening'),
    )
    period = 'night'
    for first, last, label in bands:
        if first <= hour <= last:
            period = label
            break

    return f'{day_kind} {period}'
def recommendations(request):
    """Suggest stations the user tends to play around the current time.

    Looks at the user's ``StationPlay`` rows whose ``hour_of_day`` lies
    within two hours of the current hour (wrapping around midnight) and
    whose ``is_weekend`` flag matches today, groups them by station and
    returns the five most-played. When a station is also saved by the
    user, the saved name and id are used.

    Returns ``{"recommendations": [...], "context": "<label>"}``;
    anonymous users get an empty list and an empty context string.
    """
    if not request.user.is_authenticated:
        return JsonResponse({'recommendations': [], 'context': ''})

    # NOTE(review): datetime.now() is naive local time — confirm it matches
    # how StationPlay.hour_of_day / is_weekend are populated.
    now = datetime.now()
    current_hour = now.hour
    current_is_weekend = now.weekday() >= 5  # Saturday=5, Sunday=6

    # Build ±2 hour window (wrapping around midnight)
    hour_window = [(current_hour + offset) % 24 for offset in range(-2, 3)]

    plays_qs = (
        StationPlay.objects
        .filter(
            user=request.user,
            hour_of_day__in=hour_window,
            is_weekend=current_is_weekend,
        )
        .values('station_url', 'station_name')
        .annotate(play_count=Count('id'))
        .order_by('-play_count')[:5]
    )

    # Build a lookup of saved stations by URL (only the fields used below).
    saved_by_url = {
        s['url']: s
        for s in request.user.saved_stations.values('id', 'name', 'url')
    }

    results = []
    for entry in plays_qs:
        saved = saved_by_url.get(entry['station_url'])
        results.append({
            'station_name': saved['name'] if saved else entry['station_name'],
            'station_url': entry['station_url'],
            'play_count': entry['play_count'],
            'saved_id': saved['id'] if saved else None,
        })

    context_label = _time_context_label(current_hour, current_is_weekend)
    return JsonResponse({'recommendations': results, 'context': context_label})
# ---------------------------------------------------------------------------
# History JSON
# ---------------------------------------------------------------------------
@csrf_exempt
@require_http_methods(['POST'])
def delete_history_entry(request, pk):
    """Delete track-history entry ``pk`` if it belongs to the current user.

    Returns ``{"ok": true}`` on success, 401 when not logged in, and 404
    when no such entry exists for this user.
    """
    user = request.user
    if not user.is_authenticated:
        return JsonResponse({'error': 'authentication required'}, status=401)

    try:
        # Filtering on user as well as pk keeps other users' rows out of reach.
        TrackHistory.objects.get(pk=pk, user=user).delete()
    except TrackHistory.DoesNotExist:
        return JsonResponse({'error': 'not found'}, status=404)
    return JsonResponse({'ok': True})
def history_json(request):
    """Return up to 200 of the current user's track-history rows as JSON.

    Each item carries ``station_name``, ``track``, ``played_at`` (ISO 8601
    string) and ``scrobbled``. Responds 401 when not logged in.
    """
    user = request.user
    if not user.is_authenticated:
        return JsonResponse({'error': 'authentication required'}, status=401)

    rows = user.track_history.values(
        'station_name', 'track', 'played_at', 'scrobbled'
    )[:200]
    payload = [
        {**row, 'played_at': row['played_at'].isoformat()}
        for row in rows
    ]
    # safe=False because the top-level JSON value is a list, not a dict.
    return JsonResponse(payload, safe=False)
# ---------------------------------------------------------------------------
# Station notes
# ---------------------------------------------------------------------------
@csrf_exempt
@require_http_methods(['POST'])
def save_station_notes(request, pk):
    """Replace the notes on the current user's saved station ``pk``.

    Expects a JSON object with an optional string ``notes``; a missing key
    clears the notes. Surrounding whitespace is stripped before saving.

    Returns ``{"ok": true}``; 401 when not logged in; 404 when the station
    does not exist for this user; 400 when the body is not a JSON object
    or ``notes`` is not a string.
    """
    if not request.user.is_authenticated:
        return JsonResponse({'error': 'authentication required'}, status=401)

    try:
        station = SavedStation.objects.get(pk=pk, user=request.user)
    except SavedStation.DoesNotExist:
        return JsonResponse({'error': 'not found'}, status=404)

    try:
        body = json.loads(request.body)
    except (json.JSONDecodeError, ValueError):
        return JsonResponse({'error': 'invalid JSON'}, status=400)
    # Valid JSON that is not an object (e.g. a list or number) has no .get().
    if not isinstance(body, dict):
        return JsonResponse({'error': 'JSON object required'}, status=400)

    notes = body.get('notes', '')
    if not isinstance(notes, str):
        # null / numbers / lists cannot be stripped; reject instead of 500.
        return JsonResponse({'error': 'notes must be a string'}, status=400)

    station.notes = notes.strip()
    station.save()
    return JsonResponse({'ok': True})
# ---------------------------------------------------------------------------
# Focus session
# ---------------------------------------------------------------------------
@csrf_exempt
@require_http_methods(['POST'])
def record_focus_session(request):
    """Store a completed focus session for the current user.

    Expects a JSON object with optional ``station_name`` (string, default
    ``''``) and ``duration_minutes`` (default 25). The duration is coerced
    with ``int()`` and must not be negative.

    Returns ``{"ok": true, "id": ...}``; 401 when not logged in; 400 when
    the body is not a JSON object or a field has an unusable value.
    """
    if not request.user.is_authenticated:
        return JsonResponse({'error': 'authentication required'}, status=401)

    try:
        body = json.loads(request.body)
    except (json.JSONDecodeError, ValueError):
        return JsonResponse({'error': 'invalid JSON'}, status=400)
    # Valid JSON that is not an object (e.g. a list or number) has no .get().
    if not isinstance(body, dict):
        return JsonResponse({'error': 'JSON object required'}, status=400)

    station_name = body.get('station_name', '')
    if not isinstance(station_name, str):
        return JsonResponse({'error': 'station_name must be a string'}, status=400)

    # Validate here so bad input becomes a 400 instead of a database error.
    raw_duration = body.get('duration_minutes', 25)
    duration_minutes = None
    if not isinstance(raw_duration, bool):  # true/false is not a duration
        try:
            duration_minutes = int(raw_duration)
        except (TypeError, ValueError, OverflowError):
            duration_minutes = None
    if duration_minutes is None or duration_minutes < 0:
        return JsonResponse(
            {'error': 'duration_minutes must be a non-negative integer'},
            status=400,
        )

    session = FocusSession.objects.create(
        user=request.user,
        station_name=station_name,
        duration_minutes=duration_minutes,
    )
    return JsonResponse({'ok': True, 'id': session.id})
def focus_stats(request):
    """Return today's focus totals and the user's recent focus sessions.

    The response holds ``today_sessions`` (count of sessions completed on
    the current local date), ``today_minutes`` (sum of their
    ``duration_minutes``, 0 when there are none) and ``sessions`` (up to
    50 rows with ``station_name``, ISO ``completed_at`` and
    ``duration_minutes``). Anonymous users get zeros and an empty list.
    """
    user = request.user
    if not user.is_authenticated:
        return JsonResponse({'today_sessions': 0, 'today_minutes': 0, 'sessions': []})

    from django.db.models import Sum

    all_sessions = user.focus_sessions.all()
    current_date = timezone.localtime(timezone.now()).date()
    todays_sessions = all_sessions.filter(completed_at__date=current_date)

    session_count = todays_sessions.count()
    # Sum() yields None over an empty set; report 0 in that case.
    minutes_total = todays_sessions.aggregate(t=Sum('duration_minutes'))['t']
    if not minutes_total:
        minutes_total = 0

    recent_rows = all_sessions.values(
        'station_name', 'completed_at', 'duration_minutes'
    )[:50]
    recent_sessions = [
        {**row, 'completed_at': row['completed_at'].isoformat()}
        for row in recent_rows
    ]

    return JsonResponse({
        'today_sessions': session_count,
        'today_minutes': minutes_total,
        'sessions': recent_sessions,
    })
# ---------------------------------------------------------------------------
# M3U import
# ---------------------------------------------------------------------------
def _parse_m3u_stations(content):
    """Extract ``{'name', 'url'}`` dicts from the text of an M3U playlist.

    A ``#EXTINF`` line supplies the name (the text after its first comma)
    for the next entry line. Other ``#`` lines and blank lines are
    skipped. An entry without a usable name falls back to the host part
    of its URL, then to the raw line.
    """
    stations = []
    pending_name = None
    for raw_line in content.splitlines():
        line = raw_line.strip()
        if line.startswith('#EXTINF'):
            _, comma, title = line.partition(',')
            pending_name = title.strip() if comma else None
        elif line and not line.startswith('#'):
            name = pending_name
            if not name:
                try:
                    name = urllib.parse.urlparse(line).netloc
                except ValueError:
                    # urlparse rejects some malformed URLs (e.g. a broken
                    # IPv6 literal); fall back to the raw line below.
                    name = ''
            stations.append({'name': name or line, 'url': line})
            pending_name = None
    return stations


@require_http_methods(['POST'])
def import_m3u(request):
    """Import stations from an uploaded ``.m3u`` / ``.m3u8`` playlist.

    Reads the upload from the ``file`` form field, parses it and saves
    each entry as a ``SavedStation`` for the current user unless one with
    the same URL already exists.

    Returns ``{"ok": true, "added": n, "skipped": m}``; 401 when not
    logged in; 400 when no file was sent, the extension is wrong, or the
    playlist contains no entries.
    """
    if not request.user.is_authenticated:
        return JsonResponse({'error': 'authentication required'}, status=401)

    f = request.FILES.get('file')
    if not f:
        return JsonResponse({'error': 'no file uploaded'}, status=400)
    if not f.name.lower().endswith(('.m3u', '.m3u8')):
        return JsonResponse({'error': 'file must be .m3u or .m3u8'}, status=400)

    # 'utf-8-sig' drops a leading byte-order mark; with plain 'utf-8' the
    # BOM stays glued to the first "#EXTM3U" line, which then no longer
    # starts with '#' and would be imported as a station.
    content = f.read().decode('utf-8-sig', errors='replace')

    stations = _parse_m3u_stations(content)
    if not stations:
        return JsonResponse({'error': 'no stations found in file'}, status=400)

    added = 0
    skipped = 0
    for s in stations:
        _, created = SavedStation.objects.get_or_create(
            user=request.user,
            url=s['url'],
            defaults={'name': s['name']},
        )
        if created:
            added += 1
        else:
            # URL already saved for this user (or repeated within the file).
            skipped += 1

    return JsonResponse({'ok': True, 'added': added, 'skipped': skipped})
# ---------------------------------------------------------------------------
# Minimal HTTP stream player (standalone tab for mixed-content streams)
# ---------------------------------------------------------------------------
def stream_player(request):
    """Render the standalone stream player page.

    Takes ``url``, ``name`` and ``vol`` from the query string. ``vol`` is
    parsed as an integer and clamped to 0-255; it defaults to 204 when
    absent or not an integer. The values reach the template as
    ``stream_url``, ``stream_name`` and ``stream_vol``.
    """
    params = request.GET
    stream_url = params.get('url', '').strip()
    stream_name = params.get('name', '').strip()
    raw_volume = params.get('vol', '204').strip()

    try:
        volume = int(raw_volume)
    except ValueError:
        volume = 204
    else:
        # Clamp into the 0-255 range.
        volume = min(255, max(0, volume))

    template_context = {
        'stream_url': stream_url,
        'stream_name': stream_name,
        'stream_vol': volume,
    }
    return render(request, 'radio/stream_player.html', template_context)