Files
thejayman77 2dc4419024 images/analytics: purge on policy revoke + engagement warm-up note (Codex close-out)
- newsimg.purge_source(): when a source leaves 'cache' (permission revoked / re-classified),
  the admin image-policy endpoint now deletes that source's re-hosted copies immediately,
  rather than leaving them inaccessible-but-on-disk. Endpoint returns {purged}.
- Admin "Engaged readers" carries a warm-up note: tracking began 2026-06-30, so low
  rolling windows are partly warm-up, not all bots (compare d7 after a week, the window
  after its full span). Guards against misreading "6 engaged vs 135 visits" as 129 bots.
Tests: purge_source removes only the target source's copies; endpoint reports purged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 14:29:55 -04:00

2620 lines
120 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""FastAPI service for goodNews.
A read-only JSON API over the ingestion database, plus a small static site that
consumes it. The same endpoints back both the website and any future companion
app; the auto-generated OpenAPI docs at /docs are that shared contract.
Run with the bundled CLI: goodnews serve
Or directly: uvicorn goodnews.api:app --host 0.0.0.0 --port 8000
The database path comes from GOODNEWS_DB (falling back to the repo's data dir),
so the API and CLI always read the same file.
"""
from __future__ import annotations
import csv
import hashlib
import hmac
import io
import json
import logging
import os
import re
import secrets
import sqlite3
import threading
import time
from collections import Counter
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from fastapi import BackgroundTasks, FastAPI, HTTPException, Query, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from . import art, auth, bloom, daily, email_send, feeds, games, newsimg, oauth_google, onthisday, publishing, queries, quote, readtime, share, sources, summarize, wotd
from .localtime import local_today
from .markup import reply_html_to_text, sanitize_reply_html
from .db import connect
from .filters import filter_articles, prefs_from_json
from .hero import safe_to_lead
from .llm import LocalModelClient
from .moods import MOODS, mood_filter
from .lanes import build_lane_pool
from .paywall import is_paywalled, is_paywalled_for_source
from .taxonomy import FAMILIES, FLAVORS, TOPICS
# Edge-cache directives for GLOBAL endpoints — responses that depend only on the
# URL, never the session/user, so Cloudflare may safely serve one visitor's copy
# to another (this is what makes "Gathering the good news…" resolve from the edge
# instead of a round-trip to the residential origin). The personalization
# boundary is hard: anything session- or filter-specific stays private/no-store.
_EDGE_CONFIG = "public, max-age=0, s-maxage=900, stale-while-revalidate=600" # static config (moods/categories)
_EDGE_DERIVED = "public, max-age=0, s-maxage=120, stale-while-revalidate=120" # global, data-derived (lanes/families)
_EDGE_FEED = "public, max-age=0, s-maxage=45, stale-while-revalidate=30" # global feed (URL-keyed, shareable only)
_PRIVATE = "private, no-store" # never share across users
log = logging.getLogger("goodnews.api")
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_DB = ROOT / "data" / "goodnews.sqlite3"
# Prefer the built SvelteKit site; fall back to the legacy single-page harness.
FRONTEND_DIR = ROOT / "frontend" / "build"
LEGACY_STATIC = Path(__file__).resolve().parent / "static"
STATIC_DIR = FRONTEND_DIR if FRONTEND_DIR.is_dir() else LEGACY_STATIC
def db_path() -> Path:
    """Return the SQLite database location.

    The GOODNEWS_DB environment variable wins; otherwise DEFAULT_DB is used, so
    the API and the CLI resolve the same file.
    """
    fallback = str(DEFAULT_DB)
    configured = os.environ.get("GOODNEWS_DB", fallback)
    return Path(configured)
# --- Auth helpers -----------------------------------------------------------
PUBLIC_BASE_URL = os.environ.get("GOODNEWS_PUBLIC_BASE_URL", "https://upbeatbytes.com").rstrip("/")
SESSION_COOKIE = "ub_session"
OAUTH_COOKIE = "ub_oauth"
SESSION_MAX_AGE = int(auth.SESSION_TTL.total_seconds())
SESSION_SECRET = os.environ.get("GOODNEWS_SESSION_SECRET", "dev-insecure-secret")
# Emails that are always admins (normalized), in addition to users.is_admin.
ADMIN_EMAILS = {e.strip().lower() for e in os.environ.get("GOODNEWS_ADMIN_EMAILS", "").split(",") if e.strip()}
# Secure cookies in production (https); off for http (local/test) so they round-trip.
_COOKIE_SECURE = PUBLIC_BASE_URL.startswith("https")
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
def _sign(value: str) -> str:
    """Return ``value`` followed by a dot and its HMAC-SHA256 hex signature.

    The key is SESSION_SECRET; ``_unsign`` reverses the operation.
    """
    key = SESSION_SECRET.encode()
    digest = hmac.new(key, value.encode(), hashlib.sha256)
    return ".".join((value, digest.hexdigest()))
def _unsign(signed: str | None) -> str | None:
    """Verify a ``value.signature`` string and return the bare value.

    Returns None when the input is missing, has no dot separator, or carries a
    signature that does not match the HMAC-SHA256 of the value under
    SESSION_SECRET. Never raises on malformed input.
    """
    if not signed or "." not in signed:
        return None
    # The signature is whatever follows the LAST dot; the value may contain dots.
    value, _, sig = signed.rpartition(".")
    expected = hmac.new(SESSION_SECRET.encode(), value.encode(), hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError when a str argument
    # contains non-ASCII characters, and ``signed`` comes from a client cookie.
    if hmac.compare_digest(sig.encode(), expected.encode()):
        return value
    return None
def _google_redirect_uri() -> str:
    """Absolute callback URL passed to Google, built from PUBLIC_BASE_URL."""
    return PUBLIC_BASE_URL + "/api/auth/google/callback"
def _session_token_from_request(request: Request) -> str | None:
    """Extract the session token from a request.

    Web sends the session as an httpOnly cookie; the app sends a bearer token.
    The cookie takes precedence. Returns None when neither is present or the
    Authorization header uses a scheme other than Bearer.
    """
    cookie = request.cookies.get(SESSION_COOKIE)
    if cookie:
        return cookie
    authz = request.headers.get("Authorization", "")
    scheme, sep, credentials = authz.partition(" ")
    # HTTP auth scheme names are case-insensitive, so accept "bearer"/"BEARER" too.
    # ``sep`` is empty when the header is just "Bearer" with nothing after it.
    if not sep or scheme.lower() != "bearer":
        return None
    return credentials.strip()
def _current_user(conn: sqlite3.Connection, request: Request) -> sqlite3.Row | None:
    """Resolve the signed-in user for this request, or a falsy value if none.

    A successful resolution is followed by a commit so the last_seen touch made
    by ``auth.resolve_session`` is persisted.
    """
    session_token = _session_token_from_request(request)
    resolved = auth.resolve_session(conn, session_token)
    if not resolved:
        return resolved
    conn.commit()  # persist the last_seen touch
    return resolved
def _require_user(conn: sqlite3.Connection, request: Request) -> sqlite3.Row:
    """Return the signed-in user, or raise HTTP 401 when there is none."""
    user = _current_user(conn, request)
    if user:
        return user
    raise HTTPException(status_code=401, detail="Sign in to do that.")
def _is_admin(user: sqlite3.Row) -> bool:
    """True when the row's is_admin flag is set or its email is in ADMIN_EMAILS."""
    if user["is_admin"]:
        return True
    normalized = auth.normalize_email(user["email"])
    return normalized in ADMIN_EMAILS
def _require_admin(conn: sqlite3.Connection, request: Request) -> sqlite3.Row:
    """Return the signed-in admin user.

    Raises HTTP 401 (via ``_require_user``) when nobody is signed in and
    HTTP 403 when the user is not an admin.
    """
    user = _require_user(conn, request)
    if _is_admin(user):
        return user
    raise HTTPException(status_code=403, detail="Admins only.")
def _user_out(user: sqlite3.Row) -> dict:
    """Project a users row onto the public user fields.

    ``is_admin`` is the effective value from ``_is_admin`` (flag or allow-listed
    email), and ``digest_enabled`` is coerced to a bool.
    """
    public = {column: user[column] for column in ("id", "email", "display_name", "avatar_url")}
    public["is_admin"] = _is_admin(user)
    public["digest_enabled"] = bool(user["digest_enabled"])
    return public
# Articles whose summary is being generated right now — so concurrent pollers /
# scrapers don't each kick off a duplicate LLM call.
_summarizing: set[int] = set()
# In-process cache of fully-rendered /a/{id} share pages. We're direct-origin (no
# CDN), so Cache-Control alone can't shield the box from crawler bursts hitting the
# sitemap's article URLs while the LAN LLM / cycle is loading it. Only COMPLETE
# pages (summary + explanation present) are cached, so a "still generating" page is
# never pinned; a short TTL still picks up edits. Per-process (fine across workers).
# INVARIANT: the share page is PUBLIC/anonymous — the cache key is article_id alone.
# If /a/{id} ever personalizes (per-viewer content), key by viewer or drop the cache,
# or one visitor's variant would be served to another.
_SHARE_CACHE: dict[int, tuple[float, str]] = {}
_SHARE_TTL = 900.0 # 15 min
_SHARE_CACHE_MAX = 512
def _share_cache_get(aid: int) -> str | None:
    """Return the cached share-page HTML for ``aid`` if younger than _SHARE_TTL.

    Expired entries are reported as a miss (None) but are left in the cache.
    """
    entry = _SHARE_CACHE.get(aid)
    if not entry:
        return None
    stored_at, page = entry
    age = time.monotonic() - stored_at
    return page if age < _SHARE_TTL else None
def _share_cache_put(aid: int, html: str) -> None:
    """Store rendered share-page HTML for ``aid``, stamped with the current time.

    When the cache already holds _SHARE_CACHE_MAX entries and ``aid`` is new, the
    entry with the oldest timestamp is evicted first. Refreshing an id that is
    already cached never evicts anything, because the overwrite does not grow
    the cache.
    """
    is_new = aid not in _SHARE_CACHE
    if is_new and len(_SHARE_CACHE) >= _SHARE_CACHE_MAX:
        oldest = min(_SHARE_CACHE, key=lambda k: _SHARE_CACHE[k][0])
        _SHARE_CACHE.pop(oldest, None)
    _SHARE_CACHE[aid] = (time.monotonic(), html)
def _run_summary(article_id: int) -> None:
    """Background task: generate and store the summary for one article.

    Failures are logged rather than raised so they cannot crash the worker.
    The id is always removed from ``_summarizing`` afterwards, so a later
    request can retry.
    """
    try:
        with get_conn() as conn:
            summarize.generate_summary(conn, article_id)
    except Exception:  # noqa: BLE001 — best-effort background job: log, never propagate
        log.exception("summary generation failed for article %s", article_id)
    finally:
        _summarizing.discard(article_id)
# Publishing Desk: the "Build queue" job runs in the background (one bounded
# comparative LLM call can be slow); the admin polls the queue endpoint. Mirrors the
# summary-kick pattern — never holds an HTTP request open on the model. The lock makes
# the check-and-set atomic so two rapid clicks can't launch two expensive jobs.
_publish_build: dict = {"building": False, "result": None, "error": None}
_publish_build_lock = threading.Lock()
def _run_publish_build() -> None:
    """Background job behind the Publishing Desk "Build queue" action.

    Runs ``publishing.build_queue`` and records the outcome in ``_publish_build``:
    ``result`` on success, a truncated ``error`` string on failure. The
    ``building`` flag is always cleared at the end.
    """
    try:
        client = None
        try:
            client = LocalModelClient.from_env()
        except Exception:  # noqa: BLE001 — model down → deterministic fallback inside build_queue
            client = None
        with get_conn() as conn:
            outcome = publishing.build_queue(conn, PUBLIC_BASE_URL, client=client)
        _publish_build.update({"result": outcome, "error": None})
    except Exception as exc:  # noqa: BLE001 — surface, don't crash the worker
        message = str(exc)[:300]
        _publish_build.update({"error": message})
    finally:
        _publish_build["building"] = False
def _kick_summary(article_id: int, background_tasks: BackgroundTasks) -> None:
    """Queue summary generation for an article unless one is already in flight.

    ``_summarizing`` holds the ids currently being summarized; ``_run_summary``
    removes the id when it finishes.
    """
    already_running = article_id in _summarizing
    if not already_running:
        _summarizing.add(article_id)
        background_tasks.add_task(_run_summary, article_id)
def _feedback_email_safe(addr: str, category: str, message: str, contact: str | None, who: str) -> None:
    """Send a feedback notification email, never raising.

    Delivery is best-effort: a failure is logged (category only, no message
    body or addresses) and otherwise ignored.
    """
    try:
        email_send.send_feedback(addr, category, message, contact, who)
    except Exception:  # noqa: BLE001 — best-effort send: log, never propagate
        log.exception("feedback email send failed (category=%s)", category)
def _send_link_safe(email: str, link: str) -> None:
    """Send the magic link, never raising (runs off the request path).

    A failure is logged so delivery problems are visible to operators; the
    link itself is deliberately NOT logged because it embeds the login token.
    """
    try:
        email_send.send_magic_link(email, link)
    except Exception:  # noqa: BLE001 — don't crash the worker; the caller never sees this
        log.exception("magic-link email send failed")
def _set_session_cookie(response: Response, token: str) -> None:
    """Attach the session cookie carrying ``token`` to ``response``.

    The cookie is httpOnly, SameSite=Lax, site-wide (path "/"), lives for
    SESSION_MAX_AGE seconds, and is Secure only when _COOKIE_SECURE is true.
    """
    cookie_options = {
        "max_age": SESSION_MAX_AGE,
        "httponly": True,
        "secure": _COOKIE_SECURE,
        "samesite": "lax",
        "path": "/",
    }
    response.set_cookie(SESSION_COOKIE, token, **cookie_options)
@contextmanager
def get_conn():
    """Context manager yielding a database connection opened on ``db_path()``.

    The connection is closed when the ``with`` block exits, including on error.
    Nothing is committed here; callers commit explicitly.
    """
    connection = connect(db_path())
    try:
        yield connection
    finally:
        connection.close()
def _prefs_sql_kw(fp, now) -> dict:
    """Categorical prefs → queries.feed keyword filters (avoid-terms stay Python).

    Empty include/mute collections become None; the two score caps are passed
    through unchanged.
    """
    def or_none(values):
        # An empty collection means "no filter", expressed as None.
        return values or None

    return {
        "include_topics": or_none(fp.include_topics),
        "include_flavors": or_none(fp.include_flavors),
        "mute_topics": or_none(list(fp.muted_topics(now))),
        "mute_flavors": or_none(list(fp.muted_flavors(now))),
        "max_cortisol": fp.max_cortisol,
        "max_ragebait": fp.max_ragebait,
    }
def _pick_lead(items: list[dict]) -> list[dict]:
    """Move the best lead candidate to the front, keeping everything else in order.

    Tiers are tried from strictest to loosest: safe-to-lead + not paywalled +
    has an image, then safe-to-lead + not paywalled, then merely safe-to-lead.
    The first story matching the first satisfiable tier leads. If no story
    matches any tier, or the match is already first, ``items`` is returned
    as-is (same object); otherwise a new list is returned.
    """
    def readable_and_gentle(story: dict) -> bool:
        if not safe_to_lead(story):
            return False
        return not is_paywalled_for_source(story.get("canonical_url"), story.get("paywall_override"))

    def illustrated(story: dict) -> bool:
        return readable_and_gentle(story) and bool(story.get("image_url"))

    for qualifies in (illustrated, readable_and_gentle, safe_to_lead):
        position = next((idx for idx, story in enumerate(items) if qualifies(story)), None)
        if position is None:
            continue
        if position == 0:
            return items
        return [items[position]] + items[:position] + items[position + 1:]
    return items
# --- Response models (the companion-app contract) ---------------------------
# One taxonomy entry: its key plus a human-readable description.
class Category(BaseModel):
    key: str
    description: str


# The two category lists (topics and flavors), each a list of Category entries.
class CategoriesResponse(BaseModel):
    topics: list[Category]
    flavors: list[Category]


# A count for one topic/flavor combination; either dimension may be null.
class CategoryCount(BaseModel):
    topic: str | None
    flavor: str | None
    count: int
# One article as exposed by the API. Built from query rows via Article.from_row.
# (Comments, not a class docstring, so the generated schema text is unchanged.)
class Article(BaseModel):
    id: int
    title: str
    description: str | None = None
    url: str
    image_url: str | None = None
    published_at: str | None = None
    source: str
    source_id: int | None = None
    topic: str | None = None
    flavor: str | None = None
    accepted: bool
    rank_score: int | None = None
    reason_code: str | None = None
    reason_text: str | None = None
    model_name: str | None = None
    rank: int | None = None  # position within a brief, when applicable
    paywalled: bool = False
    tags: list[str] = []
    summary: str | None = None  # our own cached summary (present on the brief)
    source_read_minutes: int | None = None  # ~minutes to read the FULL source article (null = unknown)
    # Subject geography (present on feed rows; absent/empty on the brief). breadth is
    # locality|regional|national|multinational|global|unknown; places are ISO codes.
    geo_breadth: str | None = None
    geo_confidence: str | None = None
    geo_places: list[dict] = []  # e.g. [{"country": "US", "state": "NY"}, {"country": "GB", "state": None}]
    section: str | None = None  # 'near' | 'country' | 'world' when a home is set (Closer to Home)

    @classmethod
    def from_row(cls, row: dict) -> "Article":
        """Build an Article from a dict-like query row.

        ``geo_places`` and ``tags`` arrive as comma-separated strings: each place
        token is split on the first "-" into country and optional state, and
        empty tag entries are dropped. ``id``, ``title``, ``canonical_url`` and
        ``source_name`` are required keys; everything else is read with ``.get``.
        """
        place_tokens = [tok.strip() for tok in (row.get("geo_places") or "").split(",")]
        places = []
        for token in place_tokens:
            if token:
                country, _, state = token.partition("-")
                places.append({"country": country, "state": state or None})

        tag_csv = row.get("tags")
        tags = [tag for tag in tag_csv.split(",") if tag] if tag_csv else []

        return cls(
            section=row.get("__section"),
            geo_breadth=row.get("geo_breadth"),
            geo_confidence=row.get("geo_confidence"),
            geo_places=places,
            summary=row.get("summary"),
            source_read_minutes=readtime.source_read_minutes(row.get("source_words")),
            id=row["id"],
            title=row["title"],
            description=row.get("description"),
            url=row["canonical_url"],
            # Resolve per the source's image policy: our cached copy, the publisher's URL
            # (hotlink), or none — so we never re-host an image we haven't cleared.
            image_url=newsimg.display_url(row["id"], row.get("image_policy"), row.get("image_url")),
            published_at=row.get("published_at"),
            source=row["source_name"],
            source_id=row.get("source_id"),
            topic=row.get("topic"),
            flavor=row.get("flavor"),
            accepted=bool(row.get("accepted")),
            rank_score=row.get("rank_score"),
            reason_code=row.get("reason_code"),
            reason_text=row.get("reason_text"),
            model_name=row.get("model_name"),
            rank=row.get("rank"),
            paywalled=is_paywalled_for_source(row.get("canonical_url"), row.get("paywall_override")),
            tags=tags,
        )
# A page of articles. The saved and history endpoints in this file reuse it with
# topic/flavor set to None and count equal to len(items).
class FeedResponse(BaseModel):
    topic: str | None
    flavor: str | None
    count: int
    items: list[Article]
    next_offset: int | None = None  # world-tier offset for the next page (Closer to Home paging)


# A brief: its date, title, freshness stamp, and the articles it contains.
class BriefResponse(BaseModel):
    brief_date: str | None
    title: str | None
    generated_at: str | None = None  # freshness stamp: changes only when content changes
    items: list[Article]
# A rejected sample title with the reason; used by SourcePreview.examples_rejected.
class RejectedExample(BaseModel):
    title: str
    reason: str


# A single action on a "joy" item; `fields` is only meaningful for edit.
class JoyAction(BaseModel):
    action: str  # block | unblock | feature | unfeature | delete | edit
    fields: dict | None = None  # for edit: {column: value}


# Body for adding a "joy" item; the per-field comments say which kind
# (quote / onthisday / word) reads each field.
class JoyAdd(BaseModel):
    text: str | None = None  # quote / onthisday
    author: str | None = None  # quote
    work: str | None = None  # quote
    md: str | None = None  # onthisday 'MM-DD'
    year: int | None = None  # onthisday
    summary: str | None = None  # onthisday
    image_url: str | None = None  # onthisday
    page_url: str | None = None  # onthisday
    word: str | None = None  # word


# A candidate feed with its status, notes, timestamps and optional preview payload.
class Candidate(BaseModel):
    id: int
    feed_url: str
    homepage_url: str | None = None
    name: str | None = None
    status: str
    preview: dict | None = None
    notes: str | None = None
    last_previewed_at: str | None = None
    created_at: str | None = None
    updated_at: str | None = None


# Statistics for a sampled feed: acceptance numbers, average scores, recency,
# topic/flavor mix, and example titles.
class SourcePreview(BaseModel):
    url: str
    sampled: int
    classified: bool
    accepted: int
    acceptance_rate: float | None  # None when there are no English items to judge (all held)
    avg_cortisol: float
    avg_ragebait: float
    avg_pr_risk: float
    newest_published: str | None
    recent_7d: int
    topic_mix: dict[str, int]
    flavor_mix: dict[str, int]
    examples_accepted: list[str]
    examples_rejected: list[RejectedExample]
# One guess in the word game, with the variant and the guess's 1-based position.
class WordGuessRequest(BaseModel):
    variant: str = "5"
    guess: str
    n: int = 1  # this guess's position (1-based); the answer is revealed only at n >= max


# Saved state for one game/variant on one date.
class GameStateBody(BaseModel):
    game: str
    variant: str
    date: str
    state: dict = {}


# Status change for a publishing item, with optional text/URL/snooze fields.
class PublishStatusBody(BaseModel):
    status: str
    draft_text: str | None = None
    final_text: str | None = None
    post_url: str | None = None
    snooze_until: str | None = None


# Draft text for a publishing item.
class PublishDraftBody(BaseModel):
    draft_text: str = ""


# An entity name mapped to a handle and optional profile URL.
class EntityHandleBody(BaseModel):
    entity_name: str
    handle: str
    profile_url: str | None = None


# One entry of a GameStateBatchBody (the date lives on the batch).
class GameStateItem(BaseModel):
    game: str
    variant: str
    state: dict = {}


# Several game states for a single date.
class GameStateBatchBody(BaseModel):
    date: str
    items: list[GameStateItem] = []


# A report about a Bloom word, with optional context fields.
class BloomReportBody(BaseModel):
    word: str = ""
    date: str | None = None
    mode: str | None = None
    format: str | None = None
    letters: str | None = None
    reason: str | None = None


# An allow/block override for a Bloom word.
class BloomOverrideBody(BaseModel):
    word: str = ""
    action: str = "allow"  # 'allow' | 'block'
    reason: str | None = None


# The action to take on a Bloom report.
class BloomReportActionBody(BaseModel):
    action: str = ""  # 'approve' | 'block' | 'dismiss'


# A single word for the word pool.
class WordPoolBody(BaseModel):
    word: str


# Bulk word-pool input: free text and/or an explicit word list.
class WordPoolImportBody(BaseModel):
    text: str = ""
    words: list[str] = []


# Client-side error beacon payload.
class ClientErrorBody(BaseModel):
    reason: str = ""
    path: str = ""
    version: str = ""


# A word-search theme with its words; `id` is optional.
class WordsearchThemeBody(BaseModel):
    theme: str
    words: list[str] = []
    id: int | None = None


# A theme plus the words already present, for requesting suggestions.
class WordsearchSuggestBody(BaseModel):
    theme: str
    existing: list[str] = []
# Body of POST /api/auth/email/start: the address to send the magic link to.
class EmailStartRequest(BaseModel):
    email: str


# Body of POST /api/auth/email/verify: the raw login token from the link.
class TokenVerifyRequest(BaseModel):
    token: str


# Public view of a user; populated from _user_out().
class UserOut(BaseModel):
    id: int
    email: str
    display_name: str | None = None
    avatar_url: str | None = None
    is_admin: bool = False
    digest_enabled: bool = False


# Result of a successful sign-in: the user plus the session token.
class SessionOut(BaseModel):
    user: UserOut
    token: str  # for non-browser (app) clients; the web SPA uses the cookie


# A list of article ids (body of POST /api/history).
class IdsBody(BaseModel):
    ids: list[int] = []


# Article ids to import, split into seen and saved.
class ImportBody(BaseModel):
    seen: list[int] = []
    saved: list[int] = []


# Body of PUT /api/prefs: an opaque prefs object stored as JSON.
class PrefsBody(BaseModel):
    prefs: dict = {}


# One analytics event; see _EVENT_KINDS for the kinds that are recorded.
class EventBody(BaseModel):
    kind: str
    article_id: int | None = None
    visitor: str | None = None


# A feedback submission.
class FeedbackBody(BaseModel):
    category: str = "other"
    message: str = ""
    email: str | None = None
    visitor: str | None = None
    hp: str | None = None  # honeypot — bots fill it, humans don't


# Read/unread flag for a feedback item.
class FeedbackReadBody(BaseModel):
    read: bool = True


# A reply to a feedback item.
class FeedbackReplyBody(BaseModel):
    html: str = ""  # raw editor HTML — sanitized server-side before use


# New status for a source.
class SourceStatusBody(BaseModel):
    status: str = "active"  # active | paused | retired


# Visibility flag for a source.
class SourceVisibilityBody(BaseModel):
    visible: bool = True


# Paywall override for a source.
class SourcePaywallBody(BaseModel):
    override: str | None = None  # None = use domain rule · 'free' · 'paywalled'


# Image policy for a source.
class SourceImagePolicyBody(BaseModel):
    policy: str = "remote"  # 'cache' · 'remote' (default) · 'none'


# A suggested candidate feed.
class CandidateSuggestBody(BaseModel):
    feed_url: str = ""
    name: str | None = None


# Options applied when promoting a candidate.
class CandidatePromoteBody(BaseModel):
    default_category: str | None = None
    active: bool = False  # promote-as-paused by default; opt in to activate
    trust_score: int = 5
    pr_risk_score: int = 3
    poll_interval_minutes: int = 180


# New name for a candidate.
class CandidateRenameBody(BaseModel):
    name: str = ""


# Body of POST /api/account/digest: turn the daily digest on or off.
class DigestBody(BaseModel):
    enabled: bool = True


# Body of POST /api/follows.
class FollowBody(BaseModel):
    kind: str  # 'source' | 'tag'
    value: str  # source id (as text) or tag key


# Review flag for a source, with an optional reason.
class SourceReviewBody(BaseModel):
    flag: bool = False
    reason: str | None = None
_FEEDBACK_CATEGORIES = {"idea", "concern", "bug", "praise", "other"}
# The only event kinds we record. All aggregate, non-personal.
# Per-game funnel events (article_id is reused as 0 — no article dimension). Per-game
# kinds (not a generic "game_started") so the admin kind-count breakdown shows which
# game drives play and, crucially, shares — the growth loop we're instrumenting.
_GAME_NAMES = ("word", "wordsearch", "bloom", "match")
# arrival = landed on the game via a shared link (utm_source=game_share) — the share
# loop's acquisition signal; started/completed/shared are the engagement funnel.
_GAME_EVENT_KINDS = {f"{g}_{e}" for g in _GAME_NAMES for e in ("started", "completed", "shared", "arrival")}
_EVENT_KINDS = {
"visit", "open", "summary_viewed", "full_story", "source_click",
"share_ub", "copy_source", "native_share",
"not_today", "less_like_this", "hide_topic",
"replace_used", "replace_none", "paywall_replace", "paywalled_source_open",
"engaged", # genuine engagement: ~8s visible + a real gesture (vs. a raw visit)
"client_error", # boot-failure seatbelt beacon (blank-screen risk signal)
} | _GAME_EVENT_KINDS
def _fts_query(q: str) -> str:
    """Turn raw search-box text into a safe FTS5 query.

    Only ASCII alphanumeric runs survive (so no operator or quote injection),
    at most the first 8 are kept, and each becomes a prefix term (``term*``).
    Terms are space-joined, which FTS5 treats as AND. Returns '' when nothing
    usable remains.
    """
    words = re.findall(r"[A-Za-z0-9]+", q or "")
    return " ".join(word + "*" for word in words[:8])
def _visitor_hash(token: str | None) -> str:
    """Hash a visitor token for storage.

    The token is stripped and capped at 200 characters, then hashed with
    SHA-256 together with SESSION_SECRET. Missing or blank tokens give ''.
    """
    cleaned = (token or "").strip()[:200]
    if cleaned == "":
        return ""
    material = f"{SESSION_SECRET}:{cleaned}"
    return hashlib.sha256(material.encode()).hexdigest()
# --- App --------------------------------------------------------------------
def create_app() -> FastAPI:
app = FastAPI(
title="goodNews API",
version="0.1.0",
description="Constructive, uplifting news — metadata and links only.",
)
# The website and companion app may live on other origins; allow them.
# PUT and DELETE are listed because routes on this app use them (e.g.
# PUT /api/prefs, DELETE /api/saved/{article_id}); without them the browser's
# cross-origin preflight for those routes is refused.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)
@app.get("/healthz")
def healthz() -> dict:
    # Liveness probe. This handler only reads: it counts rows in article_scores
    # and reports 0 when the database raises a sqlite3 error, so the endpoint
    # still answers "ok" in that case.
    count_sql = "SELECT COUNT(*) FROM article_scores"
    try:
        with get_conn() as conn:
            scored = conn.execute(count_sql).fetchone()[0]
    except sqlite3.Error:
        scored = 0
    return dict(status="ok", scored_articles=scored)
# --- Auth: passwordless magic link (Google added in Phase 2) ----------
@app.post("/api/auth/email/start")
def auth_email_start(body: EmailStartRequest, background_tasks: BackgroundTasks) -> dict:
    # Start passwordless sign-in. A malformed address gets a 422; otherwise the
    # reply is always {"ok": True}, whether or not a link was issued, so the
    # response reveals nothing about the address (no account enumeration).
    email = auth.normalize_email(body.email)
    if _EMAIL_RE.match(email) is None:
        raise HTTPException(status_code=422, detail="Please enter a valid email address.")

    magic_link = None
    with get_conn() as conn:
        issued_recently = conn.execute(
            "SELECT COUNT(*) FROM login_tokens WHERE email = ? "
            "AND created_at > datetime('now', '-10 minutes')",
            (email,),
        ).fetchone()[0]
        # Light abuse guard: stop issuing once 5 tokens exist for this address
        # inside the 10-minute window (the caller still gets an OK reply).
        if issued_recently < 5:
            raw_token = auth.create_login_token(conn, email)
            conn.commit()
            magic_link = f"{PUBLIC_BASE_URL}/auth/verify?token={raw_token}"

    # The (slow) SMTP send runs as a background task so the request returns at once.
    if magic_link is not None:
        background_tasks.add_task(_send_link_safe, email, magic_link)
    return {"ok": True}
@app.post("/api/auth/email/verify", response_model=SessionOut)
def auth_email_verify(body: TokenVerifyRequest, request: Request, response: Response) -> SessionOut:
    # Exchange a magic-link token for a session. An unusable token yields a 400
    # (after committing whatever consume_login_token changed). On success the
    # user is found or created, a session is created, the cookie is set, and the
    # token is also returned in the body for non-browser clients.
    with get_conn() as conn:
        email = auth.consume_login_token(conn, body.token)
        if not email:
            conn.commit()
            raise HTTPException(status_code=400, detail="This sign-in link is invalid or has expired.")
        user_id = auth.find_or_create_user(conn, email, "email", email)
        agent = request.headers.get("User-Agent")
        token = auth.create_session(conn, user_id, user_agent=agent)
        conn.commit()
        user = auth.get_user(conn, user_id)
    _set_session_cookie(response, token)
    profile = UserOut(**_user_out(user))
    return SessionOut(user=profile, token=token)
@app.get("/api/auth/me", response_model=UserOut | None)
def auth_me(request: Request) -> UserOut | None:
    # Who am I? Returns the signed-in user's public fields, or null when the
    # request carries no valid session (this is not an error).
    with get_conn() as conn:
        user = _current_user(conn, request)
    if not user:
        return None
    return UserOut(**_user_out(user))
@app.post("/api/account/digest")
def account_digest(body: DigestBody, request: Request) -> dict:
    # Turn the daily digest on or off for the signed-in user (401 otherwise).
    # An unsubscribe token is minted the first time the digest is enabled and
    # is kept unchanged afterwards, including when the digest is turned off.
    with get_conn() as conn:
        user = _current_user(conn, request)
        if not user:
            raise HTTPException(status_code=401, detail="sign in required")
        unsub_token = user["digest_unsub_token"]
        if not unsub_token and body.enabled:
            unsub_token = secrets.token_urlsafe(18)
        enabled_flag = 1 if body.enabled else 0
        conn.execute(
            "UPDATE users SET digest_enabled = ?, digest_unsub_token = ? WHERE id = ?",
            (enabled_flag, unsub_token, user["id"]),
        )
        conn.commit()
    return {"ok": True, "digest_enabled": body.enabled}
def _do_unsubscribe(u: int, t: str) -> bool:
    """Turn the digest off for user ``u`` if ``t`` matches their unsubscribe token.

    Returns True when the token matched and the digest was disabled, False for
    an unknown user, a user without a token, or a mismatch. Never raises on a
    malformed token.
    """
    with get_conn() as conn:
        row = conn.execute("SELECT digest_unsub_token FROM users WHERE id = ?", (u,)).fetchone()
        stored = row["digest_unsub_token"] if row else None
        # Compare as bytes: compare_digest() raises TypeError when a str argument
        # contains non-ASCII characters, and ``t`` comes straight from the URL.
        if stored and hmac.compare_digest(stored.encode(), t.encode()):
            conn.execute("UPDATE users SET digest_enabled = 0 WHERE id = ?", (u,))
            conn.commit()
            return True
    return False
@app.post("/api/digest/unsubscribe")
def digest_unsubscribe_oneclick(u: int = Query(...), t: str = Query(...)) -> dict:
    # RFC 8058 one-click: the mailbox provider POSTs here. The reply is the same
    # whether or not the token matched, so the result is deliberately ignored.
    _ = _do_unsubscribe(u, t)
    return {"ok": True}
@app.get("/api/digest/unsubscribe", response_class=HTMLResponse)
def digest_unsubscribe(u: int = Query(...), t: str = Query(...)) -> HTMLResponse:
    # One-click, no login: match the per-user token, then turn the digest off and
    # show a small confirmation (or "invalid link") page. Both messages are fixed
    # strings, so nothing from the request is interpolated into the HTML.
    ok = _do_unsubscribe(u, t)
    msg = (
        # Fixed typo: this read "Youre unsubscribed".
        "You’re unsubscribed from the daily digest. No hard feelings — "
        "upbeatBytes is always here when you want it."
        if ok else
        "That unsubscribe link looks invalid or expired. You can manage the "
        "digest from your account settings."
    )
    html = (
        '<!doctype html><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">'
        '<div style="max-width:520px;margin:12vh auto;padding:0 24px;text-align:center;'
        'font-family:-apple-system,Segoe UI,Roboto,Helvetica,Arial,sans-serif;color:#16263a">'
        '<h1 style="font-size:22px">upbeatBytes</h1>'
        f'<p style="font-size:16px;line-height:1.5;color:#3b4754">{msg}</p>'
        '<p><a href="/" style="color:#0083ad;text-decoration:none">← Back to upbeatBytes</a></p></div>'
    )
    return HTMLResponse(html)
@app.post("/api/auth/logout")
def auth_logout(request: Request, response: Response) -> dict:
    # Sign out: revoke the session named by the request's token (cookie or
    # bearer) and clear the session cookie. Always replies {"ok": True}.
    session_token = _session_token_from_request(request)
    with get_conn() as conn:
        auth.revoke_session(conn, session_token)
        conn.commit()
    response.delete_cookie(SESSION_COOKIE, path="/")
    return {"ok": True}
# --- Auth: Google (OAuth 2.0 / OIDC) ----------------------------------
@app.get("/api/auth/google/start")
def google_start() -> RedirectResponse:
    # Begin Google sign-in: 503 when Google isn't configured; otherwise redirect
    # to Google's consent URL with a fresh state and PKCE challenge.
    if not oauth_google.configured():
        raise HTTPException(status_code=503, detail="Google sign-in isn't configured.")
    state = secrets.token_urlsafe(24)
    verifier, challenge = oauth_google.new_pkce()
    consent_url = oauth_google.auth_url(_google_redirect_uri(), state, challenge)
    redirect = RedirectResponse(consent_url, status_code=302)
    # Bind the flow to this browser: the signed state:verifier pair rides in a
    # short-lived (10 min) cookie and is read back (and CSRF-checked) on callback.
    bound_flow = _sign(f"{state}:{verifier}")
    redirect.set_cookie(
        OAUTH_COOKIE,
        bound_flow,
        max_age=600,
        httponly=True,
        secure=_COOKIE_SECURE,
        samesite="lax",
        path="/",
    )
    return redirect
@app.get("/api/auth/google/callback")
def google_callback(
    request: Request,
    code: str | None = None,
    state: str | None = None,
    error: str | None = None,
) -> RedirectResponse:
    # Finish Google sign-in. Every failure redirects to the same generic
    # /auth/verify?error=google page; success sets the session cookie and
    # redirects home.
    #
    # The user always sees the same generic error=google (no detail leaked),
    # but we log WHY internally so device/host-specific failures (e.g. a www
    # vs apex cookie loss, a state mismatch, a token-exchange error) are
    # diagnosable instead of all looking identical.
    def fail(reason: str, exc: Exception | None = None) -> RedirectResponse:
        host = request.headers.get("Host", "?")
        if exc is not None:
            log.warning("google callback failed: %s (host=%s): %s", reason, host, exc)
        else:
            log.warning("google callback failed: %s (host=%s)", reason, host)
        return RedirectResponse(f"{PUBLIC_BASE_URL}/auth/verify?error=google", status_code=302)

    if error:
        return fail(f"provider_error:{error}")
    if not code or not state:
        return fail("missing_code_or_state")
    saved = _unsign(request.cookies.get(OAUTH_COOKIE))
    if not saved:
        # Most likely the host-only ub_oauth cookie was set on a different
        # host than this callback (www vs apex). Canonicalizing www→apex at
        # the edge prevents this.
        return fail("missing_oauth_cookie")
    saved_state, _, verifier = saved.partition(":")
    # Compare as bytes: compare_digest() raises TypeError when a str argument
    # contains non-ASCII characters, and ``state`` is a client-supplied query
    # parameter — a crafted value must fail the check, not crash the request.
    if not hmac.compare_digest(saved_state.encode(), state.encode()):
        return fail("state_mismatch")
    try:
        tokens = oauth_google.exchange_code(code, _google_redirect_uri(), verifier)
        info = oauth_google.verify_id_token(tokens["id_token"])
        if not info.get("picture") and tokens.get("access_token"):
            info["picture"] = oauth_google.fetch_userinfo(tokens["access_token"]).get("picture")
    except Exception as exc:  # noqa: BLE001 — log reason, show generic error
        return fail("token_exchange_or_verify", exc)
    # An id token without these claims used to raise KeyError below (a 500);
    # route it through the same generic failure path instead.
    email = info.get("email")
    subject = info.get("sub")
    if not email or not subject:
        return fail("missing_email_or_sub")
    with get_conn() as conn:
        user_id = auth.find_or_create_user(
            conn, email, "google", subject,
            display_name=info.get("name"), avatar_url=info.get("picture"),
        )
        token = auth.create_session(conn, user_id, user_agent=request.headers.get("User-Agent"))
        conn.commit()
    ok = RedirectResponse(f"{PUBLIC_BASE_URL}/", status_code=302)
    _set_session_cookie(ok, token)
    # The one-shot flow cookie has served its purpose.
    ok.delete_cookie(OAUTH_COOKIE, path="/")
    return ok
# --- Saved articles, history, and one-time import (all require sign-in) ---
@app.get("/api/saved", response_model=FeedResponse)
def saved_list(request: Request) -> FeedResponse:
    # The signed-in user's saved articles, as a feed with no topic/flavor.
    # Requires sign-in (401 otherwise).
    with get_conn() as conn:
        user = _require_user(conn, request)
        articles = [Article.from_row(row) for row in queries.saved(conn, user["id"])]
    return FeedResponse(topic=None, flavor=None, count=len(articles), items=articles)
@app.get("/api/saved/ids")
def saved_id_list(request: Request) -> list[int]:
    # Just the ids of the signed-in user's saved articles (401 if signed out).
    with get_conn() as conn:
        user_id = _require_user(conn, request)["id"]
        return queries.saved_ids(conn, user_id)
# --- Follows: durable source / tag interests (require sign-in) ---
def _follows_for(conn, user_id: int) -> list[dict]:
    """Return the user's follows, newest first, as {kind, value, name} dicts.

    ``name`` defaults to the raw value. For a source follow whose value is a
    numeric id, it is replaced by the source's name when that source exists.
    """
    rows = conn.execute(
        "SELECT kind, value FROM user_follows WHERE user_id = ? ORDER BY created_at DESC", (user_id,)
    ).fetchall()
    out = []
    for r in rows:
        value = r["value"]
        d = {"kind": r["kind"], "value": value, "name": value}
        # isdigit() alone also accepts characters such as "²" that int() rejects,
        # and an over-long digit string overflows SQLite's INTEGER; treat either
        # as "not an id" and keep the raw value as the name.
        looks_like_id = value.isascii() and value.isdigit() and len(value) <= 18
        if r["kind"] == "source" and looks_like_id:
            src = conn.execute("SELECT name FROM sources WHERE id = ?", (int(value),)).fetchone()
            if src:
                d["name"] = src["name"]
        out.append(d)
    return out
@app.get("/api/follows")
def follows_list(request: Request) -> list[dict]:
    # The signed-in user's follows (401 if signed out); see _follows_for.
    with get_conn() as conn:
        user_id = _require_user(conn, request)["id"]
        return _follows_for(conn, user_id)
@app.post("/api/follows")
def follow_add(body: FollowBody, request: Request) -> dict:
    # Follow a source or a tag. Responses: 422 for an unknown kind or empty
    # value, 401 when signed out, 404 when a source id is malformed or unknown.
    # Following something twice is a no-op (INSERT OR IGNORE).
    if body.kind not in ("source", "tag"):
        raise HTTPException(status_code=422, detail="kind must be 'source' or 'tag'")
    value = (body.value or "").strip()
    if body.kind == "tag":
        value = value.lower()  # tags are stored lower-cased
    if not value:
        raise HTTPException(status_code=422, detail="value is required")
    with get_conn() as conn:
        user = _require_user(conn, request)
        if body.kind == "source":
            # str.isdigit() alone is not enough: it accepts characters such as
            # "²" that int() rejects (ValueError → 500), and an over-long digit
            # string overflows SQLite's 64-bit INTEGER. 18 ASCII digits always fit.
            is_source_id = value.isascii() and value.isdigit() and len(value) <= 18
            if not is_source_id or not conn.execute(
                "SELECT 1 FROM sources WHERE id = ?", (int(value),)
            ).fetchone():
                raise HTTPException(status_code=404, detail="source not found")
        conn.execute(
            "INSERT OR IGNORE INTO user_follows (user_id, kind, value) VALUES (?, ?, ?)",
            (user["id"], body.kind, value),
        )
        conn.commit()
    return {"ok": True, "kind": body.kind, "value": value}
@app.delete("/api/follows")
def follow_remove(request: Request, kind: str = Query(...), value: str = Query(...)) -> dict:
    # Unfollow. The value is normalized the same way follow_add stores it
    # (trimmed; tags lower-cased). Removing a follow that doesn't exist still
    # replies {"ok": True}. Requires sign-in.
    target = value.strip()
    if kind == "tag":
        target = target.lower()
    with get_conn() as conn:
        user = _require_user(conn, request)
        conn.execute(
            "DELETE FROM user_follows WHERE user_id = ? AND kind = ? AND value = ?",
            (user["id"], kind, target),
        )
        conn.commit()
    return {"ok": True}
@app.post("/api/saved/{article_id}")
def save_article(article_id: int, request: Request) -> dict:
    # Save an article for the signed-in user. 401 when signed out, 404 when the
    # article doesn't exist; saving twice is a no-op (INSERT OR IGNORE).
    with get_conn() as conn:
        user = _require_user(conn, request)
        found = conn.execute("SELECT 1 FROM articles WHERE id = ?", (article_id,)).fetchone()
        if not found:
            raise HTTPException(status_code=404, detail="No such article.")
        conn.execute(
            "INSERT OR IGNORE INTO saved_articles (user_id, article_id) VALUES (?, ?)",
            (user["id"], article_id),
        )
        conn.commit()
    return {"saved": True}
@app.delete("/api/saved/{article_id}")
def unsave_article(article_id: int, request: Request) -> dict:
    # Remove an article from the signed-in user's saved list. Replies
    # {"saved": False} whether or not it was saved. Requires sign-in.
    with get_conn() as conn:
        owner_id = _require_user(conn, request)["id"]
        conn.execute(
            "DELETE FROM saved_articles WHERE user_id = ? AND article_id = ?",
            (owner_id, article_id),
        )
        conn.commit()
    return {"saved": False}
@app.get("/api/history", response_model=FeedResponse)
def history_list(request: Request) -> FeedResponse:
    # The signed-in user's reading history, as a feed with no topic/flavor.
    # Requires sign-in (401 otherwise).
    with get_conn() as conn:
        user = _require_user(conn, request)
        articles = [Article.from_row(row) for row in queries.history(conn, user["id"])]
    return FeedResponse(topic=None, flavor=None, count=len(articles), items=articles)
@app.post("/api/history")
def record_history(body: IdsBody, request: Request) -> dict:
    # Record articles as seen for the signed-in user. Only the ids returned by
    # queries.existing_article_ids are written, and an already-recorded pair is
    # left untouched (INSERT OR IGNORE). Requires sign-in.
    insert_seen = (
        "INSERT OR IGNORE INTO user_history (user_id, article_id, event) "
        "VALUES (?, ?, 'seen')"
    )
    with get_conn() as conn:
        user = _require_user(conn, request)
        reader_id = user["id"]
        for article_id in queries.existing_article_ids(conn, body.ids):
            conn.execute(insert_seen, (reader_id, article_id))
        conn.commit()
    return {"ok": True}
@app.delete("/api/history")
def clear_history(request: Request) -> dict:
    # Delete every user_history row belonging to the signed-in user.
    with get_conn() as conn:
        account = _require_user(conn, request)
        owner = (account["id"],)
        conn.execute("DELETE FROM user_history WHERE user_id = ?", owner)
        conn.commit()
    return {"ok": True}
@app.delete("/api/history/{article_id}")
def remove_history(article_id: int, request: Request) -> dict:
    # Delete the signed-in user's history rows for one article. The response
    # does not depend on whether anything was deleted.
    with get_conn() as conn:
        account = _require_user(conn, request)
        params = (account["id"], article_id)
        conn.execute(
            "DELETE FROM user_history WHERE user_id = ? AND article_id = ?",
            params,
        )
        conn.commit()
    return {"ok": True}
# --- Prefs sync (Calm Filters / Boundaries follow the account) --------
@app.get("/api/prefs")
def get_prefs(request: Request) -> dict:
    # Return the signed-in user's stored prefs. "prefs" is None both when no
    # row exists yet (caller seeds from the device) and when the stored blob
    # fails to decode.
    with get_conn() as conn:
        account = _require_user(conn, request)
        stored = conn.execute(
            "SELECT prefs_json FROM user_prefs WHERE user_id = ?", (account["id"],)
        ).fetchone()
    decoded = None
    if stored:
        try:
            decoded = json.loads(stored["prefs_json"])
        except (ValueError, TypeError):
            decoded = None
    return {"prefs": decoded}
@app.put("/api/prefs")
def put_prefs(body: PrefsBody, request: Request) -> dict:
    # Upsert the signed-in user's prefs blob (one row per user, keyed on user_id).
    #
    # The serialized JSON is capped at 20000 characters. An oversized payload is
    # rejected with 413 instead of being sliced: a truncated JSON document no
    # longer parses, so storing it would silently discard the user's prefs on the
    # next read.
    max_chars = 20000
    blob = json.dumps(body.prefs)
    with get_conn() as conn:
        user = _require_user(conn, request)  # authenticate before reporting size errors
        if len(blob) > max_chars:
            raise HTTPException(status_code=413, detail="Preferences are too large to sync.")
        conn.execute(
            "INSERT INTO user_prefs (user_id, prefs_json, updated_at) "
            "VALUES (?, ?, CURRENT_TIMESTAMP) "
            "ON CONFLICT(user_id) DO UPDATE SET prefs_json = excluded.prefs_json, "
            "updated_at = CURRENT_TIMESTAMP",
            (user["id"], blob),
        )
        conn.commit()
    return {"ok": True}
# --- Account: profile, sessions, export, delete -----------------------
@app.get("/api/account")
def account_info(request: Request) -> dict:
    # Account overview for the signed-in user: basic profile fields, linked
    # sign-in providers, and counts of sessions and saved articles.
    with get_conn() as conn:
        account = _require_user(conn, request)
        owner = (account["id"],)
        provider_rows = conn.execute(
            "SELECT provider FROM identities WHERE user_id = ?", owner
        )
        providers = [row["provider"] for row in provider_rows]
        session_count = conn.execute(
            "SELECT COUNT(*) FROM sessions WHERE user_id = ?", owner
        ).fetchone()[0]
        saved_count = conn.execute(
            "SELECT COUNT(*) FROM saved_articles WHERE user_id = ?", owner
        ).fetchone()[0]
        profile = {
            "id": account["id"],
            "email": account["email"],
            "display_name": account["display_name"],
        }
        return {
            "user": profile,
            "providers": providers,
            "sessions": session_count,
            "saved_count": saved_count,
        }
@app.post("/api/account/logout-all")
def logout_all(request: Request, response: Response) -> dict:
    # Delete every session row for the signed-in user, then clear the session
    # cookie on this response.
    with get_conn() as conn:
        account = _require_user(conn, request)
        owner = (account["id"],)
        conn.execute("DELETE FROM sessions WHERE user_id = ?", owner)
        conn.commit()
    response.delete_cookie(SESSION_COOKIE, path="/")
    return {"ok": True}
@app.get("/api/account/export")
def export_account(request: Request) -> Response:
    # Download of the signed-in user's data as a JSON attachment: account
    # fields, sign-in providers, saved + history articles (id/title/url only,
    # up to 10000 each) and the stored preferences.
    def slim(a) -> dict:
        # Reduce an article row to the three fields included in the export.
        return {"id": a["id"], "title": a["title"], "url": a["canonical_url"]}

    with get_conn() as conn:
        user = _require_user(conn, request)
        uid = user["id"]
        providers = [r["provider"] for r in conn.execute(
            "SELECT provider FROM identities WHERE user_id = ?", (uid,)
        )]
        saved = [slim(a) for a in queries.saved(conn, uid, limit=10000)]
        hist = [slim(a) for a in queries.history(conn, uid, limit=10000)]
        prow = conn.execute(
            "SELECT prefs_json FROM user_prefs WHERE user_id = ?", (uid,)
        ).fetchone()
        account = {"id": uid, "email": user["email"],
                   "display_name": user["display_name"], "created_at": user["created_at"]}

    # A stored blob that fails to decode must not turn the whole export into a
    # 500; treat it as "no preferences", the same way GET /api/prefs does.
    preferences = None
    if prow:
        try:
            preferences = json.loads(prow["prefs_json"])
        except (ValueError, TypeError):
            preferences = None

    data = {
        "account": account,
        "sign_in_methods": providers,
        "saved": saved,
        "history": hist,
        "preferences": preferences,
    }
    return Response(
        content=json.dumps(data, indent=2),
        media_type="application/json",
        headers={"Content-Disposition": "attachment; filename=upbeatbytes-data.json"},
    )
@app.delete("/api/account")
def delete_account(request: Request, response: Response) -> dict:
    # Delete the signed-in user's row and clear the session cookie.
    with get_conn() as conn:
        account = _require_user(conn, request)
        owner = (account["id"],)
        conn.execute("DELETE FROM users WHERE id = ?", owner)  # cascades to all account data
        conn.commit()
    response.delete_cookie(SESSION_COOKIE, path="/")
    return {"ok": True}
# --- Public share/landing page for an article -------------------------
# GET + HEAD: FastAPI's @app.get registers GET only (no auto-HEAD), so a HEAD would
# fall through to the catch-all StaticFiles mount at "/" and 404. Register both so
# HEAD returns the same status (200/301/404) as GET, sans body.
@app.api_route("/a/{article_id}", methods=["GET", "HEAD"], response_class=HTMLResponse)
def share_page(article_id: str, background_tasks: BackgroundTasks) -> HTMLResponse:
    # Public landing page for one article. Outcomes:
    #   - 404 page: malformed id, unknown id, not accepted, or a duplicate whose
    #     canonical twin is missing / not accepted.
    #   - 301 to /a/<canonical id>: the article is a duplicate of an accepted one.
    #   - 200: the rendered page (from the share cache when one is available).
    # article_id is typed str so a non-numeric id is handled here rather than by
    # the framework's path validation.
    not_found = HTMLResponse(share.render_not_found(PUBLIC_BASE_URL), status_code=404)
    try:
        aid = int(article_id)
    except (TypeError, ValueError):
        return not_found  # malformed id → calm 404, no stack trace
    cached = _share_cache_get(aid)
    if cached is not None:  # serve a rendered page without touching SQLite/render
        return HTMLResponse(cached, headers={"Cache-Control": "public, max-age=300"})
    with get_conn() as conn:
        # One row: the article, its source's name + image policy, its score
        # (LEFT JOIN, so reason_text/accepted may be NULL) and comma-joined tags.
        row = conn.execute(
            "SELECT a.id, a.title, a.description, a.image_url, a.canonical_url, "
            "a.duplicate_of, a.source_id, src.name AS source_name, src.image_policy, s.reason_text, s.accepted, "
            "(SELECT group_concat(t.tag) FROM article_tags t WHERE t.article_id = a.id) AS tags "
            "FROM articles a JOIN sources src ON src.id = a.source_id "
            "LEFT JOIN article_scores s ON s.article_id = a.id WHERE a.id = ?",
            (aid,),
        ).fetchone()
        if not row:
            return not_found
        # A duplicate's URL may already be indexed by Google. A hard 404 silently
        # drops it (and any newer twin that arrives later retires the OLDER, already
        # indexed URL) — that's what tanked impressions. So 301 to the canonical twin
        # instead: Google consolidates the page onto the survivor. dedup stores a star
        # (dup -> rep, rep.duplicate_of IS NULL); we still follow a short chain with a
        # cycle guard as cheap insurance.
        if row["duplicate_of"] is not None:
            # seen = ids already visited (cycle guard); cur = next id to look up;
            # target = the accepted end of the chain, once found.
            seen, cur, target = {aid}, row["duplicate_of"], None
            for _ in range(8):  # at most 8 lookups along the chain
                if cur in seen:
                    break
                seen.add(cur)
                r2 = conn.execute(
                    "SELECT a.id, a.duplicate_of, s.accepted FROM articles a "
                    "LEFT JOIN article_scores s ON s.article_id = a.id WHERE a.id = ?",
                    (cur,),
                ).fetchone()
                if not r2:
                    break
                if r2["duplicate_of"] is None:
                    # End of the chain: only an accepted article is a redirect target.
                    target = r2 if r2["accepted"] else None
                    break
                cur = r2["duplicate_of"]
            if target is not None:
                return RedirectResponse(f"/a/{target['id']}", status_code=301)
            return not_found  # canonical itself is gone/rejected → genuinely 404
        if not row["accepted"]:
            return not_found
        summary = summarize.get_summary(conn, aid)
        explanation = summarize.get_explanation(conn, aid)
        # "complete" means both the summary and the explanation are present.
        complete = bool(summary and explanation)
        if not complete:
            _kick_summary(aid, background_tasks)  # generate/top-up for next time; page polls
        html = share.render_share_page(dict(row), PUBLIC_BASE_URL, summary=summary, explanation=explanation)
        if complete:
            _share_cache_put(aid, html)  # cache only the finished page (never the "generating" state)
        # An incomplete page is sent with no-cache; a complete one with a 300s public max-age.
        return HTMLResponse(html, headers={"Cache-Control": "public, max-age=300" if complete else "no-cache"})
# --- Privacy-respecting first-party analytics -------------------------
@app.post("/api/events")
def record_event(body: EventBody, request: Request) -> dict:
    # Don't let crawlers inflate visitor/funnel counts. Many modern bots run JS and
    # DO fire this beacon, so filter by User-Agent (same check the load-error beacon
    # uses) — catches honest bot UAs (GPTBot, AhrefsBot, headless Chrome, …). The
    # response is identical either way, so a bot can't tell it was dropped.
    ack = {"ok": True}  # always identical; dedup'd by the unique key
    agent = request.headers.get("user-agent", "")
    if body.kind not in _EVENT_KINDS or queries.is_bot_ua(agent):
        return ack
    with get_conn() as conn:
        conn.execute(
            "INSERT OR IGNORE INTO events (kind, article_id, visitor_hash, day) "
            "VALUES (?, ?, ?, date('now'))",
            (body.kind, body.article_id or 0, _visitor_hash(body.visitor)),
        )
        conn.commit()
    return ack
@app.post("/api/client-error")
def record_client_error(body: ClientErrorBody, request: Request) -> dict:
    # Boot-failure seatbelt telemetry — what blank-risk looks like in the wild.
    # Every text field is length-capped before storage, and each insert also
    # prunes rows older than 14 days.
    agent = (request.headers.get("user-agent") or "")[:300]
    reason = (body.reason or "")[:500]
    path = (body.path or "")[:200]
    version = (body.version or "")[:60]
    with get_conn() as conn:
        conn.execute(
            "INSERT INTO client_errors (reason, path, user_agent, app_version) VALUES (?, ?, ?, ?)",
            (reason, path, agent, version),
        )
        conn.execute("DELETE FROM client_errors WHERE created_at < datetime('now','-14 days')")
        conn.commit()
    return {"ok": True}
@app.get("/api/admin/client-errors")
def admin_client_errors(request: Request,
                        show: str = Query("unread", pattern="^(unread|read|all)$")) -> list[dict]:
    # Latest 50 client load errors for the admin view, filtered by read state.
    # The WHERE fragment comes from a fixed table keyed by `show`, never from
    # request text.
    read_filters = {
        "unread": "WHERE read_at IS NULL",
        "read": "WHERE read_at IS NOT NULL",
        "all": "",
    }
    with get_conn() as conn:
        _require_admin(conn, request)
        where = read_filters[show]
        rows = conn.execute(
            f"SELECT id, reason, path, user_agent, app_version, created_at, read_at "
            f"FROM client_errors {where} ORDER BY id DESC LIMIT 50"
        ).fetchall()
        # Bots stay visible (tagged) but are excluded from the headline count — see admin_stats.
        listing = []
        for row in rows:
            entry = dict(row)
            entry["read"] = row["read_at"] is not None
            entry["bot"] = queries.is_bot_ua(row["user_agent"])
            listing.append(entry)
        return listing
@app.post("/api/admin/client-errors/read-all")
def admin_client_errors_read_all(request: Request) -> dict:
    # Stamp every unread client error as read; "marked" is the number of rows
    # the UPDATE touched.
    with get_conn() as conn:
        _require_admin(conn, request)
        updated = conn.execute(
            "UPDATE client_errors SET read_at = CURRENT_TIMESTAMP WHERE read_at IS NULL"
        )
        conn.commit()
        marked = updated.rowcount
    return {"ok": True, "marked": marked}
@app.post("/api/admin/client-errors/{eid}/read")
def admin_client_error_read(eid: int, body: FeedbackReadBody, request: Request) -> dict:
    # Mark one client error read (timestamp) or unread (NULL). The SQL fragment
    # is one of two fixed literals; the id is bound as a parameter.
    if body.read:
        stamp = "CURRENT_TIMESTAMP"
    else:
        stamp = "NULL"
    with get_conn() as conn:
        _require_admin(conn, request)
        updated = conn.execute(f"UPDATE client_errors SET read_at = {stamp} WHERE id = ?", (eid,))
        if updated.rowcount == 0:
            raise HTTPException(status_code=404, detail="load error not found")
        conn.commit()
    return {"ok": True, "read": body.read}
@app.post("/api/feedback")
def submit_feedback(body: FeedbackBody, request: Request, background_tasks: BackgroundTasks) -> dict:
    # Store a feedback message and queue a notification email per admin address.
    # Two paths return {"ok": True} without storing anything: a filled honeypot
    # field, and a visitor token that already has 8 rows today.
    accepted = {"ok": True}
    if body.hp:  # honeypot tripped → accept silently, store nothing
        return accepted
    text = (body.message or "").strip()[:4000]
    if not text:
        raise HTTPException(status_code=422, detail="Please add a short message.")
    if body.category in _FEEDBACK_CATEGORIES:
        category = body.category
    else:
        category = "other"
    contact = ((body.email or "").strip()[:200]) or None
    token = _visitor_hash(body.visitor)
    with get_conn() as conn:
        if token:  # light flood cap per anonymous token per day
            todays = conn.execute(
                "SELECT COUNT(*) FROM feedback WHERE visitor_hash = ? AND day = date('now')", (token,)
            ).fetchone()[0]
            if todays >= 8:
                return accepted
        author = _current_user(conn, request)
        author_id = author["id"] if author else None
        conn.execute(
            "INSERT INTO feedback (category, message, contact_email, user_id, visitor_hash, day) "
            "VALUES (?, ?, ?, ?, ?, date('now'))",
            (category, text, contact, author_id, token),
        )
        conn.commit()
        sender = author["email"] if author else "anonymous visitor"
    for recipient in ADMIN_EMAILS:
        background_tasks.add_task(_feedback_email_safe, recipient, category, text, contact, sender)
    return accepted
@app.get("/api/admin/feedback")
def admin_feedback(request: Request) -> list[dict]:
    # Admin inbox: up to 200 feedback items, newest first by created_at, each
    # with its replies (ordered by sent_at) attached under "replies".
    with get_conn() as conn:
        _require_admin(conn, request)
        feedback_rows = conn.execute(
            "SELECT f.id, f.category, f.message, f.contact_email, f.created_at, f.read_at, "
            "u.email AS user_email FROM feedback f LEFT JOIN users u ON u.id = f.user_id "
            "ORDER BY f.created_at DESC LIMIT 200"
        ).fetchall()
        inbox = [dict(row) for row in feedback_rows]
        if not inbox:
            return inbox
        # One query for all replies, then group them by the item they answer.
        feedback_ids = [entry["id"] for entry in inbox]
        marks = ",".join("?" * len(feedback_ids))
        reply_rows = conn.execute(
            f"SELECT id, feedback_id, message, message_html, sent_to, sent_at FROM feedback_replies "
            f"WHERE feedback_id IN ({marks}) ORDER BY sent_at",
            feedback_ids,
        ).fetchall()
        grouped: dict = {}
        for reply in reply_rows:
            grouped.setdefault(reply["feedback_id"], []).append(dict(reply))
        for entry in inbox:
            entry["replies"] = grouped.get(entry["id"], [])
        return inbox
@app.post("/api/admin/feedback/{fid}/read")
def admin_feedback_read(fid: int, body: FeedbackReadBody, request: Request) -> dict:
    # Mark one feedback item read (timestamp) or unread (NULL). The SQL
    # fragment is one of two fixed literals; the id is bound as a parameter.
    if body.read:
        stamp = "CURRENT_TIMESTAMP"
    else:
        stamp = "NULL"
    with get_conn() as conn:
        _require_admin(conn, request)
        updated = conn.execute(f"UPDATE feedback SET read_at = {stamp} WHERE id = ?", (fid,))
        if updated.rowcount == 0:
            raise HTTPException(status_code=404, detail="feedback not found")
        conn.commit()
    return {"ok": True, "read": body.read}
@app.delete("/api/admin/feedback/{fid}")
def admin_feedback_delete(fid: int, request: Request) -> dict:
    # Delete one feedback item; 404 when no row has that id.
    with get_conn() as conn:
        _require_admin(conn, request)
        deleted = conn.execute("DELETE FROM feedback WHERE id = ?", (fid,))
        if deleted.rowcount == 0:
            raise HTTPException(status_code=404, detail="feedback not found")
        conn.commit()
    return {"ok": True}
@app.post("/api/admin/feedback/{fid}/reply")
def admin_feedback_reply(fid: int, body: FeedbackReplyBody, request: Request) -> dict:
    # Email an admin reply to the feedback's contact address, then record it.
    #
    # Cap the RAW editor HTML first (slicing sanitized output could sever a
    # tag), then sanitize the whole thing.
    raw_html = (body.html or "")[:20000]
    clean_html = sanitize_reply_html(raw_html)
    plain = reply_html_to_text(clean_html)
    if not plain:
        raise HTTPException(status_code=422, detail="Reply message is required.")

    # 1. Validate + gather, then release the DB connection — SMTP can take
    #    ~20s and shouldn't keep a connection open.
    with get_conn() as conn:
        admin = _require_admin(conn, request)
        item = conn.execute("SELECT contact_email, message FROM feedback WHERE id = ?", (fid,)).fetchone()
        if not item:
            raise HTTPException(status_code=404, detail="feedback not found")
        if not item["contact_email"]:
            raise HTTPException(status_code=400, detail="No reply address for this feedback.")
        admin_id = admin["id"]
        recipient = item["contact_email"]
        original = item["message"]

    # 2. Send with no DB connection held; only record a reply that actually
    #    went out, so the UI can keep the draft on failure.
    try:
        email_send.send_feedback_reply(recipient, plain, clean_html, original)
    except Exception as exc:  # noqa: BLE001 — surface any SMTP failure to the admin
        raise HTTPException(status_code=502, detail=f"Could not send the reply: {exc}")

    # 3. Record the sent reply + mark the item read (an existing read_at is kept).
    with get_conn() as conn:
        inserted = conn.execute(
            "INSERT INTO feedback_replies (feedback_id, user_id, message, message_html, sent_to) "
            "VALUES (?, ?, ?, ?, ?)",
            (fid, admin_id, plain, clean_html, recipient),
        )
        conn.execute(
            "UPDATE feedback SET read_at = COALESCE(read_at, CURRENT_TIMESTAMP) WHERE id = ?", (fid,)
        )
        conn.commit()
        stored = conn.execute(
            "SELECT id, feedback_id, message, message_html, sent_to, sent_at FROM feedback_replies WHERE id = ?",
            (inserted.lastrowid,),
        ).fetchone()
        return {"ok": True, "reply": dict(stored)}
@app.post("/api/admin/sources/{sid}/status")
def admin_source_status(sid: int, body: SourceStatusBody, request: Request) -> dict:
    # Lifecycle: active | paused | retired. Stops/resumes polling only —
    # existing articles stay visible (use /visibility to hide). `active` is
    # kept mirrored so the scheduler/CLI keep working off the legacy flag.
    wanted = body.status
    if wanted not in ("active", "paused", "retired"):
        raise HTTPException(status_code=422, detail="invalid status")
    legacy_flag = int(wanted == "active")
    with get_conn() as conn:
        _require_admin(conn, request)
        updated = conn.execute(
            "UPDATE sources SET status = ?, active = ? WHERE id = ?", (wanted, legacy_flag, sid)
        )
        if updated.rowcount == 0:
            raise HTTPException(status_code=404, detail="source not found")
        conn.commit()
    return {"ok": True, "status": body.status, "active": legacy_flag}
@app.post("/api/admin/sources/{sid}/visibility")
def admin_source_visibility(sid: int, body: SourceVisibilityBody, request: Request) -> dict:
    # Pull a source's existing articles out of (or back into) the public feed
    # and brief. Reversible; never deletes anything.
    stored_flag = 1 if body.visible else 0
    with get_conn() as conn:
        _require_admin(conn, request)
        updated = conn.execute(
            "UPDATE sources SET content_visible = ? WHERE id = ?", (stored_flag, sid)
        )
        if updated.rowcount == 0:
            raise HTTPException(status_code=404, detail="source not found")
        conn.commit()
    return {"ok": True, "visible": body.visible}
@app.post("/api/admin/sources/{sid}/preview")
def admin_source_preview(sid: int, request: Request) -> dict:
    # Read-only spot-check of a LIVE source: safe-fetch + heuristic preview.
    # Mutates nothing — no DB write, no poll attempt, no health/state change.
    with get_conn() as conn:
        _require_admin(conn, request)
        found = conn.execute("SELECT feed_url FROM sources WHERE id = ?", (sid,)).fetchone()
        if not found:
            raise HTTPException(status_code=404, detail="source not found")
        feed_url = found["feed_url"]
    # Connection is closed before the fetch: safe fetch, no DB connection held.
    return _preview_or_502(feed_url)
@app.get("/api/admin/sources/{sid}/articles")
def admin_source_articles(sid: int, request: Request, filter: str = "all",
                          limit: int = 25, offset: int = 0) -> dict:
    # Read-only inspector: the REAL ingested articles behind a source's metrics,
    # so paywall/image/acceptance/duplicate signals can be verified against evidence.
    # limit is clamped to 1..100 and offset to >= 0; the summary is only
    # computed for the first page (offset 0).
    page_size = min(max(int(limit), 1), 100)
    start = max(int(offset), 0)
    with get_conn() as conn:
        _require_admin(conn, request)
        known = conn.execute("SELECT 1 FROM sources WHERE id = ?", (sid,)).fetchone()
        if not known:
            raise HTTPException(status_code=404, detail="source not found")
        page = queries.source_articles(conn, sid, filter, page_size, start)
        if start == 0:
            summary = queries.source_articles_summary(conn, sid)
        else:
            summary = None
        return {
            "articles": page,
            "summary": summary,
            "has_more": len(page) == page_size,  # a full page suggests more may follow
        }
@app.post("/api/admin/sources/{sid}/paywall")
def admin_source_paywall(sid: int, body: SourcePaywallBody, request: Request) -> dict:
    # Per-source paywall override: corrects domain-rule false positives
    # (NY Times Learning is free) / negatives. Threaded into feed/lead/brief
    # ranking + badges via is_paywalled_for_source.
    override = body.override or None  # any falsy value is stored as NULL
    allowed = (None, "free", "paywalled")
    if override not in allowed:
        raise HTTPException(status_code=422, detail="override must be null, 'free', or 'paywalled'")
    with get_conn() as conn:
        _require_admin(conn, request)
        updated = conn.execute("UPDATE sources SET paywall_override = ? WHERE id = ?", (override, sid))
        if updated.rowcount == 0:
            raise HTTPException(status_code=404, detail="source not found")
        conn.commit()
    return {"ok": True, "override": override}
@app.post("/api/admin/sources/{sid}/image-policy")
def admin_source_image_policy(sid: int, body: SourceImagePolicyBody, request: Request) -> dict:
    # Image rights policy: 'cache' (re-host a downscaled copy — only for sources we've
    # cleared: open license / permission / public-domain), 'remote' (hotlink the
    # publisher's image), 'none' (no image). Default is the conservative 'remote'.
    policy = body.policy
    if policy not in ("cache", "remote", "none"):
        raise HTTPException(status_code=422, detail="policy must be 'cache', 'remote', or 'none'")
    with get_conn() as conn:
        _require_admin(conn, request)
        updated = conn.execute("UPDATE sources SET image_policy = ? WHERE id = ?", (policy, sid))
        if updated.rowcount == 0:
            raise HTTPException(status_code=404, detail="source not found")
        conn.commit()
        # Leaving 'cache' (e.g. permission revoked) → take the re-hosted copies down now,
        # not just make them inaccessible. Setting TO 'cache' just flips the flag; the
        # cycle warms it.
        if policy == "cache":
            purged = 0
        else:
            purged = newsimg.purge_source(conn, sid)
    return {"ok": True, "policy": policy, "purged": purged}
# --- Source candidates (supervised add-a-source pipeline) ----------------
def _candidate_dict(row) -> dict:
d = dict(row)
raw = d.pop("preview_json", None)
try:
d["preview"] = json.loads(raw) if raw else None
except (ValueError, TypeError):
d["preview"] = None
return d
@app.get("/api/admin/candidates")
def admin_candidates(request: Request) -> list[dict]:
    # Every source candidate, each with its stored preview decoded.
    with get_conn() as conn:
        _require_admin(conn, request)
        listing = []
        for candidate in sources.list_candidates(conn):
            listing.append(_candidate_dict(candidate))
    return listing
def _preview_or_502(url: str, deep: bool = False) -> dict:
    """Preview a feed URL, turning any failure into an HTTP 502.

    The fetch goes through ``feeds.safe_fetch_feed`` (SSRF-safe: an
    admin-pasted URL is untrusted). The default is the fast heuristic over a
    20-item sample. ``deep=True`` uses an 8-item sample and also passes a
    ``LocalModelClient`` so the real LLM classifier runs (slower, ~5-7s/item —
    the true acceptance view, not an estimate). If that client cannot be built,
    the preview falls back to the heuristic.

    Raises:
        HTTPException: 502 with a readable reason when the preview fails.
    """
    model = None
    if deep:
        try:
            model = LocalModelClient.from_env()
        except Exception:  # noqa: BLE001 — fall back to heuristic if the model is down
            model = None
    sample_size = 8 if deep else 20
    try:
        return feeds.preview_feed(
            url, sample=sample_size, fetcher=feeds.safe_fetch_feed, client=model
        )
    except Exception as exc:  # noqa: BLE001 — surface a readable reason
        raise HTTPException(status_code=502, detail=f"Couldn't preview that feed: {exc}")
@app.post("/api/admin/candidates")
def admin_candidate_suggest(body: CandidateSuggestBody, request: Request) -> dict:
    # Queue a new source candidate from an admin-supplied feed URL: reject
    # blanks (422) and feeds we already know (409), preview the feed, then save
    # it with status "suggested".
    url = (body.feed_url or "").strip()
    if not url:
        raise HTTPException(status_code=422, detail="feed_url is required")
    with get_conn() as conn:  # gate BEFORE the outbound fetch
        _require_admin(conn, request)
        # Don't re-add a feed that's already live or already queued (catches
        # http/https · www · trailing-slash variants, not just exact dups).
        existing = sources.find_existing_feed(conn, url)
        if existing:
            where = "already a source" if existing["kind"] == "source" else "already in the candidate queue"
            # The name is wrapped in a matched pair of curly quotes (the
            # opening quote was previously missing from the message).
            raise HTTPException(
                status_code=409,
                detail=f"“{existing['name']}” is {where} ({existing['status']}). "
                "Find it below — Re-preview it there if you want a fresh read.",
            )
    preview = _preview_or_502(url)  # no DB connection held during network I/O
    with get_conn() as conn:
        _require_admin(conn, request)
        row = sources.save_candidate(conn, url, preview=preview, name=(body.name or None), status="suggested")
        return _candidate_dict(row)
@app.post("/api/admin/candidates/{cid}/preview")
def admin_candidate_repreview(cid: int, request: Request, deep: bool = False) -> dict:
    # Re-run the preview for an existing candidate and store the fresh result.
    with get_conn() as conn:
        _require_admin(conn, request)
        found = conn.execute("SELECT feed_url FROM source_candidates WHERE id = ?", (cid,)).fetchone()
        if not found:
            raise HTTPException(status_code=404, detail="candidate not found")
        feed_url = found["feed_url"]
    # conn released during the (slow) model pass
    fresh = _preview_or_502(feed_url, deep=deep)
    with get_conn() as conn:
        _require_admin(conn, request)
        saved = sources.save_candidate(conn, feed_url, preview=fresh)
        return _candidate_dict(saved)
@app.post("/api/admin/candidates/{cid}/rename")
def admin_candidate_rename(cid: int, body: CandidateRenameBody, request: Request) -> dict:
    # Rename a candidate that is still pending review. A blank name is stored
    # as NULL.
    with get_conn() as conn:
        _require_admin(conn, request)
        found = conn.execute("SELECT status FROM source_candidates WHERE id = ?", (cid,)).fetchone()
        if not found:
            raise HTTPException(status_code=404, detail="candidate not found")
        # Match the UI policy server-side: a promoted/rejected candidate is
        # settled history — rename only while it's pending review.
        status = found["status"]
        if status in ("promoted", "rejected"):
            raise HTTPException(status_code=409, detail=f"Can't rename a {found['status']} candidate.")
        # cap so a pasted paragraph can't wreck the queue
        trimmed = (body.name or "").strip()[:160]
        new_name = trimmed or None
        renamed = sources.rename_candidate(conn, cid, new_name)
        return _candidate_dict(renamed)
@app.post("/api/admin/candidates/{cid}/promote")
def admin_candidate_promote(cid: int, body: CandidatePromoteBody, request: Request) -> dict:
    # Promote a candidate into a real source using the settings in the body.
    # 409 when the feed is already a source (nothing is overwritten), 404 when
    # sources.promote_candidate raises ValueError. Returns the new source row
    # and the updated candidate.
    with get_conn() as conn:
        _require_admin(conn, request)
        try:
            source_id = sources.promote_candidate(
                conn, cid, active=body.active, default_category=body.default_category,
                trust_score=body.trust_score, pr_risk_score=body.pr_risk_score,
                poll_interval_minutes=body.poll_interval_minutes,
            )
        except sources.DuplicateFeedError as exc:
            ex = exc.existing
            # The name is wrapped in a matched pair of curly quotes (the
            # opening quote was previously missing). `from exc` keeps the
            # original error attached as the explicit cause.
            raise HTTPException(
                status_code=409,
                detail=f"“{ex['name']}” is already a source ({ex['status']}) — "
                "promote skipped so its settings aren't overwritten.",
            ) from exc
        except ValueError as exc:
            raise HTTPException(status_code=404, detail="candidate not found") from exc
        src = conn.execute(
            "SELECT id, name, status, active, content_visible FROM sources WHERE id = ?", (source_id,)
        ).fetchone()
        cand = conn.execute("SELECT * FROM source_candidates WHERE id = ?", (cid,)).fetchone()
        return {"ok": True, "source_id": source_id, "source": dict(src), "candidate": _candidate_dict(cand)}
@app.post("/api/admin/candidates/{cid}/reject")
def admin_candidate_reject(cid: int, request: Request) -> dict:
    # Reject a candidate and return its updated row; 404 when
    # sources.reject_candidate reports nothing was changed.
    with get_conn() as conn:
        _require_admin(conn, request)
        rejected = sources.reject_candidate(conn, cid)
        if not rejected:
            raise HTTPException(status_code=404, detail="candidate not found")
        current = conn.execute("SELECT * FROM source_candidates WHERE id = ?", (cid,)).fetchone()
        return _candidate_dict(current)
@app.post("/api/admin/candidates/{cid}/restore")
def admin_candidate_restore(cid: int, request: Request) -> dict:
    # Send a rejected candidate back to staging for another look.
    with get_conn() as conn:
        _require_admin(conn, request)
        restored = sources.restore_candidate(conn, cid)
        if not restored:
            raise HTTPException(status_code=404, detail="no rejected candidate with that id")
        current = conn.execute("SELECT * FROM source_candidates WHERE id = ?", (cid,)).fetchone()
        return _candidate_dict(current)
# --- Publishing Desk (admin): outbound-share queue for X (platform-neutral) ---
@app.post("/api/admin/publishing/build")
def admin_publishing_build(request: Request, background_tasks: BackgroundTasks) -> dict:
    # Kick the queue build in the background (the comparative LLM call can be slow);
    # the client polls /queue. No-op if a build is already running.
    with get_conn() as conn:
        _require_admin(conn, request)
    # atomic check-and-set: one job at a time
    with _publish_build_lock:
        already_running = _publish_build["building"]
        if not already_running:
            _publish_build.update(building=True, result=None, error=None)
            background_tasks.add_task(_run_publish_build)
    return {"building": True}
@app.get("/api/admin/publishing/queue")
def admin_publishing_queue(request: Request, archived: bool = False) -> dict:
    # The share queue plus the background build's state (running flag, last
    # result, last error), as read from _publish_build.
    with get_conn() as conn:
        _require_admin(conn, request)
        queue = publishing.list_queue(conn, include_archived=archived)
    return {
        "building": _publish_build["building"],
        "last": _publish_build.get("result"),
        "error": _publish_build.get("error"),
        "items": queue,
    }
@app.post("/api/admin/publishing/{sid}/status")
def admin_publishing_status(sid: int, body: PublishStatusBody, request: Request) -> dict:
    # Change a share's status, forwarding the optional text/url/snooze fields.
    # A falsy result from publishing.set_status becomes a 400.
    with get_conn() as conn:
        _require_admin(conn, request)
        changed = publishing.set_status(
            conn,
            sid,
            body.status,
            draft_text=body.draft_text,
            final_text=body.final_text,
            post_url=body.post_url,
            snooze_until=body.snooze_until,
        )
    if changed:
        return {"ok": True}
    raise HTTPException(status_code=400, detail="bad status or id")
@app.post("/api/admin/publishing/{sid}/draft")
def admin_publishing_draft(sid: int, body: PublishDraftBody, request: Request) -> dict:
    # Save draft text for a share; a falsy result from publishing.save_draft
    # becomes a 404.
    with get_conn() as conn:
        _require_admin(conn, request)
        stored = publishing.save_draft(conn, sid, body.draft_text)
    if stored:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="no such share")
