Files
upbeatBytes/goodnews/api.py
T
thejayman77 a2765af3fc Fix: capture Google avatar on returning sign-in (+ userinfo fallback)
find_or_create_user returned early when the identity already existed, so a
returning Google sign-in never refreshed the profile picture (the name had been
set earlier, at link time — which is why name worked but avatar stayed null).
Now profile bits refresh on every sign-in. Also fall back to the OIDC userinfo
endpoint for the picture if the ID token omits it. 119 tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 14:57:44 +00:00

852 lines
32 KiB
Python

"""FastAPI service for goodNews.
A read-only JSON API over the ingestion database, plus a small static site that
consumes it. The same endpoints back both the website and any future companion
app; the auto-generated OpenAPI docs at /docs are that shared contract.
Run with the bundled CLI: goodnews serve
Or directly: uvicorn goodnews.api:app --host 0.0.0.0 --port 8000
The database path comes from GOODNEWS_DB (falling back to the repo's data dir),
so the API and CLI always read the same file.
"""
from __future__ import annotations
import hashlib
import hmac
import json
import os
import re
import secrets
import sqlite3
from collections import Counter
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from fastapi import BackgroundTasks, FastAPI, HTTPException, Query, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from . import auth, email_send, feeds, oauth_google, queries
from .db import connect
from .filters import filter_articles, prefs_from_json
from .hero import safe_to_lead
from .llm import LocalModelClient
from .moods import MOODS, mood_filter
from .paywall import is_paywalled
from .taxonomy import FAMILIES, FLAVORS, TOPICS
# Two levels up from this file: the directory that contains the package folder.
ROOT = Path(__file__).resolve().parents[1]
# Database used when GOODNEWS_DB is not set (see db_path).
DEFAULT_DB = ROOT / "data" / "goodnews.sqlite3"
# Prefer the built SvelteKit site; fall back to the legacy single-page harness.
FRONTEND_DIR = ROOT / "frontend" / "build"
LEGACY_STATIC = Path(__file__).resolve().parent / "static"
# Resolved once at import time; a frontend build that appears later is not picked up.
STATIC_DIR = FRONTEND_DIR if FRONTEND_DIR.is_dir() else LEGACY_STATIC
def db_path() -> Path:
    """Return the SQLite database path the API should open.

    ``GOODNEWS_DB`` wins when it holds a non-blank value; otherwise the
    repo-local ``DEFAULT_DB`` is used. A blank value is treated as unset,
    because ``Path("")`` is ``Path(".")`` and would point the API at the
    current working directory instead of a database file.
    """
    configured = os.environ.get("GOODNEWS_DB")
    if configured and configured.strip():
        return Path(configured)
    return Path(DEFAULT_DB)
# --- Auth helpers -----------------------------------------------------------
# Public origin used to build absolute links (magic links, OAuth redirect URI).
# Trailing slashes are stripped so paths can be appended with a leading "/".
PUBLIC_BASE_URL = os.environ.get("GOODNEWS_PUBLIC_BASE_URL", "https://upbeatbytes.com").rstrip("/")
# Cookie that carries the session token for the web client.
SESSION_COOKIE = "ub_session"
# Signed cookie that binds an in-progress Google sign-in to the browser.
OAUTH_COOKIE = "ub_oauth"
# Session cookie lifetime in seconds, derived from auth.SESSION_TTL.
SESSION_MAX_AGE = int(auth.SESSION_TTL.total_seconds())
# HMAC key used by _sign/_unsign.
# NOTE(review): the fallback is a fixed string visible in the source. A
# deployment that forgets GOODNEWS_SESSION_SECRET signs the OAuth cookie with
# a publicly known key — consider failing fast (or warning) outside local dev.
SESSION_SECRET = os.environ.get("GOODNEWS_SESSION_SECRET", "dev-insecure-secret")
# Secure cookies in production (https); off for http (local/test) so they round-trip.
_COOKIE_SECURE = PUBLIC_BASE_URL.startswith("https")
# Loose shape check only (local@domain.tld, no whitespace, a single "@").
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
def _sign(value: str) -> str:
    """Return ``value`` followed by ``.`` and its hex HMAC-SHA256 tag.

    The tag is keyed with ``SESSION_SECRET``; ``_unsign`` reverses this.
    """
    key = SESSION_SECRET.encode()
    tag = hmac.new(key, value.encode(), hashlib.sha256).hexdigest()
    return value + "." + tag
