"""FastAPI service for goodNews. A read-only JSON API over the ingestion database, plus a small static site that consumes it. The same endpoints back both the website and any future companion app; the auto-generated OpenAPI docs at /docs are that shared contract. Run with the bundled CLI: goodnews serve Or directly: uvicorn goodnews.api:app --host 0.0.0.0 --port 8000 The database path comes from GOODNEWS_DB (falling back to the repo's data dir), so the API and CLI always read the same file. """ from __future__ import annotations import csv import hashlib import hmac import io import json import logging import os import re import secrets import sqlite3 import threading import time from collections import Counter from contextlib import contextmanager from datetime import datetime, timezone from pathlib import Path from fastapi import BackgroundTasks, FastAPI, HTTPException, Query, Request, Response from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from . import art, auth, bloom, daily, email_send, feeds, games, newsimg, oauth_google, onthisday, publishing, queries, quote, readtime, share, sources, summarize, wotd from .localtime import local_today from .markup import reply_html_to_text, sanitize_reply_html from .db import connect from .filters import filter_articles, prefs_from_json from .hero import safe_to_lead from .llm import LocalModelClient from .moods import MOODS, mood_filter from .lanes import build_lane_pool from .paywall import is_paywalled, is_paywalled_for_source from .taxonomy import FAMILIES, FLAVORS, TOPICS # Edge-cache directives for GLOBAL endpoints — responses that depend only on the # URL, never the session/user, so Cloudflare may safely serve one visitor's copy # to another (this is what makes "Gathering the good news…" resolve from the edge # instead of a round-trip to the residential origin). 
# The personalization boundary is hard: anything session- or filter-specific stays
# private/no-store.
_EDGE_CONFIG = "public, max-age=0, s-maxage=900, stale-while-revalidate=600"  # static config (moods/categories)
_EDGE_DERIVED = "public, max-age=0, s-maxage=120, stale-while-revalidate=120"  # global, data-derived (lanes/families)
_EDGE_FEED = "public, max-age=0, s-maxage=45, stale-while-revalidate=30"  # global feed (URL-keyed, shareable only)
_PRIVATE = "private, no-store"  # never share across users

log = logging.getLogger("goodnews.api")

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_DB = ROOT / "data" / "goodnews.sqlite3"
# Prefer the built SvelteKit site; fall back to the legacy single-page harness.
FRONTEND_DIR = ROOT / "frontend" / "build"
LEGACY_STATIC = Path(__file__).resolve().parent / "static"
STATIC_DIR = FRONTEND_DIR if FRONTEND_DIR.is_dir() else LEGACY_STATIC


def db_path() -> Path:
    """Return the database path: GOODNEWS_DB if set, else DEFAULT_DB.

    Read on every call (not cached), so a changed environment is picked up.
    """
    return Path(os.environ.get("GOODNEWS_DB", str(DEFAULT_DB)))


# --- Auth helpers -----------------------------------------------------------

PUBLIC_BASE_URL = os.environ.get("GOODNEWS_PUBLIC_BASE_URL", "https://upbeatbytes.com").rstrip("/")
SESSION_COOKIE = "ub_session"
OAUTH_COOKIE = "ub_oauth"
SESSION_MAX_AGE = int(auth.SESSION_TTL.total_seconds())
# NOTE(review): the fallback is a fixed, publicly known string — anything signed with it
# is forgeable. Confirm production always sets GOODNEWS_SESSION_SECRET.
SESSION_SECRET = os.environ.get("GOODNEWS_SESSION_SECRET", "dev-insecure-secret")
# Emails that are always admins (normalized), in addition to users.is_admin.
ADMIN_EMAILS = {e.strip().lower() for e in os.environ.get("GOODNEWS_ADMIN_EMAILS", "").split(",") if e.strip()}
# Secure cookies in production (https); off for http (local/test) so they round-trip.
_COOKIE_SECURE = PUBLIC_BASE_URL.startswith("https")
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def _sign(value: str) -> str:
    """Return ``value`` with a hex HMAC-SHA256 signature appended as ``value.sig``."""
    sig = hmac.new(SESSION_SECRET.encode(), value.encode(), hashlib.sha256).hexdigest()
    return f"{value}.{sig}"