@app.post("/api/admin/publishing/{sid}/restore")
def admin_publishing_restore(sid: int, request: Request) -> dict:
    # Restore a share; a falsy result from publishing.restore becomes a 400.
    with get_conn() as conn:
        _require_admin(conn, request)
        restored = publishing.restore(conn, sid)
    if restored:
        return {"ok": True}
    raise HTTPException(status_code=400, detail="not a restorable (skipped/snoozed) share")
@app.post("/api/admin/publishing/handles")
def admin_publishing_add_handle(body: EntityHandleBody, request: Request) -> dict:
    # Save a verified handle (e.g. after confirming one via 'Find on X').
    # A falsy result from publishing.add_entity_handle becomes a 400.
    with get_conn() as conn:
        _require_admin(conn, request)
        added = publishing.add_entity_handle(conn, body.entity_name, body.handle, body.profile_url)
    if added:
        return {"ok": True}
    raise HTTPException(status_code=400, detail="bad entity or handle")
# --- CSV exports (admin-gated, for inspection / archiving) ---------------
def _csv_cell(v):
# Defuse CSV formula injection: a cell a spreadsheet might evaluate (=,+,-,@)
# gets a leading apostrophe. Numbers pass through untouched (no risk, and
# this avoids mangling any legitimate negative value).
if v is None:
return ""
if isinstance(v, (int, float)):
return v
s = str(v)
return "'" + s if s[:1] in ("=", "+", "-", "@") else s
def _csv_response(filename: str, write) -> Response:
    """Build a ``text/csv`` attachment response.

    *write* is called once with a row-emitting callable; every value of every
    emitted row is passed through ``_csv_cell`` before being written.
    *filename* is placed in the Content-Disposition header.
    """
    sink = io.StringIO()
    csv_writer = csv.writer(sink)

    def emit(values):
        return csv_writer.writerow([_csv_cell(value) for value in values])

    write(emit)
    disposition = f'attachment; filename="{filename}"'
    return Response(
        content=sink.getvalue(),
        media_type="text/csv",
        headers={"Content-Disposition": disposition},
    )
