import base64 import json from datetime import datetime, timezone from functools import wraps from django.contrib.auth import authenticate, login, logout from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_http_methods from podcasts.models import PodcastFeed, PodcastEpisode, EpisodeProgress from podcasts.views import _refresh_feed from .models import GpodderDevice, GpodderSubscriptionChange, GpodderEpisodeAction def _basic_auth(request): auth = request.META.get('HTTP_AUTHORIZATION', '') if not auth.startswith('Basic '): return None try: credentials = base64.b64decode(auth[6:]).decode('utf-8') username, _, password = credentials.partition(':') user = authenticate(request, username=username, password=password) if user: login(request, user) return user except Exception: return None def _resolve_user(request, username): if request.user.is_authenticated and request.user.username == username: return request.user user = _basic_auth(request) if user and user.username == username: return user return None def gpodder_auth(func): @wraps(func) def wrapper(request, username, *args, **kwargs): user = _resolve_user(request, username) if not user: return JsonResponse({'error': 'unauthorized'}, status=401) return func(request, username, *args, **kwargs) return wrapper def _now_ts(): return int(datetime.now(timezone.utc).timestamp()) def _get_or_create_device(user, device_id): device, _ = GpodderDevice.objects.get_or_create( user=user, device_id=device_id, defaults={'caption': device_id, 'type': 'other'} ) return device # --------------------------------------------------------------------------- # Auth # --------------------------------------------------------------------------- @csrf_exempt @require_http_methods(['GET', 'POST']) def auth_login(request, username): user = _resolve_user(request, username) if not user: return JsonResponse({'error': 'unauthorized'}, status=401) return JsonResponse({'username': user.username}) @csrf_exempt @require_http_methods(['GET', 'POST']) def auth_logout(request, username): logout(request) return JsonResponse({}) # --------------------------------------------------------------------------- # Devices # --------------------------------------------------------------------------- @csrf_exempt @gpodder_auth def devices_list(request, username): devices = GpodderDevice.objects.filter(user=request.user) return JsonResponse([ {'id': d.device_id, 'caption': d.caption, 'type': d.type, 'subscriptions': 0} for d in devices ], safe=False) @csrf_exempt @gpodder_auth def device_update(request, username, deviceid): device = _get_or_create_device(request.user, deviceid) if request.method == 'POST': try: body = json.loads(request.body or '{}') if 'caption' in body: device.caption = body['caption'][:100] if 'type' in body: device.type = body['type'][:20] device.save() except (json.JSONDecodeError, ValueError): pass return JsonResponse({'id': device.device_id, 'caption': device.caption, 'type': device.type}) # --------------------------------------------------------------------------- # Subscriptions # --------------------------------------------------------------------------- def _subscription_changes_since(user, since_ts): qs = GpodderSubscriptionChange.objects.filter(user=user) if since_ts: try: since_dt = datetime.fromtimestamp(int(since_ts), tz=timezone.utc) qs = qs.filter(timestamp__gt=since_dt) except (ValueError, OSError): pass added = list(qs.filter(action='add').values_list('url', flat=True).distinct()) removed = list(qs.filter(action='remove').values_list('url', flat=True).distinct()) # If a URL appears in both, the latest action wins — keep only consistent entries added = [u for u in added if u not in removed] removed = [u for u in removed if u not in added] return added, removed @csrf_exempt @gpodder_auth def subscriptions_by_device(request, username, deviceid): _get_or_create_device(request.user, deviceid) if request.method == 'GET': since = request.GET.get('since') if since: added, removed = _subscription_changes_since(request.user, since) return JsonResponse({'add': added, 'remove': removed, 'timestamp': _now_ts(), 'update_urls': []}) else: urls = list(PodcastFeed.objects.filter(user=request.user).values_list('rss_url', flat=True)) return JsonResponse(urls, safe=False) elif request.method == 'POST': try: body = json.loads(request.body or '{}') except (json.JSONDecodeError, ValueError): return JsonResponse({'error': 'invalid JSON'}, status=400) for url in body.get('add', []): if not url: continue feed, created = PodcastFeed.objects.get_or_create( user=request.user, rss_url=url, defaults={'title': url} ) if created: try: _refresh_feed(feed) except Exception: pass GpodderSubscriptionChange.objects.create(user=request.user, url=url, action='add') for url in body.get('remove', []): if not url: continue PodcastFeed.objects.filter(user=request.user, rss_url=url).delete() GpodderSubscriptionChange.objects.create(user=request.user, url=url, action='remove') return JsonResponse({'timestamp': _now_ts(), 'update_urls': []}) @csrf_exempt @gpodder_auth def subscriptions_all(request, username): since = request.GET.get('since') if since: added, removed = _subscription_changes_since(request.user, since) return JsonResponse({'add': added, 'remove': removed, 'timestamp': _now_ts(), 'update_urls': []}) else: urls = list(PodcastFeed.objects.filter(user=request.user).values_list('rss_url', flat=True)) return JsonResponse(urls, safe=False) # --------------------------------------------------------------------------- # Episode actions # --------------------------------------------------------------------------- @csrf_exempt @gpodder_auth def episode_actions(request, username): if request.method == 'POST': try: actions = json.loads(request.body or '[]') except (json.JSONDecodeError, ValueError): return JsonResponse({'error': 'invalid JSON'}, status=400) if not isinstance(actions, list): actions = [actions] for a in actions: if not isinstance(a, dict): continue podcast_url = a.get('podcast', '') episode_url = a.get('episode', '') action = a.get('action', '') if not (podcast_url and episode_url and action): continue ts_str = a.get('timestamp', '') try: ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) if ts_str else datetime.now(timezone.utc) except (ValueError, AttributeError): ts = datetime.now(timezone.utc) device_id = a.get('device', '') device = _get_or_create_device(request.user, device_id) if device_id else None GpodderEpisodeAction.objects.create( user=request.user, device=device, podcast=podcast_url, episode=episode_url, action=action, timestamp=ts, started=a.get('started'), position=a.get('position'), total=a.get('total'), ) # Sync playback position to EpisodeProgress if action == 'play' and a.get('position') is not None: try: ep = PodcastEpisode.objects.filter(audio_url=episode_url).first() if ep: pos = int(a['position']) dur = ep.duration_seconds or 0 played = dur > 0 and pos >= dur * 0.9 EpisodeProgress.objects.update_or_create( user=request.user, episode=ep, defaults={'position_seconds': pos, 'played': played} ) except Exception: pass return JsonResponse({'timestamp': _now_ts()}) elif request.method == 'GET': since = request.GET.get('since') qs = GpodderEpisodeAction.objects.filter(user=request.user) if since: try: since_dt = datetime.fromtimestamp(int(since), tz=timezone.utc) qs = qs.filter(timestamp__gt=since_dt) except (ValueError, OSError): pass return JsonResponse({ 'actions': [ { 'podcast': a.podcast, 'episode': a.episode, 'device': a.device.device_id if a.device else '', 'action': a.action, 'timestamp': a.timestamp.strftime('%Y-%m-%dT%H:%M:%S'), 'started': a.started, 'position': a.position, 'total': a.total, } for a in qs[:500] ], 'timestamp': _now_ts(), })