def _unsign(signed: str | None) -> str | None:
    """Verify a ``value.signature`` string produced by ``_sign``.

    Returns the original value when the signature matches, otherwise ``None``
    (also for missing, malformed, or non-encodable input). Never raises on
    attacker-controlled input.
    """
    if not signed or "." not in signed:
        return None
    value, _, sig = signed.rpartition(".")
    try:
        payload = value.encode()
        presented = sig.encode()
    except UnicodeEncodeError:
        return None
    expected = hmac.new(SESSION_SECRET.encode(), payload, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which a crafted cookie could trigger.
    if hmac.compare_digest(presented, expected.encode()):
        return value
    return None
def _google_redirect_uri() -> str:
    """Return the absolute callback URL handed to Google for the OAuth flow."""
    return PUBLIC_BASE_URL + "/api/auth/google/callback"
def _session_token_from_request(request: Request) -> str | None:
    """Extract the session token from a request, or ``None`` if absent.

    Web sends the session as an httpOnly cookie; the app sends a bearer token.
    The cookie wins when both are present. The ``Bearer`` scheme is matched
    case-insensitively (HTTP auth schemes are case-insensitive), and an empty
    credential is reported as ``None`` rather than ``""``.
    """
    cookie = request.cookies.get(SESSION_COOKIE)
    if cookie:
        return cookie
    scheme, _, credentials = request.headers.get("Authorization", "").partition(" ")
    if scheme.lower() != "bearer":
        return None
    return credentials.strip() or None
def _current_user(conn: sqlite3.Connection, request: Request) -> sqlite3.Row | None:
    """Resolve the signed-in user for this request, or ``None``.

    When a user is found the connection is committed so whatever
    ``auth.resolve_session`` updated (the last_seen touch) is persisted.
    """
    token = _session_token_from_request(request)
    user = auth.resolve_session(conn, token)
    if not user:
        return user
    conn.commit()
    return user
def _require_user(conn: sqlite3.Connection, request: Request) -> sqlite3.Row:
    """Return the signed-in user or raise ``HTTPException`` 401."""
    user = _current_user(conn, request)
    if user:
        return user
    raise HTTPException(status_code=401, detail="Sign in to do that.")
def _user_out(user: sqlite3.Row) -> dict:
return {
"id": user["id"],
"email": user["email"],
"display_name": user["display_name"],
"avatar_url": user["avatar_url"],
}
def _send_link_safe(email: str, link: str) -> None:
    """Send the magic link without ever raising (runs off the request path).

    Delivery failures are logged with a traceback instead of being dropped
    silently, so a broken mail setup is visible to operators. The address and
    the link are deliberately left out of the log message: the link is a
    sign-in credential.
    """
    import logging

    try:
        email_send.send_magic_link(email, link)
    except Exception:
        # Still swallowed: the caller has already replied and must not crash.
        logging.getLogger(__name__).exception("Magic-link email delivery failed")
def _set_session_cookie(response: Response, token: str) -> None:
    """Attach the session token to ``response`` as the site-wide session cookie."""
    cookie_options = {
        "max_age": SESSION_MAX_AGE,
        "httponly": True,
        "secure": _COOKIE_SECURE,
        "samesite": "lax",
        "path": "/",
    }
    response.set_cookie(SESSION_COOKIE, token, **cookie_options)
@contextmanager
def get_conn():
    """Yield a connection to the database at ``db_path()`` and always close it.

    Nothing is committed here; callers commit explicitly when they write.
    """
    connection = connect(db_path())
    try:
        yield connection
    finally:
        connection.close()
def _prefs_sql_kw(fp, now) -> dict:
"""Categorical prefs → queries.feed keyword filters (avoid-terms stay Python)."""
return dict(
include_topics=fp.include_topics or None,
include_flavors=fp.include_flavors or None,
mute_topics=list(fp.muted_topics(now)) or None,
mute_flavors=list(fp.muted_flavors(now)) or None,
max_cortisol=fp.max_cortisol,
max_ragebait=fp.max_ragebait,
)
def _pick_lead(items: list[dict]) -> list[dict]:
    """Move the best available lead story to the front of ``items``.

    Tiers are tried in order: lead-safe + not paywalled + has an image, then
    lead-safe + not paywalled, then merely lead-safe. The first story matching
    the first tier that has any match is promoted; all other stories keep
    their relative order. With no match at all the list is returned unchanged.
    """

    def readable_and_gentle(story: dict) -> bool:
        return safe_to_lead(story) and not is_paywalled(story.get("canonical_url"))

    def illustrated(story: dict) -> bool:
        return readable_and_gentle(story) and bool(story.get("image_url"))

    for accepts in (illustrated, readable_and_gentle, safe_to_lead):
        pos = next((i for i, story in enumerate(items) if accepts(story)), None)
        if pos is None:
            continue
        if pos == 0:
            return items  # already leading; keep the same list object
        return [items[pos], *items[:pos], *items[pos + 1:]]
    return items
# --- Response models (the companion-app contract) ---------------------------
# One taxonomy entry as listed by /api/categories: a key from TOPICS or
# FLAVORS together with the value stored under it.
class Category(BaseModel):
    key: str
    description: str
# Payload of /api/categories: the full TOPICS and FLAVORS vocabularies.
class CategoriesResponse(BaseModel):
    topics: list[Category]
    flavors: list[Category]
# One row of /api/category-counts: the number of articles for a
# (topic, flavor) pair. Either half of the pair may be null.
class CategoryCount(BaseModel):
    topic: str | None
    flavor: str | None
    count: int
# Public shape of one article across feed, brief, saved and history responses.
# (Kept as a comment: a class docstring would surface in the OpenAPI schema.)
class Article(BaseModel):
    id: int
    title: str
    description: str | None = None
    url: str
    image_url: str | None = None
    published_at: str | None = None
    source: str
    topic: str | None = None
    flavor: str | None = None
    accepted: bool
    rank_score: int | None = None
    reason_code: str | None = None
    reason_text: str | None = None
    model_name: str | None = None
    rank: int | None = None  # position within a brief, when applicable
    paywalled: bool = False
    tags: list[str] = []

    @classmethod
    def from_row(cls, row: dict) -> "Article":
        """Build an ``Article`` from a query row.

        ``id``, ``title``, ``canonical_url`` and ``source_name`` are required
        keys; every other column is optional and defaults to ``None``. The
        comma-separated ``tags`` column becomes a list with empty entries
        dropped, and ``paywalled`` is derived from the canonical URL.
        """
        optional_columns = (
            "description", "image_url", "published_at", "topic", "flavor",
            "rank_score", "reason_code", "reason_text", "model_name", "rank",
        )
        passthrough = {column: row.get(column) for column in optional_columns}
        raw_tags = row.get("tags")
        tag_list = [tag for tag in raw_tags.split(",") if tag] if raw_tags else []
        return cls(
            id=row["id"],
            title=row["title"],
            url=row["canonical_url"],
            source=row["source_name"],
            accepted=bool(row.get("accepted")),
            paywalled=is_paywalled(row.get("canonical_url")),
            tags=tag_list,
            **passthrough,
        )
# List payload shared by /api/feed, /api/saved and /api/history.
class FeedResponse(BaseModel):
    topic: str | None  # topic filter echoed back; None for saved/history
    flavor: str | None  # flavor filter echoed back; None for saved/history
    count: int  # number of entries in `items` (this page, not a grand total)
    items: list[Article]
# Payload of /api/brief: one dated brief and its (filtered, topped-up) stories.
class BriefResponse(BaseModel):
    brief_date: str | None
    title: str | None
    generated_at: str | None = None  # freshness stamp: changes only when content changes
    items: list[Article]
# One entry of SourcePreview.examples_rejected.
# NOTE(review): `reason` is presumably why the sample was rejected — confirm
# against feeds.preview_feed.
class RejectedExample(BaseModel):
    title: str
    reason: str
# One candidate-source row as returned by /api/candidates.
class Candidate(BaseModel):
    id: int
    feed_url: str
    homepage_url: str | None = None
    name: str | None = None
    status: str
    preview: dict | None = None  # decoded from the row's preview_json; None when that is empty
    notes: str | None = None
    last_previewed_at: str | None = None
    created_at: str | None = None
    updated_at: str | None = None
# Result of /api/source-preview, built directly from the mapping returned by
# feeds.preview_feed. NOTE(review): the exact meaning and units of each metric
# are defined by that helper — confirm there before relying on them.
class SourcePreview(BaseModel):
    url: str
    sampled: int
    classified: bool
    accepted: int
    acceptance_rate: float
    avg_cortisol: float
    avg_ragebait: float
    avg_pr_risk: float
    newest_published: str | None
    recent_7d: int
    topic_mix: dict[str, int]
    flavor_mix: dict[str, int]
    examples_accepted: list[str]
    examples_rejected: list[RejectedExample]
# Body of POST /api/auth/email/start; the handler normalises and validates `email`.
class EmailStartRequest(BaseModel):
    email: str
# Body of POST /api/auth/email/verify: the token taken from the emailed link.
class TokenVerifyRequest(BaseModel):
    token: str
# Public profile of a user; the same four fields that _user_out extracts.
class UserOut(BaseModel):
    id: int
    email: str
    display_name: str | None = None
    avatar_url: str | None = None
# Response of POST /api/auth/email/verify: the signed-in user plus the session token.
class SessionOut(BaseModel):
    user: UserOut
    token: str  # for non-browser (app) clients; the web SPA uses the cookie
# Body of POST /api/history: article ids to record as seen. Ids are passed
# through queries.existing_article_ids before anything is inserted.
class IdsBody(BaseModel):
    ids: list[int] = []
# Body of POST /api/import: a device's local history and saved article ids.
class ImportBody(BaseModel):
    seen: list[int] = []
    saved: list[int] = []
# Body of PUT /api/prefs: an arbitrary JSON object stored as the user's prefs blob.
class PrefsBody(BaseModel):
    prefs: dict = {}
# --- App --------------------------------------------------------------------
def create_app() -> FastAPI:
app = FastAPI(
    title="goodNews API",
    version="0.1.0",
    description="Constructive, uplifting news — metadata and links only.",
)
# The website and companion app may live on other origins; allow them.
# PUT and DELETE are listed because the API defines such routes
# (/api/prefs, /api/saved/{id}, /api/history/{id}, /api/account); without
# them a cross-origin preflight for those calls is rejected.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)
@app.get("/healthz")
def healthz() -> dict:
    # Liveness probe. It only reads, and it always answers "ok": if the scores
    # table cannot be queried the count falls back to zero instead of failing.
    try:
        with get_conn() as conn:
            row = conn.execute("SELECT COUNT(*) FROM article_scores").fetchone()
        scored = row[0]
    except sqlite3.Error:
        scored = 0
    return {"status": "ok", "scored_articles": scored}
# --- Auth: passwordless magic link (Google added in Phase 2) ----------
@app.post("/api/auth/email/start")
def auth_email_start(body: EmailStartRequest, background_tasks: BackgroundTasks) -> dict:
    # Starts a passwordless sign-in. The reply is always {"ok": True} for a
    # well-formed address, whether or not a link is sent (no account enumeration).
    email = auth.normalize_email(body.email)
    if _EMAIL_RE.match(email) is None:
        raise HTTPException(status_code=422, detail="Please enter a valid email address.")

    link = None
    with get_conn() as conn:
        # Light abuse guard: at most 5 tokens per address per 10 minutes.
        recent_count = conn.execute(
            "SELECT COUNT(*) FROM login_tokens WHERE email = ? "
            "AND created_at > datetime('now', '-10 minutes')",
            (email,),
        ).fetchone()[0]
        under_limit = recent_count < 5
        if under_limit:
            raw = auth.create_login_token(conn, email)
            conn.commit()
            link = f"{PUBLIC_BASE_URL}/auth/verify?token={raw}"

    # The (slow) SMTP send runs as a background task so the request returns
    # immediately.
    if link is not None:
        background_tasks.add_task(_send_link_safe, email, link)
    return {"ok": True}
@app.post("/api/auth/email/verify", response_model=SessionOut)
def auth_email_verify(body: TokenVerifyRequest, request: Request, response: Response) -> SessionOut:
    # Exchanges a magic-link token for a session: sets the cookie for browsers
    # and returns the token in the body for app clients.
    user_agent = request.headers.get("User-Agent")
    with get_conn() as conn:
        email = auth.consume_login_token(conn, body.token)
        if not email:
            # Persist whatever consume_login_token changed before bailing out.
            conn.commit()
            raise HTTPException(status_code=400, detail="This sign-in link is invalid or has expired.")
        user_id = auth.find_or_create_user(conn, email, "email", email)
        token = auth.create_session(conn, user_id, user_agent=user_agent)
        conn.commit()
        user = auth.get_user(conn, user_id)
    _set_session_cookie(response, token)
    profile = UserOut(**_user_out(user))
    return SessionOut(user=profile, token=token)
@app.get("/api/auth/me", response_model=UserOut | None)
def auth_me(request: Request) -> UserOut | None:
    # Who am I? Returns null (not 401) when there is no valid session.
    with get_conn() as conn:
        user = _current_user(conn, request)
        if not user:
            return None
        return UserOut(**_user_out(user))
@app.post("/api/auth/logout")
def auth_logout(request: Request, response: Response) -> dict:
    # Revokes the presented session (cookie or bearer) and clears the cookie.
    # Succeeds even when no session was presented.
    presented = _session_token_from_request(request)
    with get_conn() as conn:
        auth.revoke_session(conn, presented)
        conn.commit()
    response.delete_cookie(SESSION_COOKIE, path="/")
    return {"ok": True}
# --- Auth: Google (OAuth 2.0 / OIDC) ----------------------------------
@app.get("/api/auth/google/start")
def google_start() -> RedirectResponse:
    # Begins Google sign-in: redirect to Google's consent screen with a fresh
    # state value and PKCE challenge.
    if not oauth_google.configured():
        raise HTTPException(status_code=503, detail="Google sign-in isn't configured.")

    state = secrets.token_urlsafe(24)
    verifier, challenge = oauth_google.new_pkce()
    consent_url = oauth_google.auth_url(_google_redirect_uri(), state, challenge)
    redirect = RedirectResponse(consent_url, status_code=302)

    # Bind the flow to this browser: the signed state and PKCE verifier live in
    # a 10-minute cookie that the callback reads back and checks.
    flow_cookie = _sign(f"{state}:{verifier}")
    redirect.set_cookie(
        OAUTH_COOKIE, flow_cookie, max_age=600,
        httponly=True, secure=_COOKIE_SECURE, samesite="lax", path="/",
    )
    return redirect
@app.get("/api/auth/google/callback")
def google_callback(
    request: Request,
    code: str | None = None,
    state: str | None = None,
    error: str | None = None,
) -> RedirectResponse:
    # Completes Google sign-in. Any problem redirects to the verify page with
    # ?error=google; success sets the session cookie and redirects home.
    fail = RedirectResponse(f"{PUBLIC_BASE_URL}/auth/verify?error=google", status_code=302)
    if error or not code or not state:
        return fail

    saved = _unsign(request.cookies.get(OAUTH_COOKIE))
    if not saved:
        return fail
    saved_state, _, verifier = saved.partition(":")
    # CSRF check. Compared as bytes because compare_digest() raises TypeError
    # on non-ASCII str input, and `state` is attacker-controlled.
    if not hmac.compare_digest(saved_state.encode(), state.encode("utf-8", "replace")):
        return fail

    try:
        tokens = oauth_google.exchange_code(code, _google_redirect_uri(), verifier)
        info = oauth_google.verify_id_token(tokens["id_token"])
        # Pull the required claims here so a token without them becomes a
        # clean failure redirect instead of an unhandled KeyError.
        email = info["email"]
        subject = info["sub"]
    except Exception:
        return fail
    if not email or not subject:
        return fail

    # The avatar is optional: fall back to the userinfo endpoint when the ID
    # token omits it, but never let that extra request fail the sign-in.
    picture = info.get("picture")
    if not picture and tokens.get("access_token"):
        try:
            picture = oauth_google.fetch_userinfo(tokens["access_token"]).get("picture")
        except Exception:
            picture = None

    with get_conn() as conn:
        user_id = auth.find_or_create_user(
            conn, email, "google", subject,
            display_name=info.get("name"), avatar_url=picture,
        )
        token = auth.create_session(conn, user_id, user_agent=request.headers.get("User-Agent"))
        conn.commit()

    ok = RedirectResponse(f"{PUBLIC_BASE_URL}/", status_code=302)
    _set_session_cookie(ok, token)
    ok.delete_cookie(OAUTH_COOKIE, path="/")
    return ok
# --- Saved articles, history, and one-time import (all require sign-in) ---
@app.get("/api/saved", response_model=FeedResponse)
def saved_list(request: Request) -> FeedResponse:
    # The signed-in user's saved articles, in whatever order queries.saved yields.
    with get_conn() as conn:
        uid = _require_user(conn, request)["id"]
        items = [Article.from_row(row) for row in queries.saved(conn, uid)]
    return FeedResponse(topic=None, flavor=None, count=len(items), items=items)
@app.get("/api/saved/ids")
def saved_id_list(request: Request) -> list[int]:
    # Just the ids of the user's saved articles (no article payloads).
    with get_conn() as conn:
        uid = _require_user(conn, request)["id"]
        return queries.saved_ids(conn, uid)
@app.post("/api/saved/{article_id}")
def save_article(article_id: int, request: Request) -> dict:
    # Saves an article for the signed-in user. Saving twice is a no-op
    # (INSERT OR IGNORE); an unknown article id is a 404.
    with get_conn() as conn:
        uid = _require_user(conn, request)["id"]
        found = conn.execute("SELECT 1 FROM articles WHERE id = ?", (article_id,)).fetchone()
        if not found:
            raise HTTPException(status_code=404, detail="No such article.")
        conn.execute(
            "INSERT OR IGNORE INTO saved_articles (user_id, article_id) VALUES (?, ?)",
            (uid, article_id),
        )
        conn.commit()
    return {"saved": True}
@app.delete("/api/saved/{article_id}")
def unsave_article(article_id: int, request: Request) -> dict:
    # Removes a saved article. Succeeds even if it was not saved.
    with get_conn() as conn:
        uid = _require_user(conn, request)["id"]
        conn.execute(
            "DELETE FROM saved_articles WHERE user_id = ? AND article_id = ?",
            (uid, article_id),
        )
        conn.commit()
    return {"saved": False}
@app.get("/api/history", response_model=FeedResponse)
def history_list(request: Request) -> FeedResponse:
    # The signed-in user's reading history as full article payloads.
    with get_conn() as conn:
        uid = _require_user(conn, request)["id"]
        items = [Article.from_row(row) for row in queries.history(conn, uid)]
    return FeedResponse(topic=None, flavor=None, count=len(items), items=items)
@app.post("/api/history")
def record_history(body: IdsBody, request: Request) -> dict:
    # Marks the given articles as seen. Ids not returned by
    # queries.existing_article_ids are dropped, and already-recorded rows are
    # left alone (INSERT OR IGNORE).
    with get_conn() as conn:
        uid = _require_user(conn, request)["id"]
        known_ids = queries.existing_article_ids(conn, body.ids)
        conn.executemany(
            "INSERT OR IGNORE INTO user_history (user_id, article_id, event) "
            "VALUES (?, ?, 'seen')",
            [(uid, aid) for aid in known_ids],
        )
        conn.commit()
    return {"ok": True}
@app.delete("/api/history/{article_id}")
def remove_history(article_id: int, request: Request) -> dict:
    # Forgets one article from the user's history. Succeeds even if absent.
    with get_conn() as conn:
        uid = _require_user(conn, request)["id"]
        conn.execute(
            "DELETE FROM user_history WHERE user_id = ? AND article_id = ?",
            (uid, article_id),
        )
        conn.commit()
    return {"ok": True}
# --- Prefs sync (Calm Filters / Boundaries follow the account) --------
@app.get("/api/prefs")
def get_prefs(request: Request) -> dict:
    # Returns the account's stored prefs. {"prefs": None} means nothing usable
    # is stored (no row yet, or an unreadable blob), so the caller seeds from
    # the device.
    with get_conn() as conn:
        uid = _require_user(conn, request)["id"]
        row = conn.execute(
            "SELECT prefs_json FROM user_prefs WHERE user_id = ?", (uid,)
        ).fetchone()
        if row is None:
            return {"prefs": None}
        try:
            stored = json.loads(row["prefs_json"])
        except (ValueError, TypeError):
            return {"prefs": None}
        return {"prefs": stored}
@app.put("/api/prefs")
def put_prefs(body: PrefsBody, request: Request) -> dict:
    # Upserts the account's prefs blob (one row per user).
    # Raises 401 when signed out and 413 when the serialised prefs exceed
    # 20000 characters. Oversized blobs are rejected rather than sliced:
    # slicing JSON text produces an invalid document that get_prefs would
    # then read back as "no prefs", silently losing the user's settings.
    blob = json.dumps(body.prefs)
    with get_conn() as conn:
        user = _require_user(conn, request)
        if len(blob) > 20000:
            raise HTTPException(status_code=413, detail="Preferences are too large to save.")
        conn.execute(
            "INSERT INTO user_prefs (user_id, prefs_json, updated_at) "
            "VALUES (?, ?, CURRENT_TIMESTAMP) "
            "ON CONFLICT(user_id) DO UPDATE SET prefs_json = excluded.prefs_json, "
            "updated_at = CURRENT_TIMESTAMP",
            (user["id"], blob),
        )
        conn.commit()
    return {"ok": True}
# --- Account: profile, sessions, export, delete -----------------------
@app.get("/api/account")
def account_info(request: Request) -> dict:
    # Account overview: basic profile, linked sign-in providers, and counts of
    # sessions and saved articles.
    with get_conn() as conn:
        user = _require_user(conn, request)
        uid = user["id"]
        provider_rows = conn.execute(
            "SELECT provider FROM identities WHERE user_id = ?", (uid,)
        )
        providers = [row["provider"] for row in provider_rows]
        session_count = conn.execute(
            "SELECT COUNT(*) FROM sessions WHERE user_id = ?", (uid,)
        ).fetchone()[0]
        saved_count = conn.execute(
            "SELECT COUNT(*) FROM saved_articles WHERE user_id = ?", (uid,)
        ).fetchone()[0]
    profile = {"id": uid, "email": user["email"], "display_name": user["display_name"]}
    return {
        "user": profile,
        "providers": providers,
        "sessions": session_count,
        "saved_count": saved_count,
    }
@app.post("/api/account/logout-all")
def logout_all(request: Request, response: Response) -> dict:
    # Signs the user out everywhere: deletes all of their sessions (including
    # the current one) and clears this browser's cookie.
    with get_conn() as conn:
        uid = _require_user(conn, request)["id"]
        conn.execute("DELETE FROM sessions WHERE user_id = ?", (uid,))
        conn.commit()
    response.delete_cookie(SESSION_COOKIE, path="/")
    return {"ok": True}
@app.get("/api/account/export")
def export_account(request: Request) -> Response:
    # Downloads everything stored for the account as a JSON attachment:
    # profile, sign-in methods, saved articles, history and preferences.
    def slim(article) -> dict:
        # Only what the user needs to recognise an article.
        return {"id": article["id"], "title": article["title"], "url": article["canonical_url"]}

    with get_conn() as conn:
        user = _require_user(conn, request)
        uid = user["id"]
        providers = [r["provider"] for r in conn.execute(
            "SELECT provider FROM identities WHERE user_id = ?", (uid,)
        )]
        saved = queries.saved(conn, uid, limit=10000)
        hist = queries.history(conn, uid, limit=10000)
        prow = conn.execute(
            "SELECT prefs_json FROM user_prefs WHERE user_id = ?", (uid,)
        ).fetchone()

    # An unreadable prefs blob must not make the whole export fail; treat it
    # like "no preferences", exactly as GET /api/prefs does.
    preferences = None
    if prow:
        try:
            preferences = json.loads(prow["prefs_json"])
        except (ValueError, TypeError):
            preferences = None

    data = {
        "account": {"id": uid, "email": user["email"],
                    "display_name": user["display_name"], "created_at": user["created_at"]},
        "sign_in_methods": providers,
        "saved": [slim(a) for a in saved],
        "history": [slim(a) for a in hist],
        "preferences": preferences,
    }
    return Response(
        content=json.dumps(data, indent=2),
        media_type="application/json",
        headers={"Content-Disposition": "attachment; filename=upbeatbytes-data.json"},
    )
@app.delete("/api/account")
def delete_account(request: Request, response: Response) -> dict:
    # Permanently deletes the signed-in account and clears the session cookie.
    with get_conn() as conn:
        uid = _require_user(conn, request)["id"]
        # Only the users row is deleted here; dependent rows are expected to
        # go with it via cascading deletes (cascades to all account data).
        conn.execute("DELETE FROM users WHERE id = ?", (uid,))
        conn.commit()
    response.delete_cookie(SESSION_COOKIE, path="/")
    return {"ok": True}
@app.post("/api/import")
def import_local(body: ImportBody, request: Request) -> dict:
    """Fold this device's anonymous history/saved into the account (one-time)."""
    # Unknown article ids are dropped and existing rows are left untouched, so
    # repeating the import is harmless.
    with get_conn() as conn:
        uid = _require_user(conn, request)["id"]
        seen_ids = queries.existing_article_ids(conn, body.seen)
        conn.executemany(
            "INSERT OR IGNORE INTO user_history (user_id, article_id, event) "
            "VALUES (?, ?, 'seen')",
            [(uid, aid) for aid in seen_ids],
        )
        saved_ids = queries.existing_article_ids(conn, body.saved)
        conn.executemany(
            "INSERT OR IGNORE INTO saved_articles (user_id, article_id) VALUES (?, ?)",
            [(uid, aid) for aid in saved_ids],
        )
        conn.commit()
    return {"ok": True}
@app.get("/api/categories", response_model=CategoriesResponse)
def categories() -> CategoriesResponse:
    # The static topic and flavor vocabularies, in taxonomy order.
    def as_categories(vocabulary) -> list[Category]:
        return [Category(key=key, description=text) for key, text in vocabulary.items()]

    return CategoriesResponse(topics=as_categories(TOPICS), flavors=as_categories(FLAVORS))
@app.get("/api/moods")
def moods() -> list[dict]:
    # The humane front door: every mood maps to a filter preset that the
    # client merges with the reader's own Calm Filters. Served as-is.
    presets = MOODS
    return presets
@app.get("/api/families")
def families() -> list[dict]:
    # Grouping vocabulary organised into calm families for the Explore UI,
    # each tag annotated with its current count (0 when it has none).
    with get_conn() as conn:
        counts = queries.tag_counts(conn)

    listing = []
    for family_name, spec in FAMILIES.items():
        tagged = [{"key": tag, "count": counts.get(tag, 0)} for tag in spec["tags"]]
        listing.append({
            "name": family_name,
            "description": spec["description"],
            "tags": tagged,
        })
    return listing
@app.get("/api/category-counts", response_model=list[CategoryCount])
def category_counts(accepted_only: bool = True, prefs: str | None = Query(None)) -> list[CategoryCount]:
    # Article counts per (topic, flavor). Without prefs the database
    # aggregates; with prefs the tally runs over the same filtered set the
    # feed would return, so browse numbers match what the reader sees.
    fp = prefs_from_json(prefs)
    with get_conn() as conn:
        if fp.is_empty():
            rows = queries.category_counts(conn, accepted_only=accepted_only)
            return [CategoryCount(**row) for row in rows]
        everything = queries.feed(conn, accepted_only=accepted_only, limit=100000, offset=0)
        visible = filter_articles(everything, fp, datetime.now(timezone.utc))

    tally = Counter((article["topic"], article["flavor"]) for article in visible)
    # Sort on the string form so None topics/flavors order deterministically.
    ordered = sorted(tally.items(), key=lambda kv: (str(kv[0][0]), str(kv[0][1])))
    return [
        CategoryCount(**{"topic": topic, "flavor": flavor, "count": total})
        for (topic, flavor), total in ordered
    ]
@app.get("/api/feed", response_model=FeedResponse)
def feed(
    topic: str | None = Query(None),
    flavor: str | None = Query(None),
    accepted_only: bool = True,
    limit: int = Query(30, ge=1, le=100),
    offset: int = Query(0, ge=0),
    prefs: str | None = Query(None),
    exclude: str = Query("", description="comma-separated article ids the reader has dismissed"),
    tag: str | None = Query(None, description="grouping tag to browse"),
) -> FeedResponse:
    # One page of the browse feed, honouring the reader's prefs and dismissals.
    # Raises 400 for a topic or flavor outside the taxonomy.
    # NOTE(review): validation lower-cases topic/flavor but the original
    # casing is what reaches queries.feed — confirm the query is
    # case-insensitive.
    if topic and topic.lower() not in TOPICS:
        raise HTTPException(400, f"unknown topic: {topic}")
    if flavor and flavor.lower() not in FLAVORS:
        raise HTTPException(400, f"unknown flavor: {flavor}")
    fp = prefs_from_json(prefs)
    now = datetime.now(timezone.utc)
    # Tolerant id parsing: keep well-formed integers and ignore everything
    # else. A full regex match guarantees int() cannot raise. The previous
    # isdigit()/lstrip("-") test let inputs such as "--5" or "²" through
    # and crashed the request.
    excl = {
        int(tok)
        for tok in (part.strip() for part in exclude.split(","))
        if re.fullmatch(r"-?\d{1,18}", tok)
    }
    # Categorical filters (include/mute topics+flavors incl. active pauses,
    # cortisol ceiling) go to SQL so nothing is truncated by ranking. Only
    # word-boundary avoid-terms and dismissals need a Python pass.
    kw = _prefs_sql_kw(fp, now)
    with get_conn() as conn:
        if fp.avoid_terms or excl:
            # Over-fetch enough to cover what the Python pass might remove.
            fetch_n = min(2000, (offset + limit) * 4 + 50 + len(excl))
            raw = queries.feed(
                conn, topic=topic, flavor=flavor, accepted_only=accepted_only,
                limit=fetch_n, offset=0, tag=tag, **kw,
            )
            kept = [a for a in filter_articles(raw, fp, now) if a["id"] not in excl]
            rows = kept[offset : offset + limit]
        else:
            rows = queries.feed(
                conn, topic=topic, flavor=flavor, accepted_only=accepted_only,
                limit=limit, offset=offset, tag=tag, **kw,
            )
    # Keep the top of a browse view readable: stable-sort paywalled items
    # below readable ones (composite order preserved within each group).
    rows = sorted(rows, key=lambda r: is_paywalled(r["canonical_url"]))
    return FeedResponse(
        topic=topic,
        flavor=flavor,
        count=len(rows),
        items=[Article.from_row(r) for r in rows],
    )
@app.get("/api/brief", response_model=BriefResponse)
def brief(
    date: str | None = Query(None),
    limit: int = Query(10, ge=1, le=50),
    prefs: str | None = Query(None),
    exclude: str = Query("", description="comma-separated article ids the reader has dismissed"),
) -> BriefResponse:
    # The curated brief for a date, filtered by the reader's boundaries and
    # topped up from the feed so it stays `limit` long.
    fp = prefs_from_json(prefs)
    now = datetime.now(timezone.utc)
    # Tolerant id parsing: keep well-formed integers and ignore everything
    # else. A full regex match guarantees int() cannot raise. The previous
    # isdigit()/lstrip("-") test let inputs such as "--5" or "²" through
    # and crashed the request.
    excl = {
        int(tok)
        for tok in (part.strip() for part in exclude.split(","))
        if re.fullmatch(r"-?\d{1,18}", tok)
    }
    with get_conn() as conn:
        data = queries.brief(conn, brief_date=date, limit=limit)
        # Drop dismissed (replaced-away) items and anything the reader's
        # boundaries hide; avoid-terms take precedence over curation.
        items = [a for a in data["items"] if a["id"] not in excl]
        if not fp.is_empty():
            items = filter_articles(items, fp, now)
        # Keep the highlights full: if a boundary or a dismissal removed a
        # story, top up with other readable, boundary-respecting good news
        # rather than show fewer.
        if len(items) < limit:
            have = {a["id"] for a in items} | excl
            pool = queries.feed(
                conn, accepted_only=True, limit=limit * 5 + 40, offset=0, **_prefs_sql_kw(fp, now)
            )
            for a in filter_articles(pool, fp, now):
                if len(items) >= limit:
                    break
                if a["id"] not in have:
                    items.append(a)
                    have.add(a["id"])
    # Lead with a gentle, readable story (charged or paywalled stories stay
    # in the set, just not as the first thing seen).
    items = _pick_lead(items)
    return BriefResponse(
        brief_date=data["brief_date"],
        title=data["title"],
        generated_at=data.get("created_at"),
        items=[Article.from_row(r) for r in items],
    )
@app.get("/api/brief-dates", response_model=list[str])
def brief_dates(limit: int = Query(30, ge=1, le=365)) -> list[str]:
    # Dates for which a brief is available, capped at `limit` entries.
    with get_conn() as conn:
        dates = queries.available_dates(conn, limit=limit)
    return dates
@app.get("/api/replacement", response_model=Article | None)
def replacement(
    exclude: str = Query("", description="comma-separated article ids already shown"),
    prefs: str | None = Query(None),
    avoid_paywall: bool = True,
    gentle: bool = Query(False, description="also require lead-safe (for replacing the hero)"),
) -> Article | None:
    # Swap a read or paywalled item for the next-best one the reader can
    # actually open. The client merges any active mood into `prefs` (same as
    # the feed), so this needs no mood param. Returns null when nothing in the
    # top 120 candidates qualifies.
    fp = prefs_from_json(prefs)
    # Tolerant id parsing: keep well-formed integers and ignore everything
    # else. A full regex match guarantees int() cannot raise. The previous
    # isdigit()/lstrip("-") test let inputs such as "--5" or "²" through
    # and crashed the request.
    excl = {
        int(tok)
        for tok in (part.strip() for part in exclude.split(","))
        if re.fullmatch(r"-?\d{1,18}", tok)
    }
    now = datetime.now(timezone.utc)
    # Same categorical filters as the feed and brief, via the shared helper
    # rather than a hand-copied dict that could drift.
    kw = _prefs_sql_kw(fp, now)
    with get_conn() as conn:
        rows = queries.feed(conn, accepted_only=True, limit=120, offset=0, **kw)
    for r in filter_articles(rows, fp, now):
        if r["id"] in excl:
            continue
        if avoid_paywall and is_paywalled(r["canonical_url"]):
            continue
        if gentle and not safe_to_lead(r):
            continue
        return Article.from_row(r)
    return None
@app.get("/api/candidates", response_model=list[Candidate])
def candidates(status: str | None = Query(None)) -> list[Candidate]:
    # Candidate sources, optionally filtered by status, each with its stored
    # preview JSON decoded into the `preview` field.
    from .sources import list_candidates

    def to_candidate(row) -> Candidate:
        fields = dict(row)
        encoded = fields.pop("preview_json", None)
        fields["preview"] = json.loads(encoded) if encoded else None
        return Candidate(**fields)

    with get_conn() as conn:
        rows = list_candidates(conn, status=status)
    return [to_candidate(row) for row in rows]
@app.get("/api/source-preview", response_model=SourcePreview)
def source_preview(
    url: str = Query(..., max_length=2048),
    sample: int = Query(25, ge=1, le=50),
    classify: bool = Query(False, description="Also classify with the local model (accurate but slower)"),
) -> SourcePreview:
    # Read-only sample scoring; nothing is persisted. Only http(s) is allowed.
    # Raises 400 for another scheme and 502 when the feed cannot be previewed.
    # NOTE: fetching a user-supplied URL is an SSRF surface — before exposing
    # this publicly, also block private/loopback/link-local address ranges.
    if not re.match(r"^https?://", url, re.IGNORECASE):
        raise HTTPException(400, "url must start with http:// or https://")
    client = LocalModelClient.from_env() if classify else None
    try:
        data = feeds.preview_feed(url, sample=sample, client=client)
    except Exception as exc:
        # Chain the cause so the real failure stays in server-side tracebacks.
        raise HTTPException(502, f"could not preview feed: {exc}") from exc
    return SourcePreview(**data)
# Mount the static site at the root only after every API route has been
# registered, so /api/* and /healthz are matched first. Skipped when no site
# directory exists.
if STATIC_DIR.is_dir():
    site = StaticFiles(directory=str(STATIC_DIR), html=True)
    app.mount("/", site, name="site")
return app
app = create_app()