@app.get("/api/admin/export/sources.csv")
def admin_export_sources(request: Request) -> Response:
    # Current-state snapshot of every source (active + paused + retired).
    with get_conn() as conn:
        _require_admin(conn, request)
        health = queries.source_health(conn)

    header = [
        "name", "feed_url", "homepage", "status", "visible", "served", "accepted_total",
        "total_articles", "acceptance_pct", "duplicate_pct", "accepted_dup_pct",
        "image_coverage_pct", "paywalled", "last_success", "last_error", "retry_after", "review_flag", "review_reason",
    ]

    def yes_no(flag) -> str:
        return "yes" if flag else "no"

    def write(row):
        # One header row, then one row per source in the order source_health returned them.
        row(header)
        for src in health:
            row([
                src["name"],
                src["feed_url"],
                src.get("homepage_url") or "",
                src.get("status") or "",
                yes_no(src.get("content_visible")),
                src["served"],
                src["accepted_total"],
                src["total_articles"],
                src["acceptance_rate"],
                src["duplicate_rate"],
                src["accepted_dup_rate"],
                src["image_coverage"],
                yes_no(src.get("paywalled")),
                src.get("last_success_at") or "",
                src.get("last_error") or "",
                src.get("retry_after_at") or "",
                yes_no(src.get("review_flag")),
                src.get("review_reason") or "",
            ])

    return _csv_response("sources.csv", write)