def _unsign(signed: str | None) -> str | None:
    """Return the payload of a ``_sign``-ed string, or None if missing/tampered.

    The signature is whatever follows the LAST dot, so the payload itself may
    contain dots.
    """
    if not signed or "." not in signed:
        return None
    value, _, sig = signed.rpartition(".")
    expected = hmac.new(SESSION_SECRET.encode(), value.encode(), hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError for a non-ASCII *str*, and
    # `sig` comes straight from a client-supplied cookie — a forged value must be
    # rejected here, not surface as an unhandled exception.
    return value if hmac.compare_digest(sig.encode(), expected.encode()) else None


def _google_redirect_uri() -> str:
    """Return the absolute OAuth callback URL under PUBLIC_BASE_URL."""
    return f"{PUBLIC_BASE_URL}/api/auth/google/callback"


def _session_token_from_request(request: Request) -> str | None:
    """Web sends the session as an httpOnly cookie; the app sends a bearer token.

    The cookie wins when both are present. Returns None when neither is supplied.
    """
    cookie = request.cookies.get(SESSION_COOKIE)
    if cookie:
        return cookie
    authz = request.headers.get("Authorization", "")
    scheme, sep, credentials = authz.partition(" ")
    # Auth scheme names are case-insensitive, so accept "bearer"/"BEARER" as well.
    if sep and scheme.lower() == "bearer":
        return credentials.strip()
    return None


def _current_user(conn: sqlite3.Connection, request: Request) -> sqlite3.Row | None:
    """Resolve the request's session to a user row, or None when not signed in."""
    user = auth.resolve_session(conn, _session_token_from_request(request))
    if user:
        conn.commit()  # persist the last_seen touch
    return user


def _require_user(conn: sqlite3.Connection, request: Request) -> sqlite3.Row:
    """Return the signed-in user row; raise HTTP 401 when there is none."""
    user = _current_user(conn, request)
    if not user:
        raise HTTPException(status_code=401, detail="Sign in to do that.")
    return user


def _is_admin(user: sqlite3.Row) -> bool:
    """True when the row's is_admin flag is set or its email is in ADMIN_EMAILS."""
    return bool(user["is_admin"]) or auth.normalize_email(user["email"]) in ADMIN_EMAILS


def _require_admin(conn: sqlite3.Connection, request: Request) -> sqlite3.Row:
    """Return the signed-in admin's row; raise 401 if anonymous, 403 if not an admin."""
    user = _require_user(conn, request)
    if not _is_admin(user):
        raise HTTPException(status_code=403, detail="Admins only.")
    return user


def _user_out(user: sqlite3.Row) -> dict:
    """Project a user row onto the public fields (is_admin includes ADMIN_EMAILS)."""
    return {
        "id": user["id"],
        "email": user["email"],
        "display_name": user["display_name"],
        "avatar_url": user["avatar_url"],
        "is_admin": _is_admin(user),
        "digest_enabled": bool(user["digest_enabled"]),
    }


# Articles whose summary is being generated right now — so concurrent pollers /
# scrapers don't each kick off a duplicate LLM call.
_summarizing: set[int] = set()

# In-process cache of fully-rendered /a/{id} share pages.
We're direct-origin (no # CDN), so Cache-Control alone can't shield the box from crawler bursts hitting the # sitemap's article URLs while the LAN LLM / cycle is loading it. Only COMPLETE # pages (summary + explanation present) are cached, so a "still generating" page is # never pinned; a short TTL still picks up edits. Per-process (fine across workers). # INVARIANT: the share page is PUBLIC/anonymous — the cache key is article_id alone. # If /a/{id} ever personalizes (per-viewer content), key by viewer or drop the cache, # or one visitor's variant would be served to another. _SHARE_CACHE: dict[int, tuple[float, str]] = {} _SHARE_TTL = 900.0 # 15 min _SHARE_CACHE_MAX = 512 def _share_cache_get(aid: int) -> str | None: hit = _SHARE_CACHE.get(aid) if hit and (time.monotonic() - hit[0]) < _SHARE_TTL: return hit[1] return None def _share_cache_put(aid: int, html: str) -> None: if len(_SHARE_CACHE) >= _SHARE_CACHE_MAX: oldest = min(_SHARE_CACHE, key=lambda k: _SHARE_CACHE[k][0]) _SHARE_CACHE.pop(oldest, None) _SHARE_CACHE[aid] = (time.monotonic(), html) def _run_summary(article_id: int) -> None: try: with get_conn() as conn: summarize.generate_summary(conn, article_id) except Exception: pass finally: _summarizing.discard(article_id) # Publishing Desk: the "Build queue" job runs in the background (one bounded # comparative LLM call can be slow); the admin polls the queue endpoint. Mirrors the # summary-kick pattern — never holds an HTTP request open on the model. The lock makes # the check-and-set atomic so two rapid clicks can't launch two expensive jobs. 
# Shared state for the background "Build queue" job; polled by the admin UI.
_publish_build: dict = {"building": False, "result": None, "error": None}
_publish_build_lock = threading.Lock()


def _run_publish_build() -> None:
    """Background task: build the publishing queue and record the outcome.

    Writes ``result`` on success or a truncated ``error`` string on failure into
    ``_publish_build``; always clears the ``building`` flag. Never raises.
    """
    try:
        try:
            client = LocalModelClient.from_env()
        except Exception:  # noqa: BLE001 — model down → deterministic fallback inside build_queue
            client = None
        with get_conn() as conn:
            res = publishing.build_queue(conn, PUBLIC_BASE_URL, client=client)
        _publish_build.update(result=res, error=None)
    except Exception as exc:  # noqa: BLE001 — surface, don't crash the worker
        _publish_build.update(error=str(exc)[:300])
    finally:
        _publish_build["building"] = False


def _kick_summary(article_id: int, background_tasks: BackgroundTasks) -> None:
    """Schedule summary generation for ``article_id`` unless one is already in flight."""
    if article_id in _summarizing:
        return
    _summarizing.add(article_id)
    background_tasks.add_task(_run_summary, article_id)


def _feedback_email_safe(addr: str, category: str, message: str, contact: str | None, who: str) -> None:
    """Send the feedback notification email, best-effort.

    Failures are logged (without the message or addresses) and swallowed, so this
    never raises to its caller.
    """
    try:
        email_send.send_feedback(addr, category, message, contact, who)
    except Exception:  # noqa: BLE001 — best-effort; log so mail outages are visible
        log.exception("feedback email send failed (category=%s)", category)


def _send_link_safe(email: str, link: str) -> None:
    """Send the magic link, swallowing failures (runs off the request path)."""
    try:
        email_send.send_magic_link(email, link)
    except Exception:  # noqa: BLE001 — don't crash the worker; never surfaced to the caller anyway
        # Deliberately omit the address and the link: the link is a sign-in credential.
        log.exception("magic-link email send failed")


def _set_session_cookie(response: Response, token: str) -> None:
    """Attach the session token to ``response`` as an httpOnly, SameSite=Lax cookie."""
    response.set_cookie(
        SESSION_COOKIE,
        token,
        max_age=SESSION_MAX_AGE,
        httponly=True,
        secure=_COOKIE_SECURE,
        samesite="lax",
        path="/",
    )


@contextmanager
def get_conn():
    """Yield a connection to the database at ``db_path()``; always close it on exit.

    Does not commit — callers that write must call ``conn.commit()`` themselves.
    """
    conn = connect(db_path())
    try:
        yield conn
    finally:
        conn.close()


def _prefs_sql_kw(fp, now) -> dict:
    """Categorical prefs → queries.feed keyword filters (avoid-terms stay Python)."""
    return dict(
        include_topics=fp.include_topics or None,
        include_flavors=fp.include_flavors or None,
        mute_topics=list(fp.muted_topics(now)) or None,
        mute_flavors=list(fp.muted_flavors(now)) or None,
        max_cortisol=fp.max_cortisol,
        max_ragebait=fp.max_ragebait,
    )


def _pick_lead(items: list[dict]) -> list[dict]:
    """Lead with a gentle, readable, ideally illustrated story.

    Preference order: gentle + readable + has an image, then gentle + readable,
    then gentle, then leave the order alone. Charged/paywalled/imageless stories
    still appear in the set — they just don't lead.

    Returns ``items`` itself when no reordering is needed, otherwise a new list
    with the chosen story moved to the front and the rest in their original order.
    """

    def gentle(a: dict) -> bool:
        return safe_to_lead(a) and not is_paywalled_for_source(a.get("canonical_url"), a.get("paywall_override"))

    # Try each predicate from strictest to loosest; the first story that satisfies
    # the current predicate becomes the lead.
    for ok in (
        lambda a: gentle(a) and bool(a.get("image_url")),
        gentle,
        safe_to_lead,
    ):
        for i, a in enumerate(items):
            if ok(a):
                return items if i == 0 else [a, *items[:i], *items[i + 1:]]
    return items


# --- Response models (the companion-app contract) ---------------------------
# NOTE: models below are documented with `#` comments rather than docstrings —
# pydantic copies a class docstring into the generated OpenAPI schema description,
# which would change the published contract.


class Category(BaseModel):
    key: str
    description: str


class CategoriesResponse(BaseModel):
    topics: list[Category]
    flavors: list[Category]


class CategoryCount(BaseModel):
    topic: str | None
    flavor: str | None
    count: int


class Article(BaseModel):
    id: int
    title: str
    description: str | None = None
    url: str
    image_url: str | None = None
    published_at: str | None = None
    source: str
    source_id: int | None = None
    topic: str | None = None
    flavor: str | None = None
    accepted: bool
    rank_score: int | None = None
    reason_code: str | None = None
    reason_text: str | None = None
    model_name: str | None = None
    rank: int | None = None  # position within a brief, when applicable
    paywalled: bool = False
    tags: list[str] = []
    summary: str | None = None  # our own cached summary (present on the brief)
    source_read_minutes: int | None = None  # ~minutes to read the FULL source article (null = unknown)
    # Subject geography (present on feed rows; absent/empty on the brief). breadth is
    # locality|regional|national|multinational|global|unknown; places are ISO codes.
    geo_breadth: str | None = None
    geo_confidence: str | None = None
    geo_places: list[dict] = []  # e.g. [{"country": "US", "state": "NY"}, {"country": "GB", "state": None}]
    section: str | None = None  # 'near' | 'country' | 'world' when a home is set (Closer to Home)

    @classmethod
    def from_row(cls, row: dict) -> "Article":
        """Build an Article from a query-row dict.

        ``geo_places`` is read as a comma-separated string of ``CC`` or ``CC-ST``
        tokens and ``tags`` as a comma-separated string; empty tokens are dropped.
        """
        raw_tags = row.get("tags")
        places = []
        for tok in (row.get("geo_places") or "").split(","):
            tok = tok.strip()
            if not tok:
                continue
            cc, _, sc = tok.partition("-")
            places.append({"country": cc, "state": sc or None})
        return cls(
            section=row.get("__section"),
            geo_breadth=row.get("geo_breadth"),
            geo_confidence=row.get("geo_confidence"),
            geo_places=places,
            summary=row.get("summary"),
            source_read_minutes=readtime.source_read_minutes(row.get("source_words")),
            id=row["id"],
            title=row["title"],
            description=row.get("description"),
            url=row["canonical_url"],
            # Resolve per the source's image policy: our cached copy, the publisher's URL
            # (hotlink), or none — so we never re-host an image we haven't cleared.
            image_url=newsimg.display_url(row["id"], row.get("image_policy"), row.get("image_url")),
            published_at=row.get("published_at"),
            source=row["source_name"],
            source_id=row.get("source_id"),
            topic=row.get("topic"),
            flavor=row.get("flavor"),
            accepted=bool(row.get("accepted")),
            rank_score=row.get("rank_score"),
            reason_code=row.get("reason_code"),
            reason_text=row.get("reason_text"),
            model_name=row.get("model_name"),
            rank=row.get("rank"),
            paywalled=is_paywalled_for_source(row.get("canonical_url"), row.get("paywall_override")),
            tags=[t for t in (raw_tags.split(",") if raw_tags else []) if t],
        )


class FeedResponse(BaseModel):
    topic: str | None
    flavor: str | None
    count: int
    items: list[Article]
    next_offset: int | None = None  # world-tier offset for the next page (Closer to Home paging)


class BriefResponse(BaseModel):
    brief_date: str | None
    title: str | None
    generated_at: str | None = None  # freshness stamp: changes only when content changes
    items: list[Article]


class RejectedExample(BaseModel):
    title: str
    reason: str


class JoyAction(BaseModel):
    action: str  # block | unblock | feature | unfeature | delete | edit
    fields: dict | None = None  # for edit: {column: value}


class JoyAdd(BaseModel):
    text: str | None = None  # quote / onthisday
    author: str | None = None  # quote
    work: str | None = None  # quote
    md: str | None = None  # onthisday 'MM-DD'
    year: int | None = None  # onthisday
    summary: str | None = None  # onthisday
    image_url: str | None = None  # onthisday
    page_url: str | None = None  # onthisday
    word: str | None = None  # word


class Candidate(BaseModel):
    id: int
    feed_url: str
    homepage_url: str | None = None
    name: str | None = None
    status: str
    preview: dict | None = None
    notes: str | None = None
    last_previewed_at: str | None = None
    created_at: str | None = None
    updated_at: str | None = None


class SourcePreview(BaseModel):
    url: str
    sampled: int
    classified: bool
    accepted: int
    acceptance_rate: float | None  # None when there are no English items to judge (all held)
    avg_cortisol: float
    avg_ragebait: float
    avg_pr_risk: float
    newest_published: str | None
    recent_7d: int
    topic_mix: dict[str, int]
    flavor_mix: dict[str, int]
    examples_accepted: list[str]
    examples_rejected: list[RejectedExample]


class WordGuessRequest(BaseModel):
    variant: str = "5"
    guess: str
    n: int = 1  # this guess's position (1-based); the answer is revealed only at n >= max


class GameStateBody(BaseModel):
    game: str
    variant: str
    date: str
    state: dict = {}


class PublishStatusBody(BaseModel):
    status: str
    draft_text: str | None = None
    final_text: str | None = None
    post_url: str | None = None
    snooze_until: str | None = None


class PublishDraftBody(BaseModel):
    draft_text: str = ""


class EntityHandleBody(BaseModel):
    entity_name: str
    handle: str
    profile_url: str | None = None


class GameStateItem(BaseModel):
    game: str
    variant: str
    state: dict = {}


class GameStateBatchBody(BaseModel):
    date: str
    items: list[GameStateItem] = []


class BloomReportBody(BaseModel):
    word: str = ""
    date: str | None = None
    mode: str | None = None
    format: str | None = None
    letters: str | None = None
    reason: str | None = None


class BloomOverrideBody(BaseModel):
    word: str = ""
    action: str = "allow"  # 'allow' | 'block'
    reason: str | None = None


class BloomReportActionBody(BaseModel):
    action: str = ""  # 'approve' | 'block' | 'dismiss'


class WordPoolBody(BaseModel):
    word: str


class WordPoolImportBody(BaseModel):
    text: str = ""
    words: list[str] = []


class ClientErrorBody(BaseModel):
    reason: str = ""
    path: str = ""
    version: str = ""


class WordsearchThemeBody(BaseModel):
    theme: str
    words: list[str] = []
    id: int | None = None


class WordsearchSuggestBody(BaseModel):
    theme: str
    existing: list[str] = []


class EmailStartRequest(BaseModel):
    email: str


class TokenVerifyRequest(BaseModel):
    token: str


class UserOut(BaseModel):
    id: int
    email: str
    display_name: str | None = None
    avatar_url: str | None = None
    is_admin: bool = False
    digest_enabled: bool = False


class SessionOut(BaseModel):
    user: UserOut
    token: str  # for non-browser (app) clients; the web SPA uses the cookie


class IdsBody(BaseModel):
    ids: list[int] = []


class ImportBody(BaseModel):
    seen: list[int] = []
    saved: list[int] = []


class PrefsBody(BaseModel):
    prefs: dict = {}


class EventBody(BaseModel):
    kind: str
    article_id: int | None = None
    visitor: str | None = None


class FeedbackBody(BaseModel):
    category: str = "other"
    message: str = ""
    email: str | None = None
    visitor: str | None = None
    hp: str | None = None  # honeypot — bots fill it, humans don't


class FeedbackReadBody(BaseModel):
    read: bool = True


class FeedbackReplyBody(BaseModel):
    html: str = ""  # raw editor HTML — sanitized server-side before use


class SourceStatusBody(BaseModel):
    status: str = "active"  # active | paused | retired


class SourceVisibilityBody(BaseModel):
    visible: bool = True


class SourcePaywallBody(BaseModel):
    override: str | None = None  # None = use domain rule · 'free' · 'paywalled'


class SourceImagePolicyBody(BaseModel):
    policy: str = "remote"  # 'cache' · 'remote' (default) · 'none'


class CandidateSuggestBody(BaseModel):
    feed_url: str = ""
    name: str | None = None
class CandidatePromoteBody(BaseModel): default_category: str | None = None active: bool = False # promote-as-paused by default; opt in to activate trust_score: int = 5 pr_risk_score: int = 3 poll_interval_minutes: int = 180 class CandidateRenameBody(BaseModel): name: str = "" class DigestBody(BaseModel): enabled: bool = True class FollowBody(BaseModel): kind: str # 'source' | 'tag' value: str # source id (as text) or tag key class SourceReviewBody(BaseModel): flag: bool = False reason: str | None = None _FEEDBACK_CATEGORIES = {"idea", "concern", "bug", "praise", "other"} # The only event kinds we record. All aggregate, non-personal. # Per-game funnel events (article_id is reused as 0 — no article dimension). Per-game # kinds (not a generic "game_started") so the admin kind-count breakdown shows which # game drives play and, crucially, shares — the growth loop we're instrumenting. _GAME_NAMES = ("word", "wordsearch", "bloom", "match") # arrival = landed on the game via a shared link (utm_source=game_share) — the share # loop's acquisition signal; started/completed/shared are the engagement funnel. _GAME_EVENT_KINDS = {f"{g}_{e}" for g in _GAME_NAMES for e in ("started", "completed", "shared", "arrival")} _EVENT_KINDS = { "visit", "open", "summary_viewed", "full_story", "source_click", "share_ub", "copy_source", "native_share", "not_today", "less_like_this", "hide_topic", "replace_used", "replace_none", "paywall_replace", "paywalled_source_open", "engaged", # genuine engagement: ~8s visible + a real gesture (vs. a raw visit) "client_error", # boot-failure seatbelt beacon (blank-screen risk signal) } | _GAME_EVENT_KINDS def _fts_query(q: str) -> str: """Raw search box → safe FTS5 query: alnum terms only (no operator/quote injection), each prefix-matched and AND'd together. 
'' when nothing usable.""" terms = re.findall(r"[A-Za-z0-9]+", q or "")[:8] return " ".join(f"{t}*" for t in terms) def _visitor_hash(token: str | None) -> str: token = (token or "").strip()[:200] if not token: return "" return hashlib.sha256(f"{SESSION_SECRET}:{token}".encode()).hexdigest() # --- App -------------------------------------------------------------------- def create_app() -> FastAPI: app = FastAPI( title="goodNews API", version="0.1.0", description="Constructive, uplifting news — metadata and links only.", ) # The website and companion app may live on other origins; allow them. app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["GET", "POST"], allow_headers=["*"], ) @app.get("/healthz") def healthz() -> dict: # Read-only: the schema is owned by the ingestion CLI, so the API never # writes (it can run as a read-only replica against a shared DB). try: with get_conn() as conn: scored = conn.execute("SELECT COUNT(*) FROM article_scores").fetchone()[0] except sqlite3.Error: scored = 0 return {"status": "ok", "scored_articles": scored} # --- Auth: passwordless magic link (Google added in Phase 2) ---------- @app.post("/api/auth/email/start") def auth_email_start(body: EmailStartRequest, background_tasks: BackgroundTasks) -> dict: email = auth.normalize_email(body.email) if not _EMAIL_RE.match(email): raise HTTPException(status_code=422, detail="Please enter a valid email address.") link = None with get_conn() as conn: # Light abuse guard: cap recent tokens per address (still reply OK). recent = conn.execute( "SELECT COUNT(*) FROM login_tokens WHERE email = ? " "AND created_at > datetime('now', '-10 minutes')", (email,), ).fetchone()[0] if recent < 5: raw = auth.create_login_token(conn, email) conn.commit() link = f"{PUBLIC_BASE_URL}/auth/verify?token={raw}" # Hand the (slow) SMTP send to a background task so the request returns # immediately. Reply is always identical (no account enumeration). 
if link: background_tasks.add_task(_send_link_safe, email, link) return {"ok": True} @app.post("/api/auth/email/verify", response_model=SessionOut) def auth_email_verify(body: TokenVerifyRequest, request: Request, response: Response) -> SessionOut: with get_conn() as conn: email = auth.consume_login_token(conn, body.token) if not email: conn.commit() raise HTTPException(status_code=400, detail="This sign-in link is invalid or has expired.") user_id = auth.find_or_create_user(conn, email, "email", email) token = auth.create_session(conn, user_id, user_agent=request.headers.get("User-Agent")) conn.commit() user = auth.get_user(conn, user_id) _set_session_cookie(response, token) return SessionOut(user=UserOut(**_user_out(user)), token=token) @app.get("/api/auth/me", response_model=UserOut | None) def auth_me(request: Request) -> UserOut | None: with get_conn() as conn: user = _current_user(conn, request) return UserOut(**_user_out(user)) if user else None @app.post("/api/account/digest") def account_digest(body: DigestBody, request: Request) -> dict: with get_conn() as conn: user = _current_user(conn, request) if not user: raise HTTPException(status_code=401, detail="sign in required") token = user["digest_unsub_token"] if body.enabled and not token: token = secrets.token_urlsafe(18) conn.execute( "UPDATE users SET digest_enabled = ?, digest_unsub_token = ? 
WHERE id = ?", (1 if body.enabled else 0, token, user["id"]), ) conn.commit() return {"ok": True, "digest_enabled": body.enabled} def _do_unsubscribe(u: int, t: str) -> bool: with get_conn() as conn: row = conn.execute("SELECT digest_unsub_token FROM users WHERE id = ?", (u,)).fetchone() if row and row["digest_unsub_token"] and hmac.compare_digest(row["digest_unsub_token"], t): conn.execute("UPDATE users SET digest_enabled = 0 WHERE id = ?", (u,)) conn.commit() return True return False @app.post("/api/digest/unsubscribe") def digest_unsubscribe_oneclick(u: int = Query(...), t: str = Query(...)) -> dict: # RFC 8058 one-click: the mailbox provider POSTs here; just do it + 200. _do_unsubscribe(u, t) return {"ok": True} @app.get("/api/digest/unsubscribe", response_class=HTMLResponse) def digest_unsubscribe(u: int = Query(...), t: str = Query(...)) -> HTMLResponse: # One-click, no login: match the per-user token, then turn the digest off. ok = _do_unsubscribe(u, t) msg = ( "You’re unsubscribed from the daily digest. No hard feelings — " "upbeatBytes is always here when you want it." if ok else "That unsubscribe link looks invalid or expired. You can manage the " "digest from your account settings." ) html = ( '' '
' '

upbeatBytes

' f'

{msg}

' '

← Back to upbeatBytes

' ) return HTMLResponse(html) @app.post("/api/auth/logout") def auth_logout(request: Request, response: Response) -> dict: with get_conn() as conn: auth.revoke_session(conn, _session_token_from_request(request)) conn.commit() response.delete_cookie(SESSION_COOKIE, path="/") return {"ok": True} # --- Auth: Google (OAuth 2.0 / OIDC) ---------------------------------- @app.get("/api/auth/google/start") def google_start() -> RedirectResponse: if not oauth_google.configured(): raise HTTPException(status_code=503, detail="Google sign-in isn't configured.") state = secrets.token_urlsafe(24) verifier, challenge = oauth_google.new_pkce() url = oauth_google.auth_url(_google_redirect_uri(), state, challenge) resp = RedirectResponse(url, status_code=302) # Bind the flow to this browser; read back (and CSRF-checked) on callback. resp.set_cookie( OAUTH_COOKIE, _sign(f"{state}:{verifier}"), max_age=600, httponly=True, secure=_COOKIE_SECURE, samesite="lax", path="/", ) return resp @app.get("/api/auth/google/callback") def google_callback( request: Request, code: str | None = None, state: str | None = None, error: str | None = None, ) -> RedirectResponse: # The user always sees the same generic error=google (no detail leaked), # but we log WHY internally so device/host-specific failures (e.g. a www # vs apex cookie loss, a state mismatch, a token-exchange error) are # diagnosable instead of all looking identical. 
def fail(reason: str, exc: Exception | None = None) -> RedirectResponse: host = request.headers.get("Host", "?") if exc is not None: log.warning("google callback failed: %s (host=%s): %s", reason, host, exc) else: log.warning("google callback failed: %s (host=%s)", reason, host) return RedirectResponse(f"{PUBLIC_BASE_URL}/auth/verify?error=google", status_code=302) if error: return fail(f"provider_error:{error}") if not code or not state: return fail("missing_code_or_state") saved = _unsign(request.cookies.get(OAUTH_COOKIE)) if not saved: # Most likely the host-only ub_oauth cookie was set on a different # host than this callback (www vs apex). Canonicalizing www→apex at # the edge prevents this. return fail("missing_oauth_cookie") saved_state, _, verifier = saved.partition(":") if not hmac.compare_digest(saved_state, state): return fail("state_mismatch") try: tokens = oauth_google.exchange_code(code, _google_redirect_uri(), verifier) info = oauth_google.verify_id_token(tokens["id_token"]) if not info.get("picture") and tokens.get("access_token"): info["picture"] = oauth_google.fetch_userinfo(tokens["access_token"]).get("picture") except Exception as exc: # noqa: BLE001 — log reason, show generic error return fail("token_exchange_or_verify", exc) with get_conn() as conn: user_id = auth.find_or_create_user( conn, info["email"], "google", info["sub"], display_name=info.get("name"), avatar_url=info.get("picture"), ) token = auth.create_session(conn, user_id, user_agent=request.headers.get("User-Agent")) conn.commit() ok = RedirectResponse(f"{PUBLIC_BASE_URL}/", status_code=302) _set_session_cookie(ok, token) ok.delete_cookie(OAUTH_COOKIE, path="/") return ok # --- Saved articles, history, and one-time import (all require sign-in) --- @app.get("/api/saved", response_model=FeedResponse) def saved_list(request: Request) -> FeedResponse: with get_conn() as conn: user = _require_user(conn, request) rows = queries.saved(conn, user["id"]) items = [Article.from_row(r) for r 
in rows] return FeedResponse(topic=None, flavor=None, count=len(items), items=items) @app.get("/api/saved/ids") def saved_id_list(request: Request) -> list[int]: with get_conn() as conn: user = _require_user(conn, request) return queries.saved_ids(conn, user["id"]) # --- Follows: durable source / tag interests (require sign-in) --- def _follows_for(conn, user_id: int) -> list[dict]: rows = conn.execute( "SELECT kind, value FROM user_follows WHERE user_id = ? ORDER BY created_at DESC", (user_id,) ).fetchall() out = [] for r in rows: d = {"kind": r["kind"], "value": r["value"], "name": r["value"]} if r["kind"] == "source" and r["value"].isdigit(): src = conn.execute("SELECT name FROM sources WHERE id = ?", (int(r["value"]),)).fetchone() if src: d["name"] = src["name"] out.append(d) return out @app.get("/api/follows") def follows_list(request: Request) -> list[dict]: with get_conn() as conn: user = _require_user(conn, request) return _follows_for(conn, user["id"]) @app.post("/api/follows") def follow_add(body: FollowBody, request: Request) -> dict: if body.kind not in ("source", "tag"): raise HTTPException(status_code=422, detail="kind must be 'source' or 'tag'") value = (body.value or "").strip() if body.kind == "tag": value = value.lower() if not value: raise HTTPException(status_code=422, detail="value is required") with get_conn() as conn: user = _require_user(conn, request) if body.kind == "source": if not value.isdigit() or not conn.execute( "SELECT 1 FROM sources WHERE id = ?", (int(value),) ).fetchone(): raise HTTPException(status_code=404, detail="source not found") conn.execute( "INSERT OR IGNORE INTO user_follows (user_id, kind, value) VALUES (?, ?, ?)", (user["id"], body.kind, value), ) conn.commit() return {"ok": True, "kind": body.kind, "value": value} @app.delete("/api/follows") def follow_remove(request: Request, kind: str = Query(...), value: str = Query(...)) -> dict: v = value.strip().lower() if kind == "tag" else value.strip() with get_conn() as 
conn: user = _require_user(conn, request) conn.execute( "DELETE FROM user_follows WHERE user_id = ? AND kind = ? AND value = ?", (user["id"], kind, v) ) conn.commit() return {"ok": True} @app.post("/api/saved/{article_id}") def save_article(article_id: int, request: Request) -> dict: with get_conn() as conn: user = _require_user(conn, request) if not conn.execute("SELECT 1 FROM articles WHERE id = ?", (article_id,)).fetchone(): raise HTTPException(status_code=404, detail="No such article.") conn.execute( "INSERT OR IGNORE INTO saved_articles (user_id, article_id) VALUES (?, ?)", (user["id"], article_id), ) conn.commit() return {"saved": True} @app.delete("/api/saved/{article_id}") def unsave_article(article_id: int, request: Request) -> dict: with get_conn() as conn: user = _require_user(conn, request) conn.execute( "DELETE FROM saved_articles WHERE user_id = ? AND article_id = ?", (user["id"], article_id), ) conn.commit() return {"saved": False} @app.get("/api/history", response_model=FeedResponse) def history_list(request: Request) -> FeedResponse: with get_conn() as conn: user = _require_user(conn, request) rows = queries.history(conn, user["id"]) items = [Article.from_row(r) for r in rows] return FeedResponse(topic=None, flavor=None, count=len(items), items=items) @app.post("/api/history") def record_history(body: IdsBody, request: Request) -> dict: with get_conn() as conn: user = _require_user(conn, request) for aid in queries.existing_article_ids(conn, body.ids): conn.execute( "INSERT OR IGNORE INTO user_history (user_id, article_id, event) " "VALUES (?, ?, 'seen')", (user["id"], aid), ) conn.commit() return {"ok": True} @app.delete("/api/history") def clear_history(request: Request) -> dict: with get_conn() as conn: user = _require_user(conn, request) conn.execute("DELETE FROM user_history WHERE user_id = ?", (user["id"],)) conn.commit() return {"ok": True} @app.delete("/api/history/{article_id}") def remove_history(article_id: int, request: Request) -> 
dict: with get_conn() as conn: user = _require_user(conn, request) conn.execute( "DELETE FROM user_history WHERE user_id = ? AND article_id = ?", (user["id"], article_id), ) conn.commit() return {"ok": True} # --- Prefs sync (Calm Filters / Boundaries follow the account) -------- @app.get("/api/prefs") def get_prefs(request: Request) -> dict: with get_conn() as conn: user = _require_user(conn, request) row = conn.execute( "SELECT prefs_json FROM user_prefs WHERE user_id = ?", (user["id"],) ).fetchone() if not row: return {"prefs": None} # no row yet → caller seeds from the device try: return {"prefs": json.loads(row["prefs_json"])} except (ValueError, TypeError): return {"prefs": None} @app.put("/api/prefs") def put_prefs(body: PrefsBody, request: Request) -> dict: blob = json.dumps(body.prefs)[:20000] with get_conn() as conn: user = _require_user(conn, request) conn.execute( "INSERT INTO user_prefs (user_id, prefs_json, updated_at) " "VALUES (?, ?, CURRENT_TIMESTAMP) " "ON CONFLICT(user_id) DO UPDATE SET prefs_json = excluded.prefs_json, " "updated_at = CURRENT_TIMESTAMP", (user["id"], blob), ) conn.commit() return {"ok": True} # --- Account: profile, sessions, export, delete ----------------------- @app.get("/api/account") def account_info(request: Request) -> dict: with get_conn() as conn: user = _require_user(conn, request) providers = [r["provider"] for r in conn.execute( "SELECT provider FROM identities WHERE user_id = ?", (user["id"],) )] sessions = conn.execute( "SELECT COUNT(*) FROM sessions WHERE user_id = ?", (user["id"],) ).fetchone()[0] saved = conn.execute( "SELECT COUNT(*) FROM saved_articles WHERE user_id = ?", (user["id"],) ).fetchone()[0] return { "user": {"id": user["id"], "email": user["email"], "display_name": user["display_name"]}, "providers": providers, "sessions": sessions, "saved_count": saved, } @app.post("/api/account/logout-all") def logout_all(request: Request, response: Response) -> dict: with get_conn() as conn: user = 
_require_user(conn, request) conn.execute("DELETE FROM sessions WHERE user_id = ?", (user["id"],)) conn.commit() response.delete_cookie(SESSION_COOKIE, path="/") return {"ok": True} @app.get("/api/account/export") def export_account(request: Request) -> Response: with get_conn() as conn: user = _require_user(conn, request) uid = user["id"] providers = [r["provider"] for r in conn.execute( "SELECT provider FROM identities WHERE user_id = ?", (uid,) )] saved = queries.saved(conn, uid, limit=10000) hist = queries.history(conn, uid, limit=10000) prow = conn.execute( "SELECT prefs_json FROM user_prefs WHERE user_id = ?", (uid,) ).fetchone() slim = lambda a: {"id": a["id"], "title": a["title"], "url": a["canonical_url"]} data = { "account": {"id": uid, "email": user["email"], "display_name": user["display_name"], "created_at": user["created_at"]}, "sign_in_methods": providers, "saved": [slim(a) for a in saved], "history": [slim(a) for a in hist], "preferences": json.loads(prow["prefs_json"]) if prow else None, } return Response( content=json.dumps(data, indent=2), media_type="application/json", headers={"Content-Disposition": "attachment; filename=upbeatbytes-data.json"}, ) @app.delete("/api/account") def delete_account(request: Request, response: Response) -> dict: with get_conn() as conn: user = _require_user(conn, request) conn.execute("DELETE FROM users WHERE id = ?", (user["id"],)) # cascades to all account data conn.commit() response.delete_cookie(SESSION_COOKIE, path="/") return {"ok": True} # --- Public share/landing page for an article ------------------------- # GET + HEAD: FastAPI's @app.get registers GET only (no auto-HEAD), so a HEAD would # fall through to the catch-all StaticFiles mount at "/" and 404. Register both so # HEAD returns the same status (200/301/404) as GET, sans body. 
@app.api_route("/a/{article_id}", methods=["GET", "HEAD"], response_class=HTMLResponse)
def share_page(article_id: str, background_tasks: BackgroundTasks) -> HTMLResponse:
    # Public landing page for one article: 200 with the rendered page, 301 from a
    # duplicate to its accepted canonical twin, or the calm 404 page otherwise.
    not_found = HTMLResponse(share.render_not_found(PUBLIC_BASE_URL), status_code=404)
    try:
        aid = int(article_id)
    except (TypeError, ValueError):
        return not_found  # malformed id → calm 404, no stack trace
    # An all-digit id outside SQLite's signed 64-bit INTEGER range parses fine but
    # makes sqlite3 raise OverflowError when bound (a 500). No such row can exist.
    if not -(2**63) <= aid < 2**63:
        return not_found
    cached = _share_cache_get(aid)
    if cached is not None:  # serve a rendered page without touching SQLite/render
        return HTMLResponse(cached, headers={"Cache-Control": "public, max-age=300"})
    with get_conn() as conn:
        row = conn.execute(
            "SELECT a.id, a.title, a.description, a.image_url, a.canonical_url, "
            "a.duplicate_of, a.source_id, src.name AS source_name, src.image_policy, s.reason_text, s.accepted, "
            "(SELECT group_concat(t.tag) FROM article_tags t WHERE t.article_id = a.id) AS tags "
            "FROM articles a JOIN sources src ON src.id = a.source_id "
            "LEFT JOIN article_scores s ON s.article_id = a.id WHERE a.id = ?",
            (aid,),
        ).fetchone()
        if not row:
            return not_found
        # A duplicate's URL may already be indexed by Google. A hard 404 silently
        # drops it (and any newer twin that arrives later retires the OLDER, already
        # indexed URL) — that's what tanked impressions. So 301 to the canonical twin
        # instead: Google consolidates the page onto the survivor. dedup stores a star
        # (dup -> rep, rep.duplicate_of IS NULL); we still follow a short chain with a
        # cycle guard as cheap insurance.
        if row["duplicate_of"] is not None:
            seen, cur, target = {aid}, row["duplicate_of"], None
            for _ in range(8):
                if cur in seen:
                    break
                seen.add(cur)
                r2 = conn.execute(
                    "SELECT a.id, a.duplicate_of, s.accepted FROM articles a "
                    "LEFT JOIN article_scores s ON s.article_id = a.id WHERE a.id = ?",
                    (cur,),
                ).fetchone()
                if not r2:
                    break
                if r2["duplicate_of"] is None:
                    target = r2 if r2["accepted"] else None
                    break
                cur = r2["duplicate_of"]
            if target is not None:
                return RedirectResponse(f"/a/{target['id']}", status_code=301)
            return not_found  # canonical itself is gone/rejected → genuinely 404
        if not row["accepted"]:
            return not_found
        summary = summarize.get_summary(conn, aid)
        explanation = summarize.get_explanation(conn, aid)
    complete = bool(summary and explanation)
    if not complete:
        _kick_summary(aid, background_tasks)  # generate/top-up for next time; page polls
    html = share.render_share_page(dict(row), PUBLIC_BASE_URL, summary=summary, explanation=explanation)
    if complete:
        _share_cache_put(aid, html)  # cache only the finished page (never the "generating" state)
    return HTMLResponse(html, headers={"Cache-Control": "public, max-age=300" if complete else "no-cache"})


# --- Privacy-respecting first-party analytics -------------------------
@app.post("/api/events")
def record_event(body: EventBody, request: Request) -> dict:
    # Don't let crawlers inflate visitor/funnel counts. Many modern bots run JS and
    # DO fire this beacon, so filter by User-Agent (same check the load-error beacon
    # uses) — catches honest bot UAs (GPTBot, AhrefsBot, headless Chrome, …). The
    # response is identical either way, so a bot can't tell it was dropped.
    ua = request.headers.get("user-agent", "")
    if body.kind in _EVENT_KINDS and not queries.is_bot_ua(ua):
        with get_conn() as conn:
            conn.execute(
                "INSERT OR IGNORE INTO events (kind, article_id, visitor_hash, day) "
                "VALUES (?, ?, ?, date('now'))",
                (body.kind, body.article_id or 0, _visitor_hash(body.visitor)),
            )
            conn.commit()
    return {"ok": True}  # always identical; dedup'd by the unique key


@app.post("/api/client-error")
def record_client_error(body: ClientErrorBody, request: Request) -> dict:
    # Boot-failure seatbelt telemetry — what blank-risk looks like in the wild.
    # Every field is length-capped before insert; rows older than 14 days are
    # pruned on each write.
    ua = (request.headers.get("user-agent") or "")[:300]
    with get_conn() as conn:
        conn.execute(
            "INSERT INTO client_errors (reason, path, user_agent, app_version) VALUES (?, ?, ?, ?)",
            ((body.reason or "")[:500], (body.path or "")[:200], ua, (body.version or "")[:60]),
        )
        conn.execute("DELETE FROM client_errors WHERE created_at < datetime('now','-14 days')")
        conn.commit()
    return {"ok": True}


@app.get("/api/admin/client-errors")
def admin_client_errors(request: Request, show: str = Query("unread", pattern="^(unread|read|all)$")) -> list[dict]:
    # Latest 50 client load errors, filtered by read state. `show` is limited by
    # the Query pattern, and the WHERE text comes from the fixed table below.
    with get_conn() as conn:
        _require_admin(conn, request)
        where = {"unread": "WHERE read_at IS NULL", "read": "WHERE read_at IS NOT NULL", "all": ""}[show]
        rows = conn.execute(
            f"SELECT id, reason, path, user_agent, app_version, created_at, read_at "
            f"FROM client_errors {where} ORDER BY id DESC LIMIT 50"
        ).fetchall()
    # Bots stay visible (tagged) but are excluded from the headline count — see admin_stats.
    return [{**dict(r), "read": r["read_at"] is not None, "bot": queries.is_bot_ua(r["user_agent"])} for r in rows]


@app.post("/api/admin/client-errors/read-all")
def admin_client_errors_read_all(request: Request) -> dict:
    # Mark every unread client error as read; reports how many rows changed.
    with get_conn() as conn:
        _require_admin(conn, request)
        cur = conn.execute("UPDATE client_errors SET read_at = CURRENT_TIMESTAMP WHERE read_at IS NULL")
        conn.commit()
    return {"ok": True, "marked": cur.rowcount}


@app.post("/api/admin/client-errors/{eid}/read")
def admin_client_error_read(eid: int, body: FeedbackReadBody, request: Request) -> dict:
    # Toggle one client error read/unread. 404 when the id matches no row.
    with get_conn() as conn:
        _require_admin(conn, request)
        ts = "CURRENT_TIMESTAMP" if body.read else "NULL"  # one of two fixed SQL tokens, never user text
        cur = conn.execute(f"UPDATE client_errors SET read_at = {ts} WHERE id = ?", (eid,))
        if cur.rowcount == 0:
            raise HTTPException(status_code=404, detail="load error not found")
        conn.commit()
    return {"ok": True, "read": body.read}


@app.post("/api/feedback")
def submit_feedback(body: FeedbackBody, request: Request, background_tasks: BackgroundTasks) -> dict:
    # Store one feedback message and queue a notification email per admin address.
    if body.hp:  # honeypot tripped → accept silently, store nothing
        return {"ok": True}
    message = (body.message or "").strip()[:4000]
    if not message:
        raise HTTPException(status_code=422, detail="Please add a short message.")
    category = body.category if body.category in _FEEDBACK_CATEGORIES else "other"
    email = ((body.email or "").strip()[:200]) or None
    vh = _visitor_hash(body.visitor)
    with get_conn() as conn:
        if vh:  # light flood cap per anonymous token per day
            recent = conn.execute(
                "SELECT COUNT(*) FROM feedback WHERE visitor_hash = ? AND day = date('now')", (vh,)
            ).fetchone()[0]
            if recent >= 8:
                return {"ok": True}
        user = _current_user(conn, request)
        conn.execute(
            "INSERT INTO feedback (category, message, contact_email, user_id, visitor_hash, day) "
            "VALUES (?, ?, ?, ?, ?, date('now'))",
            (category, message, email, user["id"] if user else None, vh),
        )
        conn.commit()
    who = user["email"] if user else "anonymous visitor"
    for addr in ADMIN_EMAILS:
        background_tasks.add_task(_feedback_email_safe, addr, category, message, email, who)
    return {"ok": True}


@app.get("/api/admin/feedback")
def admin_feedback(request: Request) -> list[dict]:
    # Newest 200 feedback items, each with its sent replies attached as "replies".
    with get_conn() as conn:
        _require_admin(conn, request)
        rows = conn.execute(
            "SELECT f.id, f.category, f.message, f.contact_email, f.created_at, f.read_at, "
            "u.email AS user_email FROM feedback f LEFT JOIN users u ON u.id = f.user_id "
            "ORDER BY f.created_at DESC LIMIT 200"
        ).fetchall()
        items = [dict(r) for r in rows]
        if items:
            ids = [it["id"] for it in items]
            ph = ",".join("?" * len(ids))  # one placeholder per id; values stay bound parameters
            reps = conn.execute(
                f"SELECT id, feedback_id, message, message_html, sent_to, sent_at FROM feedback_replies "
                f"WHERE feedback_id IN ({ph}) ORDER BY sent_at",
                ids,
            ).fetchall()
            by_fid: dict = {}
            for r in reps:
                by_fid.setdefault(r["feedback_id"], []).append(dict(r))
            for it in items:
                it["replies"] = by_fid.get(it["id"], [])
    return items


@app.post("/api/admin/feedback/{fid}/read")
def admin_feedback_read(fid: int, body: FeedbackReadBody, request: Request) -> dict:
    # Toggle one feedback item read/unread. 404 when the id matches no row.
    with get_conn() as conn:
        _require_admin(conn, request)
        ts = "CURRENT_TIMESTAMP" if body.read else "NULL"  # one of two fixed SQL tokens, never user text
        cur = conn.execute(f"UPDATE feedback SET read_at = {ts} WHERE id = ?", (fid,))
        if cur.rowcount == 0:
            raise HTTPException(status_code=404, detail="feedback not found")
        conn.commit()
    return {"ok": True, "read": body.read}


@app.delete("/api/admin/feedback/{fid}")
def admin_feedback_delete(fid: int, request: Request) -> dict:
    # Delete one feedback item. 404 when the id matches no row.
    with get_conn() as conn:
        _require_admin(conn, request)
        cur = conn.execute("DELETE FROM feedback WHERE id = ?", (fid,))
        if cur.rowcount == 0:
            raise HTTPException(status_code=404, detail="feedback not found")
        conn.commit()
    return {"ok": True}


@app.post("/api/admin/feedback/{fid}/reply")
def admin_feedback_reply(fid: int, body: FeedbackReplyBody, request: Request) -> dict:
    # Cap the RAW editor HTML first (slicing sanitized output could sever a
    # tag), then sanitize the whole thing.
    reply_html = sanitize_reply_html((body.html or "")[:20000])
    reply_text = reply_html_to_text(reply_html)
    if not reply_text:
        raise HTTPException(status_code=422, detail="Reply message is required.")
    # 1. Validate + gather, then release the DB connection — SMTP can take
    #    ~20s and shouldn't keep a connection open.
    with get_conn() as conn:
        admin = _require_admin(conn, request)
        fb = conn.execute("SELECT contact_email, message FROM feedback WHERE id = ?", (fid,)).fetchone()
        if not fb:
            raise HTTPException(status_code=404, detail="feedback not found")
        if not fb["contact_email"]:
            raise HTTPException(status_code=400, detail="No reply address for this feedback.")
        admin_id, to, original = admin["id"], fb["contact_email"], fb["message"]
    # 2. Send with no DB connection held; only record a reply that actually
    #    went out, so the UI can keep the draft on failure.
    try:
        email_send.send_feedback_reply(to, reply_text, reply_html, original)
    except Exception as exc:  # noqa: BLE001 — surface any SMTP failure to the admin
        # Chain the cause so the underlying SMTP error stays in the traceback/logs.
        raise HTTPException(status_code=502, detail=f"Could not send the reply: {exc}") from exc
    # 3. Record the sent reply + mark the item read.
    with get_conn() as conn:
        cur = conn.execute(
            "INSERT INTO feedback_replies (feedback_id, user_id, message, message_html, sent_to) "
            "VALUES (?, ?, ?, ?, ?)",
            (fid, admin_id, reply_text, reply_html, to),
        )
        conn.execute(
            "UPDATE feedback SET read_at = COALESCE(read_at, CURRENT_TIMESTAMP) WHERE id = ?", (fid,)
        )
        conn.commit()
        reply = conn.execute(
            "SELECT id, feedback_id, message, message_html, sent_to, sent_at FROM feedback_replies WHERE id = ?",
            (cur.lastrowid,),
        ).fetchone()
    return {"ok": True, "reply": dict(reply)}


@app.post("/api/admin/sources/{sid}/status")
def admin_source_status(sid: int, body: SourceStatusBody, request: Request) -> dict:
    # Lifecycle: active | paused | retired. Stops/resumes polling only —
    # existing articles stay visible (use /visibility to hide). `active` is
    # kept mirrored so the scheduler/CLI keep working off the legacy flag.
    if body.status not in ("active", "paused", "retired"):
        raise HTTPException(status_code=422, detail="invalid status")
    active = 1 if body.status == "active" else 0
    with get_conn() as conn:
        _require_admin(conn, request)
        cur = conn.execute(
            "UPDATE sources SET status = ?, active = ? WHERE id = ?", (body.status, active, sid)
        )
        if cur.rowcount == 0:
            raise HTTPException(status_code=404, detail="source not found")
        conn.commit()
    return {"ok": True, "status": body.status, "active": active}


@app.post("/api/admin/sources/{sid}/visibility")
def admin_source_visibility(sid: int, body: SourceVisibilityBody, request: Request) -> dict:
    # Pull a source's existing articles out of (or back into) the public feed
    # and brief. Reversible; never deletes anything.
    with get_conn() as conn:
        _require_admin(conn, request)
        cur = conn.execute(
            "UPDATE sources SET content_visible = ? WHERE id = ?", (1 if body.visible else 0, sid)
        )
        if cur.rowcount == 0:
            raise HTTPException(status_code=404, detail="source not found")
        conn.commit()
    return {"ok": True, "visible": body.visible}


@app.post("/api/admin/sources/{sid}/preview")
def admin_source_preview(sid: int, request: Request) -> dict:
    # Read-only spot-check of a LIVE source: safe-fetch + heuristic preview.
    # Mutates nothing — no DB write, no poll attempt, no health/state change.
    with get_conn() as conn:
        _require_admin(conn, request)
        src = conn.execute("SELECT feed_url FROM sources WHERE id = ?", (sid,)).fetchone()
        if not src:
            raise HTTPException(status_code=404, detail="source not found")
        url = src["feed_url"]
    return _preview_or_502(url)  # safe fetch, no DB connection held


@app.get("/api/admin/sources/{sid}/articles")
def admin_source_articles(sid: int, request: Request, filter: str = "all", limit: int = 25, offset: int = 0) -> dict:
    # Read-only inspector: the REAL ingested articles behind a source's metrics,
    # so paywall/image/acceptance/duplicate signals can be verified against evidence.
    limit = max(1, min(int(limit), 100))
    offset = max(0, int(offset))
    with get_conn() as conn:
        _require_admin(conn, request)
        if not conn.execute("SELECT 1 FROM sources WHERE id = ?", (sid,)).fetchone():
            raise HTTPException(status_code=404, detail="source not found")
        arts = queries.source_articles(conn, sid, filter, limit, offset)
        return {
            "articles": arts,
            # The summary is only computed for the first page.
            "summary": queries.source_articles_summary(conn, sid) if offset == 0 else None,
            "has_more": len(arts) == limit,
        }


@app.post("/api/admin/sources/{sid}/paywall")
def admin_source_paywall(sid: int, body: SourcePaywallBody, request: Request) -> dict:
    # Per-source paywall override: corrects domain-rule false positives
    # (NY Times Learning is free) / negatives. Threaded into feed/lead/brief
    # ranking + badges via is_paywalled_for_source.
    ov = body.override or None
    if ov not in (None, "free", "paywalled"):
        raise HTTPException(status_code=422, detail="override must be null, 'free', or 'paywalled'")
    with get_conn() as conn:
        _require_admin(conn, request)
        cur = conn.execute("UPDATE sources SET paywall_override = ? WHERE id = ?", (ov, sid))
        if cur.rowcount == 0:
            raise HTTPException(status_code=404, detail="source not found")
        conn.commit()
    return {"ok": True, "override": ov}


@app.post("/api/admin/sources/{sid}/image-policy")
def admin_source_image_policy(sid: int, body: SourceImagePolicyBody, request: Request) -> dict:
    # Image rights policy: 'cache' (re-host a downscaled copy — only for sources we've
    # cleared: open license / permission / public-domain), 'remote' (hotlink the
    # publisher's image), 'none' (no image). Default is the conservative 'remote'.
    pol = body.policy
    if pol not in ("cache", "remote", "none"):
        raise HTTPException(status_code=422, detail="policy must be 'cache', 'remote', or 'none'")
    with get_conn() as conn:
        _require_admin(conn, request)
        cur = conn.execute("UPDATE sources SET image_policy = ? WHERE id = ?", (pol, sid))
        if cur.rowcount == 0:
            raise HTTPException(status_code=404, detail="source not found")
        conn.commit()
        # Leaving 'cache' (e.g. permission revoked) → take the re-hosted copies down now,
        # not just make them inaccessible. Setting TO 'cache' just flips the flag; the
        # cycle warms it.
        purged = newsimg.purge_source(conn, sid) if pol != "cache" else 0
    return {"ok": True, "policy": pol, "purged": purged}


# --- Source candidates (supervised add-a-source pipeline) ----------------
def _candidate_dict(row) -> dict:
    # Convert a candidate row to a dict, replacing the raw `preview_json` column
    # with a parsed `preview` value (None when absent or unparseable).
    d = dict(row)
    raw = d.pop("preview_json", None)
    try:
        d["preview"] = json.loads(raw) if raw else None
    except (ValueError, TypeError):
        d["preview"] = None
    return d


@app.get("/api/admin/candidates")
def admin_candidates(request: Request) -> list[dict]:
    # List the candidate queue as returned by sources.list_candidates.
    with get_conn() as conn:
        _require_admin(conn, request)
        rows = sources.list_candidates(conn)
    return [_candidate_dict(r) for r in rows]


def _preview_or_502(url: str, deep: bool = False) -> dict:
    # SSRF-safe fetch (admin-pasted URL is untrusted). Default is the fast
    # heuristic; deep=True also runs the real LLM classifier on a small sample
    # (slower, ~5-7s/item — the true acceptance view, not an estimate).
    # Raises HTTPException(502) carrying the failure reason when the preview fails.
    client = None
    if deep:
        try:
            client = LocalModelClient.from_env()
        except Exception:  # noqa: BLE001 — fall back to heuristic if the model is down
            client = None
    try:
        return feeds.preview_feed(
            url, sample=(8 if deep else 20), fetcher=feeds.safe_fetch_feed, client=client
        )
    except Exception as exc:  # noqa: BLE001 — surface a readable reason
        raise HTTPException(status_code=502, detail=f"Couldn't preview that feed: {exc}") from exc


@app.post("/api/admin/candidates")
def admin_candidate_suggest(body: CandidateSuggestBody, request: Request) -> dict:
    # Queue a new feed as a "suggested" candidate after previewing it.
    url = (body.feed_url or "").strip()
    if not url:
        raise HTTPException(status_code=422, detail="feed_url is required")
    with get_conn() as conn:  # gate BEFORE the outbound fetch
        _require_admin(conn, request)
        # Don't re-add a feed that's already live or already queued (catches
        # http/https · www · trailing-slash variants, not just exact dups).
        existing = sources.find_existing_feed(conn, url)
        if existing:
            where = "already a source" if existing["kind"] == "source" else "already in the candidate queue"
            raise HTTPException(
                status_code=409,
                detail=f"“{existing['name']}” is {where} ({existing['status']}). "
                "Find it below — Re-preview it there if you want a fresh read.",
            )
    preview = _preview_or_502(url)  # no DB connection held during network I/O
    with get_conn() as conn:
        _require_admin(conn, request)
        row = sources.save_candidate(conn, url, preview=preview, name=(body.name or None), status="suggested")
    return _candidate_dict(row)


@app.post("/api/admin/candidates/{cid}/preview")
def admin_candidate_repreview(cid: int, request: Request, deep: bool = False) -> dict:
    # Refresh a queued candidate's stored preview (optionally the deep model pass).
    with get_conn() as conn:
        _require_admin(conn, request)
        cand = conn.execute("SELECT feed_url FROM source_candidates WHERE id = ?", (cid,)).fetchone()
        if not cand:
            raise HTTPException(status_code=404, detail="candidate not found")
        url = cand["feed_url"]
    preview = _preview_or_502(url, deep=deep)  # conn released during the (slow) model pass
    with get_conn() as conn:
        _require_admin(conn, request)
        row = sources.save_candidate(conn, url, preview=preview)
    return _candidate_dict(row)


@app.post("/api/admin/candidates/{cid}/rename")
def admin_candidate_rename(cid: int, body: CandidateRenameBody, request: Request) -> dict:
    # Rename a candidate that is still pending review.
    with get_conn() as conn:
        _require_admin(conn, request)
        cand = conn.execute("SELECT status FROM source_candidates WHERE id = ?", (cid,)).fetchone()
        if not cand:
            raise HTTPException(status_code=404, detail="candidate not found")
        # Match the UI policy server-side: a promoted/rejected candidate is
        # settled history — rename only while it's pending review.
        if cand["status"] in ("promoted", "rejected"):
            raise HTTPException(status_code=409, detail=f"Can't rename a {cand['status']} candidate.")
        name = (body.name or "").strip()[:160] or None  # cap so a pasted paragraph can't wreck the queue
        row = sources.rename_candidate(conn, cid, name)
    return _candidate_dict(row)


@app.post("/api/admin/candidates/{cid}/promote")
def admin_candidate_promote(cid: int, body: CandidatePromoteBody, request: Request) -> dict:
    # Promote a candidate to a real source. 409 when the feed is already a
    # source, 404 when sources.promote_candidate raises ValueError.
    with get_conn() as conn:
        _require_admin(conn, request)
        try:
            source_id = sources.promote_candidate(
                conn, cid, active=body.active, default_category=body.default_category,
                trust_score=body.trust_score, pr_risk_score=body.pr_risk_score,
                poll_interval_minutes=body.poll_interval_minutes,
            )
        except sources.DuplicateFeedError as exc:
            ex = exc.existing
            raise HTTPException(
                status_code=409,
                detail=f"“{ex['name']}” is already a source ({ex['status']}) — "
                "promote skipped so its settings aren't overwritten.",
            ) from exc
        except ValueError as exc:
            raise HTTPException(status_code=404, detail="candidate not found") from exc
        src = conn.execute(
            "SELECT id, name, status, active, content_visible FROM sources WHERE id = ?", (source_id,)
        ).fetchone()
        cand = conn.execute("SELECT * FROM source_candidates WHERE id = ?", (cid,)).fetchone()
    return {"ok": True, "source_id": source_id, "source": dict(src), "candidate": _candidate_dict(cand)}


@app.post("/api/admin/candidates/{cid}/reject")
def admin_candidate_reject(cid: int, request: Request) -> dict:
    # Reject a candidate; 404 when sources.reject_candidate reports no match.
    with get_conn() as conn:
        _require_admin(conn, request)
        if not sources.reject_candidate(conn, cid):
            raise HTTPException(status_code=404, detail="candidate not found")
        cand = conn.execute("SELECT * FROM source_candidates WHERE id = ?", (cid,)).fetchone()
    return _candidate_dict(cand)


@app.post("/api/admin/candidates/{cid}/restore")
def admin_candidate_restore(cid: int, request: Request) -> dict:
    # Send a rejected candidate back to staging for another look.
    with get_conn() as conn:
        _require_admin(conn, request)
        if not sources.restore_candidate(conn, cid):
            raise HTTPException(status_code=404, detail="no rejected candidate with that id")
        cand = conn.execute("SELECT * FROM source_candidates WHERE id = ?", (cid,)).fetchone()
    return _candidate_dict(cand)


# --- Publishing Desk (admin): outbound-share queue for X (platform-neutral) ---
@app.post("/api/admin/publishing/build")
def admin_publishing_build(request: Request, background_tasks: BackgroundTasks) -> dict:
    # Kick the queue build in the background (the comparative LLM call can be slow);
    # the client polls /queue. No-op if a build is already running.
    with get_conn() as conn:
        _require_admin(conn, request)
    with _publish_build_lock:  # atomic check-and-set: one job at a time
        if not _publish_build["building"]:
            _publish_build.update(building=True, result=None, error=None)
            background_tasks.add_task(_run_publish_build)
    return {"building": True}


@app.get("/api/admin/publishing/queue")
def admin_publishing_queue(request: Request, archived: bool = False) -> dict:
    # Current share queue plus the build job's state (building / last result / error).
    with get_conn() as conn:
        _require_admin(conn, request)
        items = publishing.list_queue(conn, include_archived=archived)
    return {"building": _publish_build["building"], "last": _publish_build.get("result"),
            "error": _publish_build.get("error"), "items": items}


@app.post("/api/admin/publishing/{sid}/status")
def admin_publishing_status(sid: int, body: PublishStatusBody, request: Request) -> dict:
    # Set a share's status and optional texts; 400 when publishing.set_status refuses.
    with get_conn() as conn:
        _require_admin(conn, request)
        ok = publishing.set_status(conn, sid, body.status, draft_text=body.draft_text,
                                   final_text=body.final_text, post_url=body.post_url,
                                   snooze_until=body.snooze_until)
    if not ok:
        raise HTTPException(status_code=400, detail="bad status or id")
    return {"ok": True}


@app.post("/api/admin/publishing/{sid}/draft")
def admin_publishing_draft(sid: int, body: PublishDraftBody, request: Request) -> dict:
    # Save draft text for a share; 404 when publishing.save_draft finds no such share.
    with get_conn() as conn:
        _require_admin(conn, request)
        ok = publishing.save_draft(conn, sid, body.draft_text)
    if not ok:
        raise HTTPException(status_code=404, detail="no such share")
    return {"ok": True}


@app.post("/api/admin/publishing/{sid}/restore")
def admin_publishing_restore(sid: int, request: Request) -> dict:
    # Restore a skipped/snoozed share; 400 when publishing.restore refuses.
    with get_conn() as conn:
        _require_admin(conn, request)
        ok = publishing.restore(conn, sid)
    if not ok:
        raise HTTPException(status_code=400, detail="not a restorable (skipped/snoozed) share")
    return {"ok": True}


@app.post("/api/admin/publishing/handles")
def admin_publishing_add_handle(body: EntityHandleBody, request: Request) -> dict:
    # Save a verified handle (e.g. after confirming one via 'Find on X').
    with get_conn() as conn:
        _require_admin(conn, request)
        ok = publishing.add_entity_handle(conn, body.entity_name, body.handle, body.profile_url)
    if not ok:
        raise HTTPException(status_code=400, detail="bad entity or handle")
    return {"ok": True}


# --- CSV exports (admin-gated, for inspection / archiving) ---------------
def _csv_cell(v):
    # Defuse CSV formula injection: a cell a spreadsheet might evaluate gets a
    # leading apostrophe. Triggers are =, +, -, @ and also a leading TAB or
    # carriage return, which spreadsheets skip before evaluating what follows.
    # Numbers pass through untouched (no risk, and this avoids mangling any
    # legitimate negative value). None becomes an empty cell.
    if v is None:
        return ""
    if isinstance(v, (int, float)):
        return v
    s = str(v)
    return "'" + s if s[:1] in ("=", "+", "-", "@", "\t", "\r") else s


def _csv_response(filename: str, write) -> Response:
    # Build a CSV attachment. `write` is called once with a row-emitting callable;
    # every value it emits passes through _csv_cell before being written.
    buf = io.StringIO()
    writer = csv.writer(buf)
    write(lambda values: writer.writerow([_csv_cell(v) for v in values]))
    return Response(
        content=buf.getvalue(),
        media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )


@app.get("/api/admin/export/sources.csv")
def admin_export_sources(request: Request) -> Response:
    # Current-state snapshot of every source (active + paused + retired).
    with get_conn() as conn:
        _require_admin(conn, request)
        rows = queries.source_health(conn)

    def write(row):
        row([
            "name", "feed_url", "homepage", "status", "visible", "served",
            "accepted_total", "total_articles", "acceptance_pct", "duplicate_pct",
            "accepted_dup_pct", "image_coverage_pct", "paywalled", "last_success",
            "last_error", "retry_after", "review_flag", "review_reason",
        ])
        for s in rows:
            row([
                s["name"], s["feed_url"], s.get("homepage_url") or "", s.get("status") or "",
                "yes" if s.get("content_visible") else "no", s["served"],
                s["accepted_total"], s["total_articles"], s["acceptance_rate"],
                s["duplicate_rate"], s["accepted_dup_rate"], s["image_coverage"],
                "yes" if s.get("paywalled") else "no", s.get("last_success_at") or "",
                s.get("last_error") or "", s.get("retry_after_at") or "",
                "yes" if s.get("review_flag") else "no", s.get("review_reason") or "",
            ])

    return _csv_response("sources.csv", write)


@app.get("/api/admin/export/audience.csv")
def admin_export_audience(request: Request, days: int = Query(30)) -> Response:
    # Audience metrics as CSV: a metric/value section, share counts, then the
    # daily series. `days` outside the offered windows falls back to 30.
    days = days if days in (7, 30, 90) else 30
    with get_conn() as conn:
        _require_admin(conn, request)
        st = queries.admin_stats(conn, days=days)

    def write(row):
        v, a = st["visitors"], st["accounts"]
        row(["metric", "value"])
        for label, value in [
            ("window_days", st["days"]),
            ("visitors_today", v["today"]),
            ("visitors_7d", v["d7"]),
            ("visitors_30d", v["d30"]),
            ("returning_30d", st.get("returning", 0)),
            ("once_30d", st.get("once", 0)),
            ("accounts_total", a["total"]),
            ("accounts_new_7d", a["new_7d"]),
            ("accounts_active_7d", a["active_7d"]),
            ("feedback_7d", st.get("feedback_7d", 0)),
            ("feedback_unread", st.get("feedback_unread", 0)),
        ]:
            row([label, value])
        for kind, n in (st.get("shares") or {}).items():
            row([f"share_{kind}", n])
        row([])  # blank line, then the daily time series
        row(["date", "visitors", "opens"])
        for d in st.get("daily", []):
            row([d["day"], d["visits"], d["opens"]])

    return _csv_response("audience.csv", write)
@app.post("/api/admin/sources/{sid}/review") def admin_source_review(sid: int, body: SourceReviewBody, request: Request) -> dict: with get_conn() as conn: _require_admin(conn, request) cur = conn.execute( "UPDATE sources SET review_flag = ?, review_reason = ? WHERE id = ?", (1 if body.flag else 0, (body.reason or None) if body.flag else None, sid), ) if cur.rowcount == 0: raise HTTPException(status_code=404, detail="source not found") conn.commit() return {"ok": True, "flag": body.flag} @app.get("/api/admin/stats") def admin_stats(request: Request, days: int = Query(30)) -> dict: days = days if days in (7, 30, 90) else 30 # clamp to the offered windows with get_conn() as conn: _require_admin(conn, request) return queries.admin_stats(conn, days=days) @app.get("/api/summary/{article_id}") def article_summary(article_id: int, background_tasks: BackgroundTasks) -> dict: with get_conn() as conn: summary = summarize.get_summary(conn, article_id) explanation = summarize.get_explanation(conn, article_id) if summary: return {"status": "ready", "summary": summary, "explanation": explanation} _kick_summary(article_id, background_tasks) return {"status": "pending", "summary": None, "explanation": None} @app.get("/today", response_class=HTMLResponse) def today_digest() -> HTMLResponse: with get_conn() as conn: b = queries.brief(conn) items = b.get("items") or [] if not items: return HTMLResponse(share.render_not_found(PUBLIC_BASE_URL), status_code=404) return HTMLResponse(share.render_digest(items, PUBLIC_BASE_URL, b.get("brief_date"))) @app.api_route("/sitemap.xml", methods=["GET", "HEAD"]) def sitemap() -> Response: with get_conn() as conn: pwx = queries.paywalled_source_ids(conn) pw_clause = f" AND a.source_id NOT IN ({','.join('?' * len(pwx))})" if pwx else "" # Only articles with a real summary (the page has substance), paywalled excluded. # Cap near the 50k protocol ceiling so older canonical pages aren't dropped. 
rows = conn.execute( "SELECT a.id, COALESCE(a.published_at, a.discovered_at) AS lm " "FROM articles a JOIN article_scores s ON s.article_id = a.id " "WHERE s.accepted = 1 AND a.duplicate_of IS NULL" + pw_clause + " " "AND EXISTS (SELECT 1 FROM article_summaries asum WHERE asum.article_id = a.id) " "ORDER BY lm DESC LIMIT 50000", pwx, ).fetchall() base = PUBLIC_BASE_URL # Hub + the public sections, then every summarized article page. urls = [ f"{base}/hourly1.0", f"{base}/newshourly0.9", f"{base}/todaydaily0.9", f"{base}/artdaily0.6", f"{base}/playweekly0.5", f"{base}/worddaily0.5", f"{base}/quotedaily0.5", f"{base}/onthisdaydaily0.5", ] for r in rows: lm = (r["lm"] or "")[:10] lastmod = f"{lm}" if lm else "" urls.append(f"{base}/a/{r['id']}{lastmod}") xml = ( '' '' + "".join(urls) + "" ) return Response(content=xml, media_type="application/xml") @app.post("/api/import") def import_local(body: ImportBody, request: Request) -> dict: """Fold this device's anonymous history/saved into the account (one-time).""" with get_conn() as conn: user = _require_user(conn, request) for aid in queries.existing_article_ids(conn, body.seen): conn.execute( "INSERT OR IGNORE INTO user_history (user_id, article_id, event) " "VALUES (?, ?, 'seen')", (user["id"], aid), ) for aid in queries.existing_article_ids(conn, body.saved): conn.execute( "INSERT OR IGNORE INTO saved_articles (user_id, article_id) VALUES (?, ?)", (user["id"], aid), ) conn.commit() return {"ok": True} @app.get("/api/categories", response_model=CategoriesResponse) def categories(response: Response) -> CategoriesResponse: response.headers["Cache-Control"] = _EDGE_CONFIG # static taxonomy, identical for everyone return CategoriesResponse( topics=[Category(key=k, description=v) for k, v in TOPICS.items()], flavors=[Category(key=k, description=v) for k, v in FLAVORS.items()], ) @app.get("/api/moods") def moods(response: Response) -> list[dict]: # The humane front door: each mood resolves to a filter preset the # client 
merges with the user's own Calm Filters. response.headers["Cache-Control"] = _EDGE_CONFIG # static presets, identical for everyone return MOODS @app.get("/api/lanes") def lanes(response: Response) -> dict: # The customizable quick-access rail: 'today' is always pinned, and the # reader pins any subset of these moods / topics / Discovery tags. Live # counts let the client gate empty lanes and show volume. response.headers["Cache-Control"] = _EDGE_DERIVED # global counts, no per-user data with get_conn() as conn: tagc = queries.tag_counts(conn) topicc: dict[str, int] = {} for row in queries.category_counts(conn): topicc[row["topic"]] = topicc.get(row["topic"], 0) + int(row["count"]) return build_lane_pool(topicc, tagc) @app.get("/api/families") def families(response: Response) -> list[dict]: # Grouping vocabulary organised into calm families for the Explore UI. response.headers["Cache-Control"] = _EDGE_DERIVED # global vocab + counts, no per-user data with get_conn() as conn: counts = queries.tag_counts(conn) return [ { "name": name, "description": d["description"], "tags": [{"key": t, "count": counts.get(t, 0)} for t in d["tags"]], } for name, d in FAMILIES.items() ] @app.get("/api/category-counts", response_model=list[CategoryCount]) def category_counts(accepted_only: bool = True, prefs: str | None = Query(None)) -> list[CategoryCount]: fp = prefs_from_json(prefs) with get_conn() as conn: if fp.is_empty(): rows = queries.category_counts(conn, accepted_only=accepted_only) else: # Count over the SAME filtered set the feed would return, so the # browse numbers always match what the user actually sees. 
allrows = queries.feed(conn, accepted_only=accepted_only, limit=100000, offset=0) kept = filter_articles(allrows, fp, datetime.now(timezone.utc)) counts = Counter((r["topic"], r["flavor"]) for r in kept) rows = [ {"topic": t, "flavor": f, "count": n} for (t, f), n in sorted(counts.items(), key=lambda kv: (str(kv[0][0]), str(kv[0][1]))) ] return [CategoryCount(**row) for row in rows] @app.get("/api/feed", response_model=FeedResponse) def feed( response: Response, topic: str | None = Query(None), flavor: str | None = Query(None), accepted_only: bool = True, limit: int = Query(30, ge=1, le=100), offset: int = Query(0, ge=0), prefs: str | None = Query(None), exclude: str = Query("", description="comma-separated article ids the reader has dismissed"), tag: str | None = Query(None, description="grouping tag to browse"), source_id: int | None = Query(None, ge=1, description="show only this source's articles"), sort: str = Query("ranked", pattern="^(ranked|latest)$", description="ranked (best-first) or latest (newest-first)"), following: bool = Query(False, description="restrict to the signed-in user's followed sources/tags"), home: str | None = Query(None, max_length=8, description="Closer to Home: reader's home as 'US' or 'US-NY'"), request: Request = None, ) -> FeedResponse: # Edge-cacheable ONLY when the response depends purely on the URL: not the # following feed (reads the session's follows) and not personal filters # (prefs/dismissals are per-reader). The shareable cases — the default # home feed, topic/flavor/tag/source browse — are identical for everyone, # so the edge can serve one copy to all. Everything else stays private. 
shareable = not following and not prefs and not exclude.strip() and not home response.headers["Cache-Control"] = _EDGE_FEED if shareable else _PRIVATE if topic and topic.lower() not in TOPICS: raise HTTPException(400, f"unknown topic: {topic}") if flavor and flavor.lower() not in FLAVORS: raise HTTPException(400, f"unknown flavor: {flavor}") # Parse the reader's home: 'US' or 'US-NY'. State granularity is US-only for v1. home_country = home_state = None if home: parts = home.upper().split("-", 1) home_country = (parts[0][:2] or None) if home_country == "US" and len(parts) > 1: home_state = parts[1][:2] or None fp = prefs_from_json(prefs) now = datetime.now(timezone.utc) excl = {int(x) for x in exclude.split(",") if x.strip().lstrip("-").isdigit()} # Categorical filters (include/mute topics+flavors incl. active pauses, # cortisol ceiling) go to SQL so nothing is truncated by ranking. Only # word-boundary avoid-terms and dismissals need a Python pass. kw = _prefs_sql_kw(fp, now) with get_conn() as conn: if following: user = _current_user(conn, request) if not user: return FeedResponse(topic=topic, flavor=flavor, count=0, items=[]) frows = conn.execute( "SELECT kind, value FROM user_follows WHERE user_id = ?", (user["id"],) ).fetchall() kw["follow_sources"] = [int(r["value"]) for r in frows if r["kind"] == "source" and r["value"].isdigit()] kw["follow_tags"] = [r["value"] for r in frows if r["kind"] == "tag"] def _fetch(scope, lim, off): # One scoped page, applying the avoid-terms/dismissal Python pass when needed. 
if fp.avoid_terms or excl: fetch_n = min(2000, (off + lim) * 4 + 50 + len(excl)) raw = queries.feed( conn, topic=topic, flavor=flavor, accepted_only=accepted_only, limit=fetch_n, offset=0, tag=tag, source_id=source_id, sort=sort, home_country=home_country, home_state=home_state, geo_scope=scope, **kw, ) kept = [a for a in filter_articles(raw, fp, now) if a["id"] not in excl] return kept[off : off + lim] return queries.feed( conn, topic=topic, flavor=flavor, accepted_only=accepted_only, limit=lim, offset=off, tag=tag, source_id=source_id, sort=sort, home_country=home_country, home_state=home_state, geo_scope=scope, **kw, ) next_offset = None if home_country: # Closer to Home. Near you (+ Elsewhere in your country when a state is set) # is a ONE-TIME lead block on page 0; the world is the paginated body. Thin # tiers fold down so a header is never shown empty (Codex: lead, don't trap). NEAR_CAP, COUNTRY_CAP, MIN_TIER = 8, 8, 3 if offset == 0: near = _fetch("near", NEAR_CAP, 0) country = _fetch("country", COUNTRY_CAP, 0) if home_state else [] world = _fetch("world", limit, 0) next_offset = limit if len(world) == limit else None tiers = [] if home_state: if len(near) >= MIN_TIER: tiers.append(("near", near)) else: country = near + country # fold sparse "near" into your country if len(country) >= MIN_TIER: tiers.append(("country", country)) else: world = country + world # fold sparse country into the world elif len(near) >= MIN_TIER: tiers.append(("near", near)) # near == your whole country here else: world = near + world tiers.append(("world", world)) rows = [] for key, group in tiers: for r in group: r["__section"] = key rows.append(r) else: rows = _fetch("world", limit, offset) for r in rows: r["__section"] = "world" next_offset = offset + limit if len(rows) == limit else None else: rows = _fetch(None, limit, offset) next_offset = offset + len(rows) if len(rows) == limit else None # Section grouping only — paywalled-source stories are excluded upstream now. 
_SEC = {"near": 0, "country": 1, "world": 2} rows = sorted(rows, key=lambda r: _SEC.get(r.get("__section"), 0)) return FeedResponse( topic=topic, flavor=flavor, count=len(rows), items=[Article.from_row(r) for r in rows], next_offset=next_offset, ) @app.get("/api/search", response_model=FeedResponse) def search(response: Response, q: str = Query("", max_length=120), prefs: str | None = Query(None), limit: int = Query(30, ge=1, le=60), offset: int = Query(0, ge=0)) -> FeedResponse: # Public article search across the visitor-facing corpus. Mirrors the feed's # boundaries (accepted/visible/non-duplicate + the reader's Calm Filters / # avoid-terms) but NOT a lane scope — you searched on purpose. Ranked by # relevance (bm25), recency as a tie-break. Per-reader → never edge-cached. response.headers["Cache-Control"] = _PRIVATE fts = _fts_query(q) if not fts: return FeedResponse(topic=None, flavor=None, count=0, items=[]) fp = prefs_from_json(prefs) now = datetime.now(timezone.utc) kw = _prefs_sql_kw(fp, now) with get_conn() as conn: if not conn.execute("SELECT 1 FROM article_search LIMIT 1").fetchone(): queries.reindex_search(conn) # lazy build (fresh deploy / before first cycle) fetch_n = min(2000, (offset + limit) * 4 + 40) if fp.avoid_terms else (offset + limit) raw = queries.feed(conn, accepted_only=True, limit=fetch_n, offset=0, match=fts, **kw) kept = filter_articles(raw, fp, now) if fp.avoid_terms else raw # word-boundary avoid-terms items = kept[offset:offset + limit] # Keep relevance order (don't paywall-reorder); the badge still shows true status. 
return FeedResponse(topic=None, flavor=None, count=len(items), items=[Article.from_row(r) for r in items]) @app.get("/api/puzzle/{game}") def daily_puzzle(game: str, variant: str = Query("5")) -> dict: with get_conn() as conn: if game == "word" and variant in games.WORD_VARIANTS: return games.word_puzzle_response(conn, local_today(), variant) if game == "wordsearch": return games.wordsearch_response(conn, local_today(), variant) if game == "bloom": return bloom.bloom_response(conn, local_today()) raise HTTPException(status_code=404, detail="no such puzzle") @app.get("/api/puzzle/bloom/free") def bloom_free(response: Response, format: str = "center", seed: str | None = None) -> dict: # A free-play wheel: deterministic by `seed` (client stores it to resume), # random when none is given. Center Circle or Wild Bloom. No DB, no sync. fmt = "wild" if format == "wild" else "center" s = seed if (seed and re.fullmatch(r"[A-Za-z0-9_-]{1,32}", seed)) else secrets.token_urlsafe(6) response.headers["Cache-Control"] = "no-store" with get_conn() as conn: return bloom.bloom_free_response(conn, s, fmt) @app.post("/api/bloom/report") def bloom_report(body: BloomReportBody) -> dict: # A player flagging a rejected word as "should count". Public + deduped; # lands in the admin queue (approve→allow / block / dismiss). 
with get_conn() as conn: ok = bloom.add_report(conn, body.word, body.date, body.mode, body.format, body.letters, body.reason) return {"ok": bool(ok)} @app.post("/api/puzzle/word/guess") def word_guess(body: WordGuessRequest) -> dict: if body.variant not in games.WORD_VARIANTS: raise HTTPException(status_code=404, detail="no such puzzle") with get_conn() as conn: res = games.adjudicate_word_guess(conn, local_today(), body.variant, body.guess, body.n) if "error" in res: raise HTTPException(status_code=400, detail=res["error"]) return res # --- Cross-device game state sync (signed-in only; merged server-side) --- def _game_ok(game: str, variant: str) -> bool: return (game == "word" and variant in games.WORD_VARIANTS) or \ (game == "wordsearch" and variant in games.WS_TIERS) or \ (game == "bloom" and variant == "") or \ (game == "match" and variant in games.MATCH_VARIANTS) # "-" def _valid_pdate(d: str) -> bool: return bool(re.match(r"^\d{4}-\d{2}-\d{2}$", d or "")) # plain YYYY-MM-DD, no junk rows @app.get("/api/games/state") def game_state_get(game: str, variant: str, date: str, request: Request) -> dict: if not _game_ok(game, variant): raise HTTPException(status_code=404, detail="no such game") if not _valid_pdate(date): raise HTTPException(status_code=400, detail="bad date") with get_conn() as conn: user = _current_user(conn, request) if not user: return {"state": None} return {"state": games.load_game_state(conn, user["id"], game, variant, date)} @app.put("/api/games/state") def game_state_put(body: GameStateBody, request: Request) -> dict: if not _game_ok(body.game, body.variant): raise HTTPException(status_code=404, detail="no such game") if not _valid_pdate(body.date): raise HTTPException(status_code=400, detail="bad date") if len(json.dumps(body.state)) > 20000: # a real game state is tiny — reject junk raise HTTPException(status_code=413, detail="state too large") with get_conn() as conn: user = _current_user(conn, request) if not user: return {"state": 
body.state} # signed out → no sync, just echo merged = games.save_game_state(conn, user["id"], body.game, body.variant, body.date, body.state or {}) return {"state": merged} @app.put("/api/games/state/batch") def game_state_put_batch(body: GameStateBatchBody, request: Request) -> dict: """Reconcile many (game, variant) states for one date in a SINGLE request, so the hub doesn't fan out a dozen calls on every /play load. Each item is validated/sanitized/merged exactly like the single PUT; unknown or oversized items are dropped (not fatal). Signed-out → echo (no sync), same as the single endpoint, so cross-device pull is preserved for signed-in users.""" if not _valid_pdate(body.date): raise HTTPException(status_code=400, detail="bad date") items = [it for it in body.items[:32] if _game_ok(it.game, it.variant) and len(json.dumps(it.state)) <= 20000] with get_conn() as conn: user = _current_user(conn, request) if not user: return {"states": [{"game": it.game, "variant": it.variant, "state": it.state} for it in items]} out = [] for it in items: merged = games.save_game_state(conn, user["id"], it.game, it.variant, body.date, it.state or {}) out.append({"game": it.game, "variant": it.variant, "state": merged}) return {"states": out} @app.get("/api/games/stats") def game_stats_get(game: str, variant: str, request: Request) -> dict: if not _game_ok(game, variant): raise HTTPException(status_code=404, detail="no such game") with get_conn() as conn: user = _current_user(conn, request) return {"stats": games.game_stats(conn, user["id"], game, variant) if user else None} # --- Admin: Daily Word pool curation --- # --- Admin: Bloom word curation (runtime, no deploy) --- @app.get("/api/admin/bloom/reports") def admin_bloom_reports(request: Request, status: str = "pending") -> dict: with get_conn() as conn: _require_admin(conn, request) st = status if status in ("pending", "approved", "blocked", "dismissed") else "pending" return {"status": st, "reports": bloom.list_reports(conn, 
st), "overrides": bloom.list_overrides(conn)} @app.post("/api/admin/bloom/reports/{report_id}") def admin_bloom_resolve(report_id: int, body: BloomReportActionBody, request: Request) -> dict: with get_conn() as conn: admin = _require_admin(conn, request) ok = bloom.resolve_report(conn, report_id, body.action, by=admin["email"]) if not ok: raise HTTPException(status_code=400, detail="bad report or action") return {"ok": True} @app.post("/api/admin/bloom/overrides") def admin_bloom_override(body: BloomOverrideBody, request: Request) -> dict: with get_conn() as conn: admin = _require_admin(conn, request) ok = bloom.set_override(conn, body.word, body.action, reason=body.reason, by=admin["email"]) if not ok: raise HTTPException(status_code=422, detail="allow needs a real ≥4-letter word with no 'S'; block accepts any word") return {"ok": True} @app.delete("/api/admin/bloom/overrides/{word}") def admin_bloom_override_clear(word: str, request: Request) -> dict: with get_conn() as conn: _require_admin(conn, request) bloom.clear_override(conn, word) return {"ok": True} @app.get("/api/admin/word/lookup") def admin_word_lookup(word: str, request: Request) -> dict: with get_conn() as conn: _require_admin(conn, request) res = games.lookup_word(word) res["in_pool"] = bool(res["variant"]) and res["word"] in games.answer_pool(conn, res["variant"]) res["removed"] = bool(res["variant"]) and res["word"] in games._db_removed(conn, res["variant"]) return res @app.get("/api/admin/word/pool") def admin_word_pool(request: Request) -> dict: with get_conn() as conn: _require_admin(conn, request) return games.pool_summary(conn) @app.post("/api/admin/word/pool") def admin_word_pool_add(body: WordPoolBody, request: Request) -> dict: with get_conn() as conn: _require_admin(conn, request) res = games.add_pool_word(conn, body.word) if "error" in res: raise HTTPException(status_code=400, detail=res["error"]) return {**res, "pool": games.pool_summary(conn)} @app.delete("/api/admin/word/pool/{word}") 
def admin_word_pool_remove(word: str, request: Request) -> dict: with get_conn() as conn: _require_admin(conn, request) games.remove_pool_word(conn, word) return games.pool_summary(conn) @app.post("/api/admin/word/pool/restore") def admin_word_pool_restore(body: WordPoolBody, request: Request) -> dict: with get_conn() as conn: _require_admin(conn, request) games.restore_pool_word(conn, body.word) return games.pool_summary(conn) @app.post("/api/admin/word/pool/import") def admin_word_pool_import(body: WordPoolImportBody, request: Request) -> dict: with get_conn() as conn: _require_admin(conn, request) words = list(body.words) if body.text: words += re.findall(r"[A-Za-z]+", body.text) res = games.import_pool_words(conn, words) return {**res, "pool": games.pool_summary(conn)} # --- Admin: Word Search theme authoring --- @app.get("/api/admin/wordsearch/themes") def admin_ws_themes(request: Request) -> list[dict]: with get_conn() as conn: _require_admin(conn, request) return games.list_wordsearch_themes(conn) @app.post("/api/admin/wordsearch/themes") def admin_ws_theme_save(body: WordsearchThemeBody, request: Request) -> dict: with get_conn() as conn: _require_admin(conn, request) res = games.save_wordsearch_theme(conn, body.theme, body.words, body.id) if "error" in res: raise HTTPException(status_code=400, detail=res["error"]) return {**res, "themes": games.list_wordsearch_themes(conn)} @app.delete("/api/admin/wordsearch/themes/{tid}") def admin_ws_theme_remove(tid: int, request: Request) -> list[dict]: with get_conn() as conn: _require_admin(conn, request) games.remove_wordsearch_theme(conn, tid) return games.list_wordsearch_themes(conn) @app.post("/api/admin/wordsearch/suggest") def admin_ws_suggest(body: WordsearchSuggestBody, request: Request) -> dict: with get_conn() as conn: _require_admin(conn, request) from .llm import LocalModelClient try: client = LocalModelClient.from_env() except Exception: # noqa: BLE001 client = None res = 
games.suggest_wordsearch_word(client, body.theme, body.existing) if "error" in res: raise HTTPException(status_code=503, detail=res["error"]) return res @app.get("/api/since", response_model=FeedResponse) def feed_since(ts: str = Query(...), prefs: str | None = Query(None)) -> FeedResponse: # A calm welcome-back cue: accepted/non-dup/visible articles discovered # since the reader's last visit (boundary-respecting). count = how many; # items = a few to show inline. No nagging, no unread state stored. try: norm = ts.replace("Z", "+00:00") dt = datetime.fromisoformat(norm) since = (dt.astimezone(timezone.utc) if dt.tzinfo else dt).strftime("%Y-%m-%d %H:%M:%S") except (ValueError, TypeError): return FeedResponse(topic=None, flavor=None, count=0, items=[]) fp = prefs_from_json(prefs) now = datetime.now(timezone.utc) kw = _prefs_sql_kw(fp, now) with get_conn() as conn: rows = queries.feed(conn, sort="latest", since=since, limit=60, **kw) if fp.avoid_terms: rows = filter_articles(rows, fp, now) rows = sorted(rows, key=lambda r: is_paywalled_for_source(r["canonical_url"], r["paywall_override"])) return FeedResponse(topic=None, flavor=None, count=len(rows), items=[Article.from_row(r) for r in rows[:8]]) @app.get("/api/brief", response_model=BriefResponse) def brief( response: Response, date: str | None = Query(None), limit: int = Query(10, ge=1, le=50), prefs: str | None = Query(None), exclude: str = Query("", description="comma-separated article ids the reader has dismissed"), home: str | None = Query(None, max_length=8, description="local-first highlights: 'US' or 'US-NY'"), scope: str = Query("nearby", pattern="^(nearby|region|country|world)$", description="radius dial"), ) -> BriefResponse: # The default highlights are global (date-keyed, no session) → edge-cacheable # so a new visitor's "Gathering the good news…" resolves from their POP, not # a pull to the residential origin. Personal filters (incl. a home) stay private. 
home_country = home_state = None if home: parts = home.upper().split("-", 1) home_country = parts[0][:2] or None if home_country == "US" and len(parts) > 1: home_state = parts[1][:2] or None shareable = not prefs and not exclude.strip() and not home_country response.headers["Cache-Control"] = _EDGE_FEED if shareable else _PRIVATE fp = prefs_from_json(prefs) now = datetime.now(timezone.utc) excl = {int(x) for x in exclude.split(",") if x.strip().lstrip("-").isdigit()} with get_conn() as conn: if home_country and scope != "world": # The reader's home + scope dial lead the landing: closest-first good news, # blended outward so it's always a full set. Over-fetch to survive dismissal/ # boundary filtering, then cap to limit. scope='world' = the global brief. meta = queries.brief(conn, brief_date=date, limit=1) data = {"brief_date": meta["brief_date"], "title": "Close to home", "created_at": meta.get("created_at")} pool = queries.home_brief(conn, home_country, home_state, scope=scope, limit=limit + 12) else: data = queries.brief(conn, brief_date=date, limit=limit) pool = data["items"] # Drop dismissed (replaced-away) items and anything the reader's # boundaries hide; avoid-terms take precedence over curation. items = [a for a in pool if a["id"] not in excl and not is_paywalled_for_source(a.get("canonical_url"), a.get("paywall_override"))] if not fp.is_empty(): items = filter_articles(items, fp, now) items = items[:limit] # home mode over-fetches to survive filtering; cap here # Keep the highlights full: if a boundary or a dismissal removed a # story, top up with other readable, boundary-respecting good news # rather than show fewer. (Home mode's home_brief already blends to world.) 
if len(items) < limit: have = {a["id"] for a in items} | excl pool = queries.feed( conn, accepted_only=True, limit=limit * 5 + 40, offset=0, **_prefs_sql_kw(fp, now) ) for a in filter_articles(pool, fp, now): if len(items) >= limit: break if a["id"] not in have: items.append(a) have.add(a["id"]) # Lead with a gentle, readable story (charged or paywalled stories stay # in the set, just not as the first thing seen). if home_country and scope != "world" and items: # Keep "closest first": pick the gentlest hero from ONLY the closest non-empty # section, so _pick_lead can never float a wider-region/world story above a # local one. Wider tiers stay in their order behind it. lead = items[0].get("__section") head = [a for a in items if a.get("__section") == lead] tail = [a for a in items if a.get("__section") != lead] items = _pick_lead(head) + tail else: items = _pick_lead(items) return BriefResponse( brief_date=data["brief_date"], title=data["title"], generated_at=data.get("created_at"), items=[Article.from_row(r) for r in items], ) @app.get("/api/brief-dates", response_model=list[str]) def brief_dates(limit: int = Query(30, ge=1, le=365)) -> list[str]: with get_conn() as conn: return queries.available_dates(conn, limit=limit) # --- Daily Art (the /art room) ----------------------------------------- @app.get("/api/art/today") def art_today(response: Response) -> dict: with get_conn() as conn: a = art.get_today(conn) if not a: response.headers["Cache-Control"] = _PRIVATE raise HTTPException(status_code=404, detail="No art yet.") response.headers["Cache-Control"] = _EDGE_FEED # one piece a day, same for everyone museums = {"met": "The Met", "aic": "Art Institute of Chicago", "si": "Smithsonian"} return { "date": a["art_date"], "object_id": a["object_id"], "title": a["title"], "artist": a["artist"], "date_text": a["date_text"], "medium": a["medium"], "department": a["department"], "credit": a["credit"], "source_url": a["source_url"], "source": a["source"], "museum": 
museums.get(a["source"], a["source"]), "is_public_domain": bool(a["is_public_domain"]), "license": "Public Domain (CC0)" if a["is_public_domain"] else None, "blurb": a.get("blurb"), "palette": json.loads(a["palette"]) if a.get("palette") else [], "image_url": f"/api/art/image/{a['object_id']}", "image_url_large": f"/api/art/image/{a['object_id']}?size=full", } @app.api_route("/api/img/{article_id}", methods=["GET", "HEAD"]) def article_image(article_id: int) -> FileResponse: """Serve the locally-cached, downscaled WebP for an accepted, canonical article. Cache HITS ONLY — this never fetches (the cycle owns all fetching via newsimg.warm), so the public endpoint has no SSRF/worker-exhaustion surface. A miss 404s; the frontend handles that (retry, then the typographic cover).""" with get_conn() as conn: row = conn.execute( "SELECT a.image_url FROM articles a JOIN article_scores s ON s.article_id = a.id " "JOIN sources src ON src.id = a.source_id " "WHERE a.id = ? AND s.accepted = 1 AND a.duplicate_of IS NULL AND src.image_policy = 'cache'", (article_id,), ).fetchone() url = row["image_url"] if row else None path = newsimg.path_for(url) if url else None if not path: raise HTTPException(status_code=404, detail="image not cached") # NOT immutable: a re-enriched article can change image_url, so the bytes behind a # given id can change. A day's browser cache is plenty (we're direct-origin, no CDN). return FileResponse(str(path), media_type="image/webp", headers={"Cache-Control": "public, max-age=86400"}) @app.api_route("/api/art/image/{object_id}", methods=["GET", "HEAD"]) def art_image(object_id: int, size: str = Query("")) -> FileResponse: cdir = art.cache_dir() matches = sorted(cdir.glob(f"{object_id}-full.*")) if size == "full" else [] if not matches: # fall back to the web-large copy matches = sorted(cdir.glob(f"{object_id}.*")) if not matches: raise HTTPException(status_code=404, detail="Not cached.") # Cached museum image: immutable for a given object id. 
return FileResponse(str(matches[0]), headers={"Cache-Control": "public, max-age=31536000, immutable"}) @app.get("/api/onthisday/today") def onthisday_today(response: Response) -> dict: with get_conn() as conn: a = onthisday.get_today(conn) if not a: response.headers["Cache-Control"] = _PRIVATE raise HTTPException(status_code=404, detail="No fact yet.") response.headers["Cache-Control"] = _EDGE_FEED # one fact a day, same for everyone return { "date": a["feature_date"], "year": a["year"], "text": a["text"], "summary": a["summary"], "image_url": a["image_url"], "source_url": a["page_url"], "source": a["source"], } @app.get("/api/quote/today") def quote_today(response: Response) -> dict: with get_conn() as conn: q = quote.get_today(conn) if not q: response.headers["Cache-Control"] = _PRIVATE raise HTTPException(status_code=404, detail="No quote yet.") response.headers["Cache-Control"] = _EDGE_FEED return {"date": q["feature_date"], "text": q["text"], "author": q["author"], "work": q["work"], "year": q["year"], "meaning": q["meaning"], "source": q["source"]} @app.get("/api/word/today") def word_today(response: Response) -> dict: with get_conn() as conn: w = wotd.get_today(conn) if not w: response.headers["Cache-Control"] = _PRIVATE raise HTTPException(status_code=404, detail="No word yet.") response.headers["Cache-Control"] = _EDGE_FEED # Prefer the LLM-polished gloss + everyday sentences; fall back to the raw dictionary. 
raw_examples = w.get("usage") or w.get("examples") try: examples = json.loads(raw_examples) if raw_examples else [] except (ValueError, TypeError): examples = [] return {"date": w["feature_date"], "word": w["word"], "part_of_speech": w["part_of_speech"], "phonetic": w["phonetic"], "definition": w.get("gloss") or w["definition"], "examples": examples, "audio_url": f"/api/word/audio/{w['word']}" if w["audio_file"] else None} @app.api_route("/api/word/audio/{word}", methods=["GET", "HEAD"]) def word_audio(word: str) -> FileResponse: matches = sorted(wotd.cache_dir().glob(f"{word.lower()}.*")) matches = [m for m in matches if not m.name.startswith(".")] if not matches: raise HTTPException(status_code=404, detail="No audio.") return FileResponse(str(matches[0]), headers={"Cache-Control": "public, max-age=31536000, immutable"}) # --- Small-joys admin: manage the WOTD / QOTD / On-This-Day pools ---------------- _JOY_TABLES = {"onthisday": "onthisday_pool", "quote": "quote_pool", "word": "wotd_pool"} _JOY_DAILY = {"onthisday": "daily_onthisday", "quote": "daily_quote", "word": "daily_wotd"} _JOY_MODULES = {"onthisday": onthisday, "quote": quote, "word": wotd} _JOY_EDITABLE = { # whitelist of editable columns "onthisday": {"text", "summary", "year"}, "quote": {"text", "author", "work", "year", "meaning"}, "word": {"definition", "part_of_speech", "phonetic"}, } @app.get("/api/admin/joys/{kind}") def admin_joys_list(kind: str, request: Request, limit: int = Query(300, ge=1, le=2000)) -> list[dict]: table = _JOY_TABLES.get(kind) if not table: raise HTTPException(status_code=404, detail="Unknown joy.") with get_conn() as conn: _require_admin(conn, request) rows = conn.execute(f"SELECT * FROM {table} ORDER BY featured DESC, id DESC LIMIT ?", (limit,)).fetchall() return [dict(r) for r in rows] @app.post("/api/admin/joys/{kind}/items/{item_id}") # /items/ so 'add'/'repick' don't parse as an id def admin_joys_mutate(kind: str, item_id: int, body: JoyAction, request: Request) -> 
dict: table = _JOY_TABLES.get(kind) if not table: raise HTTPException(status_code=404, detail="Unknown joy.") with get_conn() as conn: _require_admin(conn, request) a = body.action if a in ("block", "unblock"): conn.execute(f"UPDATE {table} SET blocked=? WHERE id=?", (1 if a == "block" else 0, item_id)) elif a in ("feature", "unfeature"): conn.execute(f"UPDATE {table} SET featured=? WHERE id=?", (1 if a == "feature" else 0, item_id)) elif a == "delete": conn.execute(f"DELETE FROM {table} WHERE id=?", (item_id,)) elif a == "edit": cols = _JOY_EDITABLE[kind] & set((body.fields or {}).keys()) if cols: sets = ", ".join(f"{c}=?" for c in cols) conn.execute(f"UPDATE {table} SET {sets} WHERE id=?", (*(body.fields[c] for c in cols), item_id)) else: raise HTTPException(status_code=400, detail="Unknown action.") conn.commit() return {"ok": True} @app.post("/api/admin/joys/{kind}/add") def admin_joys_add(kind: str, body: JoyAdd, request: Request) -> dict: with get_conn() as conn: _require_admin(conn, request) if kind == "quote": if not body.text: raise HTTPException(status_code=400, detail="text required") conn.execute("INSERT OR IGNORE INTO quote_pool (source, ckey, text, author, work) VALUES ('admin',?,?,?,?)", (daily.content_key(body.text, body.author), body.text, body.author, body.work)) elif kind == "onthisday": if not body.text or not body.md: raise HTTPException(status_code=400, detail="md + text required") conn.execute("INSERT OR IGNORE INTO onthisday_pool (source, md, year, ckey, text, summary, image_url, page_url) " "VALUES ('admin',?,?,?,?,?,?,?)", (body.md, body.year, daily.content_key(body.md, body.year, body.text), body.text, body.summary, body.image_url, body.page_url)) elif kind == "word": if not body.word: raise HTTPException(status_code=400, detail="word required") info = wotd._lookup(body.word.strip().lower()) # network up front if not info: raise HTTPException(status_code=400, detail="Word not found in dictionary.") audio_file = 
wotd._cache_audio(info["audio_url"], info["word"]) polished = wotd._polish(LocalModelClient.from_env(), info["word"], info["part_of_speech"], info["definition"]) gloss = polished["gloss"] if polished else None usage = json.dumps(polished["examples"]) if polished else None conn.execute("INSERT OR IGNORE INTO wotd_pool (source, word, part_of_speech, phonetic, audio_file, audio_url, definition, examples, gloss, usage) " "VALUES ('admin',?,?,?,?,?,?,?,?,?)", (info["word"], info["part_of_speech"], info["phonetic"], audio_file, info["audio_url"], info["definition"], json.dumps(info["examples"]), gloss, usage)) else: raise HTTPException(status_code=404, detail="Unknown joy.") conn.commit() return {"ok": True} @app.post("/api/admin/joys/{kind}/repick") def admin_joys_repick(kind: str, request: Request) -> dict: mod = _JOY_MODULES.get(kind) if not mod: raise HTTPException(status_code=404, detail="Unknown joy.") with get_conn() as conn: _require_admin(conn, request) cur = conn.execute( f"SELECT pool_id FROM {_JOY_DAILY[kind]} WHERE feature_date=?", (local_today(),) ).fetchone() avoid = cur["pool_id"] if cur else None # force a DIFFERENT item, not the same one kwargs = {"force": True, "avoid": avoid} if kind in ("quote", "word"): # these polish lazily (gloss / meaning) kwargs["client"] = LocalModelClient.from_env() picked = mod.pick_daily(conn, **kwargs) return {"ok": True, "picked": bool(picked)} @app.get("/api/replacement", response_model=Article | None) def replacement( exclude: str = Query("", description="comma-separated article ids already shown"), prefs: str | None = Query(None), avoid_paywall: bool = True, gentle: bool = Query(False, description="also require lead-safe (for replacing the hero)"), ) -> Article | None: # Swap a read or paywalled item for the next-best one the reader can # actually open. The client merges any active mood into `prefs` (same as # the feed), so this needs no mood param. 
fp = prefs_from_json(prefs) excl = {int(x) for x in exclude.split(",") if x.strip().lstrip("-").isdigit()} now = datetime.now(timezone.utc) kw = dict( include_topics=fp.include_topics or None, include_flavors=fp.include_flavors or None, mute_topics=list(fp.muted_topics(now)) or None, mute_flavors=list(fp.muted_flavors(now)) or None, max_cortisol=fp.max_cortisol, max_ragebait=fp.max_ragebait, ) with get_conn() as conn: rows = queries.feed(conn, accepted_only=True, limit=120, offset=0, **kw) for r in filter_articles(rows, fp, now): if r["id"] in excl: continue if avoid_paywall and is_paywalled_for_source(r["canonical_url"], r["paywall_override"]): continue if gentle and not safe_to_lead(r): continue return Article.from_row(r) return None @app.get("/api/candidates", response_model=list[Candidate]) def candidates(status: str | None = Query(None)) -> list[Candidate]: from .sources import list_candidates with get_conn() as conn: rows = list_candidates(conn, status=status) out = [] for r in rows: d = dict(r) pj = d.pop("preview_json", None) d["preview"] = json.loads(pj) if pj else None out.append(Candidate(**d)) return out @app.get("/api/source-preview", response_model=SourcePreview) def source_preview( url: str = Query(..., max_length=2048), sample: int = Query(25, ge=1, le=50), classify: bool = Query(False, description="Also classify with the local model (accurate but slower)"), ) -> SourcePreview: # Read-only sample scoring; nothing is persisted. Only http(s) is allowed. # NOTE: fetching a user-supplied URL is an SSRF surface — before exposing # this publicly, also block private/loopback/link-local address ranges. 
if not re.match(r"^https?://", url, re.IGNORECASE): raise HTTPException(400, "url must start with http:// or https://") client = LocalModelClient.from_env() if classify else None try: data = feeds.preview_feed(url, sample=sample, client=client) except Exception as exc: raise HTTPException(502, f"could not preview feed: {exc}") return SourcePreview(**data) # Static site last, mounted at root, so /api/* and /healthz win. if STATIC_DIR.is_dir(): app.mount("/", StaticFiles(directory=str(STATIC_DIR), html=True), name="site") return app app = create_app()