@app.get("/api/admin/export/audience.csv")
def admin_export_audience(request: Request, days: int = Query(30)) -> Response:
    # Audience stats as CSV: a metric/value section, one row per share kind, a
    # blank line, then the daily series. Any window other than 7/30/90 falls
    # back to 30.
    if days not in (7, 30, 90):
        days = 30
    with get_conn() as conn:
        _require_admin(conn, request)
        stats = queries.admin_stats(conn, days=days)

    def write(row):
        visitors, accounts = stats["visitors"], stats["accounts"]
        row(["metric", "value"])
        headline = [
            ("window_days", stats["days"]),
            ("visitors_today", visitors["today"]),
            ("visitors_7d", visitors["d7"]),
            ("visitors_30d", visitors["d30"]),
            ("returning_30d", stats.get("returning", 0)),
            ("once_30d", stats.get("once", 0)),
            ("accounts_total", accounts["total"]),
            ("accounts_new_7d", accounts["new_7d"]),
            ("accounts_active_7d", accounts["active_7d"]),
            ("feedback_7d", stats.get("feedback_7d", 0)),
            ("feedback_unread", stats.get("feedback_unread", 0)),
        ]
        for label, value in headline:
            row([label, value])
        share_counts = stats.get("shares") or {}
        for kind, count in share_counts.items():
            row([f"share_{kind}", count])
        row([])  # blank line, then the daily time series
        row(["date", "visitors", "opens"])
        for point in stats.get("daily", []):
            row([point["day"], point["visits"], point["opens"]])

    return _csv_response("audience.csv", write)
@app.post("/api/admin/sources/{sid}/review")
def admin_source_review(sid: int, body: SourceReviewBody, request: Request) -> dict:
    # Set or clear a source's review flag. The reason is only stored while the
    # flag is set; clearing the flag also clears the reason.
    if body.flag:
        flag_value = 1
        reason = body.reason or None
    else:
        flag_value = 0
        reason = None
    with get_conn() as conn:
        _require_admin(conn, request)
        updated = conn.execute(
            "UPDATE sources SET review_flag = ?, review_reason = ? WHERE id = ?",
            (flag_value, reason, sid),
        )
        if updated.rowcount == 0:
            raise HTTPException(status_code=404, detail="source not found")
        conn.commit()
    return {"ok": True, "flag": body.flag}
@app.get("/api/admin/stats")
def admin_stats(request: Request, days: int = Query(30)) -> dict:
    # Admin dashboard stats for a 7, 30 or 90 day window.
    if days not in (7, 30, 90):  # clamp to the offered windows
        days = 30
    with get_conn() as conn:
        _require_admin(conn, request)
        stats = queries.admin_stats(conn, days=days)
        return stats
@app.get("/api/summary/{article_id}")
def article_summary(article_id: int, background_tasks: BackgroundTasks) -> dict:
    # Poll endpoint for an article's summary. "ready" as soon as a summary
    # exists (the explanation is returned as-is, whatever its value); otherwise
    # generation is kicked off and "pending" is returned.
    with get_conn() as conn:
        text = summarize.get_summary(conn, article_id)
        why = summarize.get_explanation(conn, article_id)
    if not text:
        _kick_summary(article_id, background_tasks)
        return {"status": "pending", "summary": None, "explanation": None}
    return {"status": "ready", "summary": text, "explanation": why}
@app.get("/today", response_class=HTMLResponse)
def today_digest() -> HTMLResponse:
    # HTML digest of the current brief; renders the not-found page with a 404
    # when the brief has no items.
    with get_conn() as conn:
        brief = queries.brief(conn)
    stories = brief.get("items") or []
    if stories:
        page = share.render_digest(stories, PUBLIC_BASE_URL, brief.get("brief_date"))
        return HTMLResponse(page)
    return HTMLResponse(share.render_not_found(PUBLIC_BASE_URL), status_code=404)
@app.api_route("/sitemap.xml", methods=["GET", "HEAD"])
def sitemap() -> Response:
    # XML sitemap: the hub + public section pages, followed by one entry per
    # accepted, non-duplicate, summarized article from a non-paywalled source.
    with get_conn() as conn:
        paywalled_ids = queries.paywalled_source_ids(conn)
        if paywalled_ids:
            marks = ",".join("?" * len(paywalled_ids))
            pw_clause = f" AND a.source_id NOT IN ({marks})"
        else:
            pw_clause = ""
        # Only articles with a real summary (the page has substance), paywalled excluded.
        # Cap near the 50k protocol ceiling so older canonical pages aren't dropped.
        article_rows = conn.execute(
            "SELECT a.id, COALESCE(a.published_at, a.discovered_at) AS lm "
            "FROM articles a JOIN article_scores s ON s.article_id = a.id "
            "WHERE s.accepted = 1 AND a.duplicate_of IS NULL" + pw_clause + " "
            "AND EXISTS (SELECT 1 FROM article_summaries asum WHERE asum.article_id = a.id) "
            "ORDER BY lm DESC LIMIT 50000",
            paywalled_ids,
        ).fetchall()

    base = PUBLIC_BASE_URL
    # Hub + the public sections, then every summarized article page.
    sections = (
        ("/", "hourly", "1.0"),
        ("/news", "hourly", "0.9"),
        ("/today", "daily", "0.9"),
        ("/art", "daily", "0.6"),
        ("/play", "weekly", "0.5"),
        ("/word", "daily", "0.5"),
        ("/quote", "daily", "0.5"),
        ("/onthisday", "daily", "0.5"),
    )
    entries = [
        f"<url><loc>{base}{path}</loc><changefreq>{freq}</changefreq><priority>{priority}</priority></url>"
        for path, freq, priority in sections
    ]
    for article in article_rows:
        day = (article["lm"] or "")[:10]  # first 10 characters of the timestamp
        if day:
            lastmod = f"<lastmod>{day}</lastmod>"
        else:
            lastmod = ""
        entries.append(f"<url><loc>{base}/a/{article['id']}</loc>{lastmod}</url>")

    document = "".join([
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
        *entries,
        "</urlset>",
    ])
    return Response(content=document, media_type="application/xml")
@app.post("/api/import")
def import_local(body: ImportBody, request: Request) -> dict:
    """Fold this device's anonymous history/saved into the account (one-time)."""
    with get_conn() as conn:
        account = _require_user(conn, request)
        uid = account["id"]
        # Only ids that queries.existing_article_ids returns are written;
        # INSERT OR IGNORE makes a repeated import a no-op.
        seen_ids = queries.existing_article_ids(conn, body.seen)
        for article_id in seen_ids:
            conn.execute(
                "INSERT OR IGNORE INTO user_history (user_id, article_id, event) "
                "VALUES (?, ?, 'seen')",
                (uid, article_id),
            )
        saved_ids = queries.existing_article_ids(conn, body.saved)
        for article_id in saved_ids:
            conn.execute(
                "INSERT OR IGNORE INTO saved_articles (user_id, article_id) VALUES (?, ?)",
                (uid, article_id),
            )
        conn.commit()
    return {"ok": True}
@app.get("/api/categories", response_model=CategoriesResponse)
def categories(response: Response) -> CategoriesResponse:
    # Static taxonomy, identical for everyone, so it carries the edge-cache header.
    response.headers["Cache-Control"] = _EDGE_CONFIG
    topic_list = [Category(key=key, description=text) for key, text in TOPICS.items()]
    flavor_list = [Category(key=key, description=text) for key, text in FLAVORS.items()]
    return CategoriesResponse(topics=topic_list, flavors=flavor_list)
@app.get("/api/moods")
def moods(response: Response) -> list[dict]:
    # The humane front door: every mood maps to a filter preset that the client
    # merges with the reader's own Calm Filters. The presets are static and the
    # same for everyone, hence the edge-cache header.
    response.headers["Cache-Control"] = _EDGE_CONFIG
    return MOODS
@app.get("/api/lanes")
def lanes(response: Response) -> dict:
    # The customizable quick-access rail: 'today' is always pinned, and the
    # reader pins any subset of these moods / topics / Discovery tags. Live
    # counts let the client gate empty lanes and show volume.
    # Global counts only (no per-user data), so the response is edge-cacheable.
    response.headers["Cache-Control"] = _EDGE_DERIVED
    with get_conn() as conn:
        tag_totals = queries.tag_counts(conn)
        # category_counts rows are summed per topic.
        topic_totals: dict[str, int] = {}
        for entry in queries.category_counts(conn):
            key = entry["topic"]
            topic_totals[key] = topic_totals.get(key, 0) + int(entry["count"])
    return build_lane_pool(topic_totals, tag_totals)
@app.get("/api/families")
def families(response: Response) -> list[dict]:
    # Grouping vocabulary organised into calm families for the Explore UI.
    # Global vocab + counts, no per-user data, so the response is edge-cacheable.
    response.headers["Cache-Control"] = _EDGE_DERIVED
    with get_conn() as conn:
        tag_totals = queries.tag_counts(conn)
    payload = []
    for family_name, spec in FAMILIES.items():
        # A tag with no counted articles is still listed, with count 0.
        tag_entries = [{"key": tag, "count": tag_totals.get(tag, 0)} for tag in spec["tags"]]
        payload.append(
            {"name": family_name, "description": spec["description"], "tags": tag_entries}
        )
    return payload
@app.get("/api/category-counts", response_model=list[CategoryCount])
def category_counts(accepted_only: bool = True, prefs: str | None = Query(None)) -> list[CategoryCount]:
    # Per (topic, flavor) article counts. Without reader prefs the counts come
    # straight from the query layer; with prefs they are recomputed over the
    # filtered set.
    reader_prefs = prefs_from_json(prefs)
    with get_conn() as conn:
        if not reader_prefs.is_empty():
            # Count over the SAME filtered set the feed would return, so the
            # browse numbers always match what the user actually sees.
            everything = queries.feed(conn, accepted_only=accepted_only, limit=100000, offset=0)
            visible = filter_articles(everything, reader_prefs, datetime.now(timezone.utc))
            tally = Counter((art_row["topic"], art_row["flavor"]) for art_row in visible)
            ordered = sorted(tally.items(), key=lambda kv: (str(kv[0][0]), str(kv[0][1])))
            rows = [
                {"topic": pair[0], "flavor": pair[1], "count": total}
                for pair, total in ordered
            ]
        else:
            rows = queries.category_counts(conn, accepted_only=accepted_only)
    return [CategoryCount(**row) for row in rows]
@app.get("/api/feed", response_model=FeedResponse)
def feed(
    response: Response,
    topic: str | None = Query(None),
    flavor: str | None = Query(None),
    accepted_only: bool = True,
    limit: int = Query(30, ge=1, le=100),
    offset: int = Query(0, ge=0),
    prefs: str | None = Query(None),
    exclude: str = Query("", description="comma-separated article ids the reader has dismissed"),
    tag: str | None = Query(None, description="grouping tag to browse"),
    source_id: int | None = Query(None, ge=1, description="show only this source's articles"),
    sort: str = Query("ranked", pattern="^(ranked|latest)$", description="ranked (best-first) or latest (newest-first)"),
    following: bool = Query(False, description="restrict to the signed-in user's followed sources/tags"),
    home: str | None = Query(None, max_length=8, description="Closer to Home: reader's home as 'US' or 'US-NY'"),
    request: Request = None,
) -> FeedResponse:
    # Edge-cacheable ONLY when the response depends purely on the URL: not the
    # following feed (reads the session's follows) and not personal filters
    # (prefs/dismissals are per-reader). The shareable cases — the default
    # home feed, topic/flavor/tag/source browse — are identical for everyone,
    # so the edge can serve one copy to all. Everything else stays private.
    shareable = not following and not prefs and not exclude.strip() and not home
    response.headers["Cache-Control"] = _EDGE_FEED if shareable else _PRIVATE
    if topic and topic.lower() not in TOPICS:
        raise HTTPException(400, f"unknown topic: {topic}")
    if flavor and flavor.lower() not in FLAVORS:
        raise HTTPException(400, f"unknown flavor: {flavor}")
    # Parse the reader's home: 'US' or 'US-NY'. State granularity is US-only for v1.
    home_country = home_state = None
    if home:
        parts = home.upper().split("-", 1)
        home_country = (parts[0][:2] or None)
        if home_country == "US" and len(parts) > 1:
            home_state = parts[1][:2] or None
    fp = prefs_from_json(prefs)
    now = datetime.now(timezone.utc)
    # Dismissed ids come straight from the query string. Let int() be the judge
    # and skip whatever it rejects: an isdigit() pre-check still lets tokens
    # such as "--5" or "²" through to int(), which then raises (HTTP 500).
    excl: set[int] = set()
    for token in exclude.split(","):
        try:
            excl.add(int(token))
        except ValueError:
            continue
    # Categorical filters (include/mute topics+flavors incl. active pauses,
    # cortisol ceiling) go to SQL so nothing is truncated by ranking. Only
    # word-boundary avoid-terms and dismissals need a Python pass.
    kw = _prefs_sql_kw(fp, now)
    with get_conn() as conn:
        if following:
            user = _current_user(conn, request)
            if not user:
                # Signed out: the following feed is simply empty.
                return FeedResponse(topic=topic, flavor=flavor, count=0, items=[])
            frows = conn.execute(
                "SELECT kind, value FROM user_follows WHERE user_id = ?", (user["id"],)
            ).fetchall()
            kw["follow_sources"] = [int(r["value"]) for r in frows if r["kind"] == "source" and r["value"].isdigit()]
            kw["follow_tags"] = [r["value"] for r in frows if r["kind"] == "tag"]

        def _fetch(scope, lim, off):
            # One scoped page, applying the avoid-terms/dismissal Python pass when needed.
            if fp.avoid_terms or excl:
                # Over-fetch from offset 0 (capped at 2000) so the page is still
                # full after the Python-side filtering, then slice the page out.
                fetch_n = min(2000, (off + lim) * 4 + 50 + len(excl))
                raw = queries.feed(
                    conn, topic=topic, flavor=flavor, accepted_only=accepted_only, limit=fetch_n, offset=0,
                    tag=tag, source_id=source_id, sort=sort,
                    home_country=home_country, home_state=home_state, geo_scope=scope, **kw,
                )
                kept = [a for a in filter_articles(raw, fp, now) if a["id"] not in excl]
                return kept[off : off + lim]
            return queries.feed(
                conn, topic=topic, flavor=flavor, accepted_only=accepted_only, limit=lim, offset=off,
                tag=tag, source_id=source_id, sort=sort,
                home_country=home_country, home_state=home_state, geo_scope=scope, **kw,
            )

        next_offset = None
        if home_country:
            # Closer to Home. Near you (+ Elsewhere in your country when a state is set)
            # is a ONE-TIME lead block on page 0; the world is the paginated body. Thin
            # tiers fold down so a header is never shown empty (Codex: lead, don't trap).
            NEAR_CAP, COUNTRY_CAP, MIN_TIER = 8, 8, 3
            if offset == 0:
                near = _fetch("near", NEAR_CAP, 0)
                country = _fetch("country", COUNTRY_CAP, 0) if home_state else []
                world = _fetch("world", limit, 0)
                # Pagination follows the world tier only; the lead block is one-time.
                next_offset = limit if len(world) == limit else None
                tiers = []
                if home_state:
                    if len(near) >= MIN_TIER:
                        tiers.append(("near", near))
                    else:
                        country = near + country  # fold sparse "near" into your country
                    if len(country) >= MIN_TIER:
                        tiers.append(("country", country))
                    else:
                        world = country + world  # fold sparse country into the world
                elif len(near) >= MIN_TIER:
                    tiers.append(("near", near))  # near == your whole country here
                else:
                    world = near + world
                tiers.append(("world", world))
                rows = []
                for key, group in tiers:
                    for r in group:
                        r["__section"] = key
                        rows.append(r)
            else:
                rows = _fetch("world", limit, offset)
                for r in rows:
                    r["__section"] = "world"
                next_offset = offset + limit if len(rows) == limit else None
        else:
            rows = _fetch(None, limit, offset)
            next_offset = offset + len(rows) if len(rows) == limit else None
    # Section grouping only — paywalled-source stories are excluded upstream now.
    # sorted() is stable, so the order within each section is preserved.
    _SEC = {"near": 0, "country": 1, "world": 2}
    rows = sorted(rows, key=lambda r: _SEC.get(r.get("__section"), 0))
    return FeedResponse(
        topic=topic,
        flavor=flavor,
        count=len(rows),
        items=[Article.from_row(r) for r in rows],
        next_offset=next_offset,
    )
@app.get("/api/search", response_model=FeedResponse)
def search(response: Response, q: str = Query("", max_length=120),
           prefs: str | None = Query(None), limit: int = Query(30, ge=1, le=60),
           offset: int = Query(0, ge=0)) -> FeedResponse:
    # Public article search across the visitor-facing corpus. Mirrors the feed's
    # boundaries (accepted/visible/non-duplicate + the reader's Calm Filters /
    # avoid-terms) but NOT a lane scope — you searched on purpose. Ranked by
    # relevance (bm25), recency as a tie-break. Per-reader → never edge-cached.
    response.headers["Cache-Control"] = _PRIVATE
    fts = _fts_query(q)
    if not fts:
        return FeedResponse(topic=None, flavor=None, count=0, items=[])
    fp = prefs_from_json(prefs)
    now = datetime.now(timezone.utc)
    kw = _prefs_sql_kw(fp, now)
    page_end = offset + limit
    with get_conn() as conn:
        index_has_rows = conn.execute("SELECT 1 FROM article_search LIMIT 1").fetchone()
        if not index_has_rows:
            queries.reindex_search(conn)  # lazy build (fresh deploy / before first cycle)
        if fp.avoid_terms:
            # Over-fetch so the page is still full after the avoid-terms pass.
            fetch_n = min(2000, page_end * 4 + 40)
        else:
            fetch_n = page_end
        hits = queries.feed(conn, accepted_only=True, limit=fetch_n, offset=0, match=fts, **kw)
        if fp.avoid_terms:
            hits = filter_articles(hits, fp, now)  # word-boundary avoid-terms
    page = hits[offset:page_end]
    # Keep relevance order (don't paywall-reorder); the badge still shows true status.
    return FeedResponse(topic=None, flavor=None, count=len(page),
                        items=[Article.from_row(row) for row in page])
@app.get("/api/puzzle/{game}")
def daily_puzzle(game: str, variant: str = Query("5")) -> dict:
    # Today's puzzle for the named game; an unknown game (or an unknown word
    # variant) is a 404.
    is_word = game == "word" and variant in games.WORD_VARIANTS
    with get_conn() as conn:
        if is_word:
            payload = games.word_puzzle_response(conn, local_today(), variant)
        elif game == "wordsearch":
            payload = games.wordsearch_response(conn, local_today(), variant)
        elif game == "bloom":
            payload = bloom.bloom_response(conn, local_today())
        else:
            raise HTTPException(status_code=404, detail="no such puzzle")
    return payload
@app.get("/api/puzzle/bloom/free")
def bloom_free(response: Response, format: str = "center", seed: str | None = None) -> dict:
    # A free-play wheel: deterministic by `seed` (client stores it to resume),
    # random when none is given. Center Circle or Wild Bloom. No DB, no sync.
    fmt = "center"
    if format == "wild":
        fmt = "wild"
    # A seed that is missing or not 1-32 URL-safe characters is replaced by a fresh random one.
    if seed and re.fullmatch(r"[A-Za-z0-9_-]{1,32}", seed):
        wheel_seed = seed
    else:
        wheel_seed = secrets.token_urlsafe(6)
    response.headers["Cache-Control"] = "no-store"
    with get_conn() as conn:
        return bloom.bloom_free_response(conn, wheel_seed, fmt)
@app.post("/api/bloom/report")
def bloom_report(body: BloomReportBody) -> dict:
    # A player flagging a rejected word as "should count". Public + deduped;
    # lands in the admin queue (approve→allow / block / dismiss).
    with get_conn() as conn:
        stored = bloom.add_report(
            conn, body.word, body.date, body.mode, body.format, body.letters, body.reason
        )
    return {"ok": bool(stored)}
@app.post("/api/puzzle/word/guess")
def word_guess(body: WordGuessRequest) -> dict:
    # Adjudicate one Daily Word guess server-side. Unknown variant → 404;
    # an adjudication result carrying "error" → 400 with that message.
    if body.variant not in games.WORD_VARIANTS:
        raise HTTPException(status_code=404, detail="no such puzzle")
    with get_conn() as conn:
        verdict = games.adjudicate_word_guess(
            conn, local_today(), body.variant, body.guess, body.n
        )
    if "error" in verdict:
        raise HTTPException(status_code=400, detail=verdict["error"])
    return verdict
# --- Cross-device game state sync (signed-in only; merged server-side) ---
def _game_ok(game: str, variant: str) -> bool:
    """Return True when (game, variant) names a known, syncable game."""
    if game == "word":
        return variant in games.WORD_VARIANTS
    if game == "wordsearch":
        return variant in games.WS_TIERS
    if game == "bloom":
        return variant == ""  # bloom has no variants
    if game == "match":
        return variant in games.MATCH_VARIANTS  # "<tier>-<format>"
    return False
def _valid_pdate(d: str) -> bool:
return bool(re.match(r"^\d{4}-\d{2}-\d{2}$", d or "")) # plain YYYY-MM-DD, no junk rows
@app.get("/api/games/state")
def game_state_get(game: str, variant: str, date: str, request: Request) -> dict:
    # Load the signed-in user's saved state for one game/variant/date.
    # Signed out → {"state": None}.
    if not _game_ok(game, variant):
        raise HTTPException(status_code=404, detail="no such game")
    if not _valid_pdate(date):
        raise HTTPException(status_code=400, detail="bad date")
    with get_conn() as conn:
        player = _current_user(conn, request)
        saved = games.load_game_state(conn, player["id"], game, variant, date) if player else None
    return {"state": saved}
@app.put("/api/games/state")
def game_state_put(body: GameStateBody, request: Request) -> dict:
    # Save one game state for the signed-in user and return what
    # games.save_game_state hands back. Validation order: game/variant (404),
    # date shape (400), serialized size (413).
    if not _game_ok(body.game, body.variant):
        raise HTTPException(status_code=404, detail="no such game")
    if not _valid_pdate(body.date):
        raise HTTPException(status_code=400, detail="bad date")
    serialized_size = len(json.dumps(body.state))
    if serialized_size > 20000:  # a real game state is tiny — reject junk
        raise HTTPException(status_code=413, detail="state too large")
    with get_conn() as conn:
        player = _current_user(conn, request)
        if player:
            result = games.save_game_state(
                conn, player["id"], body.game, body.variant, body.date, body.state or {}
            )
        else:
            result = body.state  # signed out → no sync, just echo
    return {"state": result}
@app.put("/api/games/state/batch")
def game_state_put_batch(body: GameStateBatchBody, request: Request) -> dict:
    """Reconcile many (game, variant) states for one date in a SINGLE request, so
    the hub doesn't fan out a dozen calls on every /play load. Each item is
    validated/sanitized/merged exactly like the single PUT; unknown or oversized
    items are dropped (not fatal). Signed-out → echo (no sync), same as the single
    endpoint, so cross-device pull is preserved for signed-in users."""
    if not _valid_pdate(body.date):
        raise HTTPException(status_code=400, detail="bad date")
    # Only the first 32 items are considered; invalid or oversized ones are dropped.
    accepted = []
    for entry in body.items[:32]:
        if _game_ok(entry.game, entry.variant) and len(json.dumps(entry.state)) <= 20000:
            accepted.append(entry)
    with get_conn() as conn:
        player = _current_user(conn, request)
        if not player:
            echoed = [
                {"game": entry.game, "variant": entry.variant, "state": entry.state}
                for entry in accepted
            ]
            return {"states": echoed}
        merged_states = [
            {
                "game": entry.game,
                "variant": entry.variant,
                "state": games.save_game_state(
                    conn, player["id"], entry.game, entry.variant, body.date, entry.state or {}
                ),
            }
            for entry in accepted
        ]
    return {"states": merged_states}
@app.get("/api/games/stats")
def game_stats_get(game: str, variant: str, request: Request) -> dict:
    # The signed-in user's stats for a game/variant; {"stats": None} when signed out.
    if not _game_ok(game, variant):
        raise HTTPException(status_code=404, detail="no such game")
    with get_conn() as conn:
        player = _current_user(conn, request)
        if not player:
            return {"stats": None}
        return {"stats": games.game_stats(conn, player["id"], game, variant)}
# --- Admin: word curation ---
# Bloom word curation (runtime, no deploy) comes first; the Daily Word pool endpoints follow it.
@app.get("/api/admin/bloom/reports")
def admin_bloom_reports(request: Request, status: str = "pending") -> dict:
    # Admin queue of player word reports for one status, plus the current overrides.
    # An unrecognised status falls back to "pending".
    known_statuses = ("pending", "approved", "blocked", "dismissed")
    with get_conn() as conn:
        _require_admin(conn, request)
        st = status if status in known_statuses else "pending"
        reports = bloom.list_reports(conn, st)
        overrides = bloom.list_overrides(conn)
    return {"status": st, "reports": reports, "overrides": overrides}
@app.post("/api/admin/bloom/reports/{report_id}")
def admin_bloom_resolve(report_id: int, body: BloomReportActionBody, request: Request) -> dict:
    # Resolve one report with the given action, recording the acting admin's email.
    # A falsy result from bloom.resolve_report becomes a 400.
    with get_conn() as conn:
        acting_admin = _require_admin(conn, request)
        resolved = bloom.resolve_report(conn, report_id, body.action, by=acting_admin["email"])
    if resolved:
        return {"ok": True}
    raise HTTPException(status_code=400, detail="bad report or action")
@app.post("/api/admin/bloom/overrides")
def admin_bloom_override(body: BloomOverrideBody, request: Request) -> dict:
    # Record an allow/block override for a word, attributed to the acting admin.
    # A falsy result from bloom.set_override becomes a 422.
    with get_conn() as conn:
        acting_admin = _require_admin(conn, request)
        applied = bloom.set_override(
            conn, body.word, body.action, reason=body.reason, by=acting_admin["email"]
        )
    if applied:
        return {"ok": True}
    raise HTTPException(
        status_code=422,
        detail="allow needs a real ≥4-letter word with no 'S'; block accepts any word",
    )
@app.delete("/api/admin/bloom/overrides/{word}")
def admin_bloom_override_clear(word: str, request: Request) -> dict:
    # Admin-only: remove the override for `word`. Always answers {"ok": True}.
    with get_conn() as db:
        _require_admin(db, request)
        bloom.clear_override(db, word)
    return {"ok": True}
@app.get("/api/admin/word/lookup")
def admin_word_lookup(word: str, request: Request) -> dict:
    # Look a word up and annotate the result with whether it is in the answer
    # pool and whether it has been removed. Both flags are False when the
    # lookup found no variant.
    with get_conn() as conn:
        _require_admin(conn, request)
        info = games.lookup_word(word)
        variant = info["variant"]
        if variant:
            info["in_pool"] = info["word"] in games.answer_pool(conn, variant)
            info["removed"] = info["word"] in games._db_removed(conn, variant)
        else:
            info["in_pool"] = False
            info["removed"] = False
    return info
@app.get("/api/admin/word/pool")
def admin_word_pool(request: Request) -> dict:
    # Admin-only summary of the Daily Word pool.
    with get_conn() as db:
        _require_admin(db, request)
        summary = games.pool_summary(db)
    return summary
@app.post("/api/admin/word/pool")
def admin_word_pool_add(body: WordPoolBody, request: Request) -> dict:
    # Add one word to the pool. A result carrying "error" becomes a 400;
    # otherwise the result is returned together with a fresh pool summary.
    with get_conn() as db:
        _require_admin(db, request)
        outcome = games.add_pool_word(db, body.word)
        if "error" in outcome:
            raise HTTPException(status_code=400, detail=outcome["error"])
        payload = dict(outcome)
        payload["pool"] = games.pool_summary(db)
    return payload
@app.delete("/api/admin/word/pool/{word}")
def admin_word_pool_remove(word: str, request: Request) -> dict:
    # Remove a word from the pool and answer with the refreshed pool summary.
    with get_conn() as db:
        _require_admin(db, request)
        games.remove_pool_word(db, word)
        summary = games.pool_summary(db)
    return summary
@app.post("/api/admin/word/pool/restore")
def admin_word_pool_restore(body: WordPoolBody, request: Request) -> dict:
    # Restore a word to the pool and answer with the refreshed pool summary.
    with get_conn() as db:
        _require_admin(db, request)
        games.restore_pool_word(db, body.word)
        summary = games.pool_summary(db)
    return summary
@app.post("/api/admin/word/pool/import")
def admin_word_pool_import(body: WordPoolImportBody, request: Request) -> dict:
    # Bulk import: the explicit `words` list plus every run of ASCII letters
    # found in the optional free-form `text`.
    with get_conn() as db:
        _require_admin(db, request)
        candidates = list(body.words)
        if body.text:
            candidates.extend(re.findall(r"[A-Za-z]+", body.text))
        outcome = games.import_pool_words(db, candidates)
        payload = dict(outcome)
        payload["pool"] = games.pool_summary(db)
    return payload
# --- Admin: Word Search theme authoring ---
@app.get("/api/admin/wordsearch/themes")
def admin_ws_themes(request: Request) -> list[dict]:
    # Admin-only listing of the Word Search themes.
    with get_conn() as db:
        _require_admin(db, request)
        themes = games.list_wordsearch_themes(db)
    return themes
@app.post("/api/admin/wordsearch/themes")
def admin_ws_theme_save(body: WordsearchThemeBody, request: Request) -> dict:
    # Save a theme (body.id is passed through to the save call). A result
    # carrying "error" becomes a 400; otherwise the result is returned together
    # with the refreshed theme list.
    with get_conn() as db:
        _require_admin(db, request)
        outcome = games.save_wordsearch_theme(db, body.theme, body.words, body.id)
        if "error" in outcome:
            raise HTTPException(status_code=400, detail=outcome["error"])
        payload = dict(outcome)
        payload["themes"] = games.list_wordsearch_themes(db)
    return payload
@app.delete("/api/admin/wordsearch/themes/{tid}")
def admin_ws_theme_remove(tid: int, request: Request) -> list[dict]:
    # Delete one theme and answer with the remaining themes.
    with get_conn() as db:
        _require_admin(db, request)
        games.remove_wordsearch_theme(db, tid)
        remaining = games.list_wordsearch_themes(db)
    return remaining
@app.post("/api/admin/wordsearch/suggest")
def admin_ws_suggest(body: WordsearchSuggestBody, request: Request) -> dict:
    # Ask the local model for one more word for a theme. If the client cannot
    # be built, None is passed on to games.suggest_wordsearch_word; a result
    # carrying "error" becomes a 503.
    with get_conn() as db:
        _require_admin(db, request)
    from .llm import LocalModelClient

    model_client = None
    try:
        model_client = LocalModelClient.from_env()
    except Exception:  # noqa: BLE001
        model_client = None
    suggestion = games.suggest_wordsearch_word(model_client, body.theme, body.existing)
    if "error" in suggestion:
        raise HTTPException(status_code=503, detail=suggestion["error"])
    return suggestion
@app.get("/api/since", response_model=FeedResponse)
def feed_since(ts: str = Query(...), prefs: str | None = Query(None)) -> FeedResponse:
    # A calm welcome-back cue: accepted/non-dup/visible articles discovered
    # since the reader's last visit (boundary-respecting). count = how many;
    # items = a few to show inline. No nagging, no unread state stored.
    try:
        parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        if parsed.tzinfo:
            parsed = parsed.astimezone(timezone.utc)
        since = parsed.strftime("%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        # An unparseable timestamp is answered with an empty result, not an error.
        return FeedResponse(topic=None, flavor=None, count=0, items=[])
    fp = prefs_from_json(prefs)
    now = datetime.now(timezone.utc)
    kw = _prefs_sql_kw(fp, now)
    with get_conn() as conn:
        fresh = queries.feed(conn, sort="latest", since=since, limit=60, **kw)
    if fp.avoid_terms:
        fresh = filter_articles(fresh, fp, now)
    # Stable sort on the paywall flag: non-paywalled stories come first.
    fresh = list(fresh)
    fresh.sort(key=lambda row: is_paywalled_for_source(row["canonical_url"], row["paywall_override"]))
    preview = [Article.from_row(row) for row in fresh[:8]]
    return FeedResponse(topic=None, flavor=None, count=len(fresh), items=preview)
@app.get("/api/brief", response_model=BriefResponse)
def brief(
    response: Response,
    date: str | None = Query(None),
    limit: int = Query(10, ge=1, le=50),
    prefs: str | None = Query(None),
    exclude: str = Query("", description="comma-separated article ids the reader has dismissed"),
    home: str | None = Query(None, max_length=8, description="local-first highlights: 'US' or 'US-NY'"),
    scope: str = Query("nearby", pattern="^(nearby|region|country|world)$", description="radius dial"),
) -> BriefResponse:
    # The default highlights are global (date-keyed, no session) → edge-cacheable
    # so a new visitor's "Gathering the good news…" resolves from their POP, not
    # a pull to the residential origin. Personal filters (incl. a home) stay private.
    home_country = home_state = None
    if home:
        parts = home.upper().split("-", 1)
        home_country = parts[0][:2] or None
        if home_country == "US" and len(parts) > 1:
            home_state = parts[1][:2] or None
    shareable = not prefs and not exclude.strip() and not home_country
    response.headers["Cache-Control"] = _EDGE_FEED if shareable else _PRIVATE
    fp = prefs_from_json(prefs)
    now = datetime.now(timezone.utc)
    # Dismissed ids come straight from the query string. Let int() be the judge
    # and skip whatever it rejects: an isdigit() pre-check still lets tokens
    # such as "--5" or "²" through to int(), which then raises (HTTP 500).
    excl: set[int] = set()
    for token in exclude.split(","):
        try:
            excl.add(int(token))
        except ValueError:
            continue
    with get_conn() as conn:
        if home_country and scope != "world":
            # The reader's home + scope dial lead the landing: closest-first good news,
            # blended outward so it's always a full set. Over-fetch to survive dismissal/
            # boundary filtering, then cap to limit. scope='world' = the global brief.
            meta = queries.brief(conn, brief_date=date, limit=1)
            data = {"brief_date": meta["brief_date"], "title": "Close to home", "created_at": meta.get("created_at")}
            pool = queries.home_brief(conn, home_country, home_state, scope=scope, limit=limit + 12)
        else:
            data = queries.brief(conn, brief_date=date, limit=limit)
            pool = data["items"]
        # Drop dismissed (replaced-away) items and anything the reader's
        # boundaries hide; avoid-terms take precedence over curation.
        items = [a for a in pool if a["id"] not in excl
                 and not is_paywalled_for_source(a.get("canonical_url"), a.get("paywall_override"))]
        if not fp.is_empty():
            items = filter_articles(items, fp, now)
        items = items[:limit]  # home mode over-fetches to survive filtering; cap here
        # Keep the highlights full: if a boundary or a dismissal removed a
        # story, top up with other readable, boundary-respecting good news
        # rather than show fewer. (Home mode's home_brief already blends to world.)
        if len(items) < limit:
            have = {a["id"] for a in items} | excl
            pool = queries.feed(
                conn, accepted_only=True, limit=limit * 5 + 40, offset=0, **_prefs_sql_kw(fp, now)
            )
            for a in filter_articles(pool, fp, now):
                if len(items) >= limit:
                    break
                if a["id"] not in have:
                    items.append(a)
                    have.add(a["id"])
    # Lead with a gentle, readable story (charged or paywalled stories stay
    # in the set, just not as the first thing seen).
    if home_country and scope != "world" and items:
        # Keep "closest first": pick the gentlest hero from ONLY the closest non-empty
        # section, so _pick_lead can never float a wider-region/world story above a
        # local one. Wider tiers stay in their order behind it.
        lead = items[0].get("__section")
        head = [a for a in items if a.get("__section") == lead]
        tail = [a for a in items if a.get("__section") != lead]
        items = _pick_lead(head) + tail
    else:
        items = _pick_lead(items)
    return BriefResponse(
        brief_date=data["brief_date"],
        title=data["title"],
        generated_at=data.get("created_at"),
        items=[Article.from_row(r) for r in items],
    )
@app.get("/api/brief-dates", response_model=list[str])
def brief_dates(limit: int = Query(30, ge=1, le=365)) -> list[str]:
    # The dates queries.available_dates reports, at most `limit` of them.
    with get_conn() as db:
        dates = queries.available_dates(db, limit=limit)
    return dates
# --- Daily Art (the /art room) -----------------------------------------
@app.get("/api/art/today")
def art_today(response: Response) -> dict:
    # Today's art piece as JSON, with image URLs pointing at the local image endpoint.
    with get_conn() as conn:
        a = art.get_today(conn)
    if not a:
        # Headers set on the injected `response` are dropped when an exception is
        # raised, so the "don't cache this 404" header must travel on the
        # HTTPException itself.
        raise HTTPException(status_code=404, detail="No art yet.",
                            headers={"Cache-Control": _PRIVATE})
    response.headers["Cache-Control"] = _EDGE_FEED  # one piece a day, same for everyone
    museums = {"met": "The Met", "aic": "Art Institute of Chicago", "si": "Smithsonian"}
    # A malformed stored palette degrades to "no palette" rather than a 500
    # (same guard word_today applies to its stored JSON).
    try:
        palette = json.loads(a["palette"]) if a.get("palette") else []
    except (ValueError, TypeError):
        palette = []
    return {
        "date": a["art_date"], "object_id": a["object_id"], "title": a["title"],
        "artist": a["artist"], "date_text": a["date_text"], "medium": a["medium"],
        "department": a["department"], "credit": a["credit"], "source_url": a["source_url"],
        # Unknown source codes are shown as-is.
        "source": a["source"], "museum": museums.get(a["source"], a["source"]),
        "is_public_domain": bool(a["is_public_domain"]),
        "license": "Public Domain (CC0)" if a["is_public_domain"] else None,
        "blurb": a.get("blurb"),
        "palette": palette,
        "image_url": f"/api/art/image/{a['object_id']}",
        "image_url_large": f"/api/art/image/{a['object_id']}?size=full",
    }
@app.api_route("/api/img/{article_id}", methods=["GET", "HEAD"])
def article_image(article_id: int) -> FileResponse:
    """Serve the locally-cached, downscaled WebP for an accepted, canonical article.
    Cache HITS ONLY — this never fetches (the cycle owns all fetching via newsimg.warm),
    so the public endpoint has no SSRF/worker-exhaustion surface. A miss 404s; the
    frontend handles that (retry, then the typographic cover)."""
    lookup_sql = (
        "SELECT a.image_url FROM articles a JOIN article_scores s ON s.article_id = a.id "
        "JOIN sources src ON src.id = a.source_id "
        "WHERE a.id = ? AND s.accepted = 1 AND a.duplicate_of IS NULL AND src.image_policy = 'cache'"
    )
    with get_conn() as conn:
        record = conn.execute(lookup_sql, (article_id,)).fetchone()
    image_url = record["image_url"] if record else None
    cached_path = newsimg.path_for(image_url) if image_url else None
    # No matching row, no image_url, or no path from newsimg.path_for all end in the same 404.
    if not cached_path:
        raise HTTPException(status_code=404, detail="image not cached")
    # NOT immutable: a re-enriched article can change image_url, so the bytes behind a
    # given id can change. A day's browser cache is plenty (we're direct-origin, no CDN).
    cache_headers = {"Cache-Control": "public, max-age=86400"}
    return FileResponse(str(cached_path), media_type="image/webp", headers=cache_headers)
@app.api_route("/api/art/image/{object_id}", methods=["GET", "HEAD"])
def art_image(object_id: int, size: str = Query("")) -> FileResponse:
    # Serve a cached museum image. size=full prefers the "<id>-full.*" copy and
    # falls back to the web-large "<id>.*" copy; 404 when neither is cached.
    cache = art.cache_dir()
    found = []
    if size == "full":
        found = sorted(cache.glob(f"{object_id}-full.*"))
    if not found:  # fall back to the web-large copy
        found = sorted(cache.glob(f"{object_id}.*"))
    if not found:
        raise HTTPException(status_code=404, detail="Not cached.")
    # Cached museum image: immutable for a given object id.
    first = found[0]
    return FileResponse(str(first), headers={"Cache-Control": "public, max-age=31536000, immutable"})
@app.get("/api/onthisday/today")
def onthisday_today(response: Response) -> dict:
    # Today's "on this day" fact as JSON.
    with get_conn() as conn:
        a = onthisday.get_today(conn)
    if not a:
        # Headers set on the injected `response` are dropped when an exception is
        # raised, so the "don't cache this 404" header must travel on the
        # HTTPException itself.
        raise HTTPException(status_code=404, detail="No fact yet.",
                            headers={"Cache-Control": _PRIVATE})
    response.headers["Cache-Control"] = _EDGE_FEED  # one fact a day, same for everyone
    return {
        "date": a["feature_date"], "year": a["year"], "text": a["text"],
        "summary": a["summary"], "image_url": a["image_url"], "source_url": a["page_url"],
        "source": a["source"],
    }
@app.get("/api/quote/today")
def quote_today(response: Response) -> dict:
    # Today's quote as JSON.
    with get_conn() as conn:
        q = quote.get_today(conn)
    if not q:
        # Headers set on the injected `response` are dropped when an exception is
        # raised, so the "don't cache this 404" header must travel on the
        # HTTPException itself.
        raise HTTPException(status_code=404, detail="No quote yet.",
                            headers={"Cache-Control": _PRIVATE})
    response.headers["Cache-Control"] = _EDGE_FEED
    return {"date": q["feature_date"], "text": q["text"], "author": q["author"],
            "work": q["work"], "year": q["year"], "meaning": q["meaning"], "source": q["source"]}
@app.get("/api/word/today")
def word_today(response: Response) -> dict:
    # Today's word as JSON, with an audio URL only when an audio file is recorded.
    with get_conn() as conn:
        w = wotd.get_today(conn)
    if not w:
        # Headers set on the injected `response` are dropped when an exception is
        # raised, so the "don't cache this 404" header must travel on the
        # HTTPException itself.
        raise HTTPException(status_code=404, detail="No word yet.",
                            headers={"Cache-Control": _PRIVATE})
    response.headers["Cache-Control"] = _EDGE_FEED
    # Prefer the LLM-polished gloss + everyday sentences; fall back to the raw dictionary.
    raw_examples = w.get("usage") or w.get("examples")
    try:
        examples = json.loads(raw_examples) if raw_examples else []
    except (ValueError, TypeError):
        examples = []  # malformed stored JSON degrades to "no examples"
    return {"date": w["feature_date"], "word": w["word"], "part_of_speech": w["part_of_speech"],
            "phonetic": w["phonetic"], "definition": w.get("gloss") or w["definition"], "examples": examples,
            "audio_url": f"/api/word/audio/{w['word']}" if w["audio_file"] else None}
@app.api_route("/api/word/audio/{word}", methods=["GET", "HEAD"])
def word_audio(word: str) -> FileResponse:
    # Serve the cached audio file named "<word>.<ext>"; 404 when there is none.
    from glob import escape as glob_escape  # function-scope import keeps this block self-contained

    # `word` is an untrusted path parameter. Escape glob metacharacters so a
    # request such as "*" or "[a-z]*" cannot select other files in the cache
    # directory; only the literal (lower-cased) word can match.
    pattern = f"{glob_escape(word.lower())}.*"
    matches = [
        m for m in sorted(wotd.cache_dir().glob(pattern))
        if not m.name.startswith(".")  # never serve dotfiles
    ]
    if not matches:
        raise HTTPException(status_code=404, detail="No audio.")
    return FileResponse(str(matches[0]), headers={"Cache-Control": "public, max-age=31536000, immutable"})
# --- Small-joys admin: manage the WOTD / QOTD / On-This-Day pools ----------------
# Every table below is keyed by the {kind} path segment of the joys admin routes
# ("onthisday" | "quote" | "word").
# kind -> pool table name. Table names are interpolated into SQL, so they must
# only ever come from this mapping, never from the request.
_JOY_TABLES = {"onthisday": "onthisday_pool", "quote": "quote_pool", "word": "wotd_pool"}
# kind -> table name for the per-day pick (presumably one chosen row per day — verify against its users).
_JOY_DAILY = {"onthisday": "daily_onthisday", "quote": "daily_quote", "word": "daily_wotd"}
# kind -> the feature module for that joy.
_JOY_MODULES = {"onthisday": onthisday, "quote": quote, "word": wotd}
# kind -> columns an admin "edit" action may write; any other submitted field is ignored.
_JOY_EDITABLE = { # whitelist of editable columns
"onthisday": {"text", "summary", "year"},
"quote": {"text", "author", "work", "year", "meaning"},
"word": {"definition", "part_of_speech", "phonetic"},
}
@app.get("/api/admin/joys/{kind}")
def admin_joys_list(kind: str, request: Request, limit: int = Query(300, ge=1, le=2000)) -> list[dict]:
    # Admin listing of one joy pool: featured rows first, then newest id first.
    # The table name comes only from the _JOY_TABLES whitelist.
    table = _JOY_TABLES.get(kind)
    if not table:
        raise HTTPException(status_code=404, detail="Unknown joy.")
    with get_conn() as conn:
        _require_admin(conn, request)
        cursor = conn.execute(
            f"SELECT * FROM {table} ORDER BY featured DESC, id DESC LIMIT ?", (limit,)
        )
        listing = [dict(record) for record in cursor.fetchall()]
    return listing
@app.post("/api/admin/joys/{kind}/items/{item_id}")  # /items/ so 'add'/'repick' don't parse as an id
def admin_joys_mutate(kind: str, item_id: int, body: JoyAction, request: Request) -> dict:
    # Apply one admin action to a pool row: block/unblock, feature/unfeature,
    # delete, or edit (whitelisted columns only). Unknown kind → 404, unknown
    # action → 400. The table name comes only from the _JOY_TABLES whitelist.
    table = _JOY_TABLES.get(kind)
    if not table:
        raise HTTPException(status_code=404, detail="Unknown joy.")
    with get_conn() as conn:
        _require_admin(conn, request)
        action = body.action
        if action in ("block", "unblock"):
            blocked = 1 if action == "block" else 0
            conn.execute(f"UPDATE {table} SET blocked=? WHERE id=?", (blocked, item_id))
        elif action in ("feature", "unfeature"):
            featured = 1 if action == "feature" else 0
            conn.execute(f"UPDATE {table} SET featured=? WHERE id=?", (featured, item_id))
        elif action == "delete":
            conn.execute(f"DELETE FROM {table} WHERE id=?", (item_id,))
        elif action == "edit":
            # Only whitelisted columns that were actually submitted are written;
            # an edit with none of them is a no-op.
            columns = tuple(_JOY_EDITABLE[kind] & set((body.fields or {}).keys()))
            if columns:
                assignments = ", ".join(f"{col}=?" for col in columns)
                params = [body.fields[col] for col in columns]
                params.append(item_id)
                conn.execute(f"UPDATE {table} SET {assignments} WHERE id=?", tuple(params))
        else:
            raise HTTPException(status_code=400, detail="Unknown action.")
        conn.commit()
    return {"ok": True}
@app.post("/api/admin/joys/{kind}/add")
def admin_joys_add(kind: str, body: JoyAdd, request: Request) -> dict:
    # Admin-only: add one hand-entered item to the pool for `kind`
    # ("quote", "onthisday" or "word"). Rows are tagged source='admin' and
    # written with INSERT OR IGNORE. Missing required fields -> 400; any other
    # kind -> 404 (raised after the admin check).
    with get_conn() as conn:
        _require_admin(conn, request)
        if kind == "quote":
            if not body.text:
                raise HTTPException(status_code=400, detail="text required")
            quote_row = (daily.content_key(body.text, body.author), body.text, body.author, body.work)
            conn.execute(
                "INSERT OR IGNORE INTO quote_pool (source, ckey, text, author, work) VALUES ('admin',?,?,?,?)",
                quote_row,
            )
        elif kind == "onthisday":
            if not (body.text and body.md):
                raise HTTPException(status_code=400, detail="md + text required")
            event_row = (
                body.md,
                body.year,
                daily.content_key(body.md, body.year, body.text),
                body.text,
                body.summary,
                body.image_url,
                body.page_url,
            )
            conn.execute(
                "INSERT OR IGNORE INTO onthisday_pool (source, md, year, ckey, text, summary, image_url, page_url) "
                "VALUES ('admin',?,?,?,?,?,?,?)",
                event_row,
            )
        elif kind == "word":
            if not body.word:
                raise HTTPException(status_code=400, detail="word required")
            entry = wotd._lookup(body.word.strip().lower())  # network up front
            if not entry:
                raise HTTPException(status_code=400, detail="Word not found in dictionary.")
            cached_audio = wotd._cache_audio(entry["audio_url"], entry["word"])
            polish = wotd._polish(
                LocalModelClient.from_env(), entry["word"], entry["part_of_speech"], entry["definition"]
            )
            # Polishing may yield nothing; the raw dictionary data is stored either way.
            if polish:
                gloss, usage = polish["gloss"], json.dumps(polish["examples"])
            else:
                gloss = usage = None
            word_row = (
                entry["word"],
                entry["part_of_speech"],
                entry["phonetic"],
                cached_audio,
                entry["audio_url"],
                entry["definition"],
                json.dumps(entry["examples"]),
                gloss,
                usage,
            )
            conn.execute(
                "INSERT OR IGNORE INTO wotd_pool (source, word, part_of_speech, phonetic, audio_file, audio_url, definition, examples, gloss, usage) "
                "VALUES ('admin',?,?,?,?,?,?,?,?,?)",
                word_row,
            )
        else:
            raise HTTPException(status_code=404, detail="Unknown joy.")
        conn.commit()
    return {"ok": True}
@app.post("/api/admin/joys/{kind}/repick")
def admin_joys_repick(kind: str, request: Request) -> dict:
    # Admin-only: force a fresh daily pick for `kind`, steering away from the
    # item already recorded for today. Unknown kind -> 404. The response's
    # "picked" flag is the truthiness of what pick_daily() returned.
    if kind not in _JOY_MODULES:
        raise HTTPException(status_code=404, detail="Unknown joy.")
    joy_module = _JOY_MODULES[kind]
    with get_conn() as conn:
        _require_admin(conn, request)
        todays_row = conn.execute(
            f"SELECT pool_id FROM {_JOY_DAILY[kind]} WHERE feature_date=?", (local_today(),)
        ).fetchone()
        pick_args = {
            "force": True,
            # force a DIFFERENT item, not the same one
            "avoid": todays_row["pool_id"] if todays_row else None,
        }
        if kind == "quote" or kind == "word":  # these polish lazily (gloss / meaning)
            pick_args["client"] = LocalModelClient.from_env()
        chosen = joy_module.pick_daily(conn, **pick_args)
    return {"ok": True, "picked": bool(chosen)}
@app.get("/api/replacement", response_model=Article | None)
def replacement(
    exclude: str = Query("", description="comma-separated article ids already shown"),
    prefs: str | None = Query(None),
    avoid_paywall: bool = True,
    gentle: bool = Query(False, description="also require lead-safe (for replacing the hero)"),
) -> Article | None:
    # Swap a read or paywalled item for the next-best one the reader can
    # actually open. The client merges any active mood into `prefs` (same as
    # the feed), so this needs no mood param.
    #
    # Walks up to 120 accepted feed rows filtered by the reader's prefs and
    # returns the first one that is not in `exclude`, is not paywalled (when
    # `avoid_paywall`), and is lead-safe (when `gentle`). Returns None when no
    # row qualifies.
    fp = prefs_from_json(prefs)
    # `exclude` is untrusted query text. str.isdigit() also accepts characters
    # such as "²", and lstrip("-") lets "--5" through; int() rejects both, so
    # guard the conversion instead of letting a bad token become a 500.
    excl: set[int] = set()
    for token in exclude.split(","):
        token = token.strip()
        if not token.lstrip("-").isdigit():
            continue
        try:
            excl.add(int(token))
        except ValueError:
            continue  # malformed id: ignore it, like any other non-numeric token
    now = datetime.now(timezone.utc)
    kw = dict(
        include_topics=fp.include_topics or None,
        include_flavors=fp.include_flavors or None,
        mute_topics=list(fp.muted_topics(now)) or None,
        mute_flavors=list(fp.muted_flavors(now)) or None,
        max_cortisol=fp.max_cortisol,
        max_ragebait=fp.max_ragebait,
    )
    with get_conn() as conn:
        rows = queries.feed(conn, accepted_only=True, limit=120, offset=0, **kw)
        for r in filter_articles(rows, fp, now):
            if r["id"] in excl:
                continue
            if avoid_paywall and is_paywalled_for_source(r["canonical_url"], r["paywall_override"]):
                continue
            if gentle and not safe_to_lead(r):
                continue
            return Article.from_row(r)
    return None
@app.get("/api/candidates", response_model=list[Candidate])
def candidates(status: str | None = Query(None)) -> list[Candidate]:
    # List source candidates, optionally filtered by `status`. Each row's
    # `preview_json` text column is replaced by a decoded `preview` value
    # (None when the column is empty).
    from .sources import list_candidates

    def _to_candidate(record) -> Candidate:
        fields = dict(record)
        raw_preview = fields.pop("preview_json", None)
        fields["preview"] = json.loads(raw_preview) if raw_preview else None
        return Candidate(**fields)

    with get_conn() as conn:
        records = list_candidates(conn, status=status)
        return [_to_candidate(record) for record in records]
@app.get("/api/source-preview", response_model=SourcePreview)
def source_preview(
    url: str = Query(..., max_length=2048),
    sample: int = Query(25, ge=1, le=50),
    classify: bool = Query(False, description="Also classify with the local model (accurate but slower)"),
) -> SourcePreview:
    # Read-only sample scoring; nothing is persisted. Only http(s) is allowed.
    # NOTE: fetching a user-supplied URL is an SSRF surface — before exposing
    # this publicly, also block private/loopback/link-local address ranges.
    #
    # Raises 400 when `url` is not http(s) and 502 when the feed cannot be
    # previewed for any reason.
    if not re.match(r"^https?://", url, re.IGNORECASE):
        raise HTTPException(400, "url must start with http:// or https://")
    # The local model is only constructed when classification was requested.
    client = LocalModelClient.from_env() if classify else None
    try:
        data = feeds.preview_feed(url, sample=sample, client=client)
    except Exception as exc:
        # Boundary handler: any fetch/parse failure becomes a 502. Chain the
        # cause so the original traceback is kept for logs and debugging.
        raise HTTPException(502, f"could not preview feed: {exc}") from exc
    return SourcePreview(**data)
# Static site last, mounted at root, so /api/* and /healthz win.
# The mount is skipped entirely when STATIC_DIR is not a directory.
if STATIC_DIR.is_dir():
app.mount("/", StaticFiles(directory=str(STATIC_DIR), html=True), name="site")
return app
# Module-level application instance; this is the `goodnews.api:app` target
# named in the module docstring's uvicorn command.
app = create_app()