Files
upbeatBytes/goodnews/api.py
T
thejayman77 d2ae56dc65 Accounts Phase 1b: magic-link auth endpoints + sessions
- POST /api/auth/email/start — validate email, rate-limit, email a single-use
  magic link (identical reply regardless, so no account enumeration).
- POST /api/auth/email/verify — consume token, find-or-create user, open a
  session, set an httpOnly cookie (web) and return a bearer token (app).
- GET /api/auth/me, POST /api/auth/logout.
- Session resolved from cookie OR Authorization: Bearer; cookie is Secure in
  prod (https), relaxed for http so tests round-trip. CORS now allows POST.

Live SMTP send verified against the DNSExit relay (587/STARTTLS). 108 tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 01:08:33 +00:00

554 lines
20 KiB
Python

"""FastAPI service for goodNews.
A JSON API over the ingestion database (read-only apart from the auth/session
endpoints, which write and commit), plus a small static site that
consumes it. The same endpoints back both the website and any future companion
app; the auto-generated OpenAPI docs at /docs are that shared contract.
Run with the bundled CLI: goodnews serve
Or directly: uvicorn goodnews.api:app --host 0.0.0.0 --port 8000
The database path comes from GOODNEWS_DB (falling back to the repo's data dir),
so the API and CLI always read the same file.
"""
from __future__ import annotations

import json
import logging
import os
import re
import sqlite3
from collections import Counter
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path

from fastapi import FastAPI, HTTPException, Query, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

from . import auth, email_send, feeds, queries
from .db import connect
from .filters import filter_articles, prefs_from_json
from .hero import safe_to_lead
from .llm import LocalModelClient
from .moods import MOODS, mood_filter
from .paywall import is_paywalled
from .taxonomy import FAMILIES, FLAVORS, TOPICS
# Directory that contains this module's package directory (parents[1] of the
# resolved file path); the default database and the frontend build hang off it.
ROOT = Path(__file__).resolve().parents[1]
# Database used when GOODNEWS_DB is not set (see db_path()).
DEFAULT_DB = ROOT / "data" / "goodnews.sqlite3"
# Prefer the built SvelteKit site; fall back to the legacy single-page harness.
FRONTEND_DIR = ROOT / "frontend" / "build"
LEGACY_STATIC = Path(__file__).resolve().parent / "static"
# Chosen once at import time: the build directory if it exists, else the
# legacy static directory shipped next to this module.
STATIC_DIR = FRONTEND_DIR if FRONTEND_DIR.is_dir() else LEGACY_STATIC
def db_path() -> Path:
    """Return the SQLite file to open: GOODNEWS_DB if set, else DEFAULT_DB."""
    configured = os.environ.get("GOODNEWS_DB")
    if configured is None:
        return DEFAULT_DB
    return Path(configured)
# --- Auth helpers -----------------------------------------------------------
# Public origin of the site, without a trailing slash. Used to build the
# magic-link URL and to decide whether session cookies are marked Secure.
PUBLIC_BASE_URL = os.environ.get("GOODNEWS_PUBLIC_BASE_URL", "https://upbeatbytes.com").rstrip("/")
# Name of the cookie that carries the session token for web clients.
SESSION_COOKIE = "ub_session"
# Cookie lifetime in whole seconds, derived from auth.SESSION_TTL.
SESSION_MAX_AGE = int(auth.SESSION_TTL.total_seconds())
# Secure cookies in production (https); off for http (local/test) so they round-trip.
_COOKIE_SECURE = PUBLIC_BASE_URL.startswith("https")
# Shape check only: something@something.something with no whitespace and no
# extra "@". It does not prove the address exists.
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
def _session_token_from_request(request: Request) -> str | None:
    """Extract the session token from a request, or return None.

    Web sends the session as an httpOnly cookie; the app sends a bearer token.
    The cookie wins when both are present.

    The ``Authorization`` scheme is matched case-insensitively (HTTP auth
    scheme names are case-insensitive), and an empty credential such as
    ``"Bearer "`` is treated as "no token" rather than an empty string.
    """
    cookie = request.cookies.get(SESSION_COOKIE)
    if cookie:
        return cookie
    scheme, _, credentials = request.headers.get("Authorization", "").partition(" ")
    if scheme.lower() != "bearer":
        return None
    return credentials.strip() or None
def _current_user(conn: sqlite3.Connection, request: Request) -> sqlite3.Row | None:
    """Resolve the request's session to a user row, or a falsy value if none.

    When a user is found the connection is committed so the last_seen touch
    made while resolving the session is persisted.
    """
    session_token = _session_token_from_request(request)
    account = auth.resolve_session(conn, session_token)
    if not account:
        return account
    conn.commit()  # persist the last_seen touch
    return account
def _set_session_cookie(response: Response, token: str) -> None:
    """Attach the session token to the response as the site-wide session cookie."""
    cookie_attrs = {
        "max_age": SESSION_MAX_AGE,
        "httponly": True,  # not readable from page scripts
        "secure": _COOKIE_SECURE,  # only when the public base URL is https
        "samesite": "lax",
        "path": "/",
    }
    response.set_cookie(SESSION_COOKIE, token, **cookie_attrs)
@contextmanager
def get_conn():
    """Yield a connection to the database at db_path(), closing it on exit.

    The connection is closed whether or not the caller's block raised; nothing
    is committed here, so callers that write must commit themselves.
    """
    connection = connect(db_path())
    try:
        yield connection
    finally:
        connection.close()
def _prefs_sql_kw(fp, now) -> dict:
    """Translate categorical prefs into keyword filters for queries.feed.

    Empty include/mute collections become None so the corresponding filter is
    omitted. Avoid-terms are not represented here; they stay a Python pass.
    """

    def present(values):
        # Collapse "nothing selected" (empty / falsy) to None.
        return values or None

    return {
        "include_topics": present(fp.include_topics),
        "include_flavors": present(fp.include_flavors),
        "mute_topics": present(list(fp.muted_topics(now))),
        "mute_flavors": present(list(fp.muted_flavors(now))),
        "max_cortisol": fp.max_cortisol,
        "max_ragebait": fp.max_ragebait,
    }
def _pick_lead(items: list[dict]) -> list[dict]:
    """Move the best available lead story to the front of ``items``.

    Tiers are tried in order: lead-safe + not paywalled + has an image, then
    lead-safe + not paywalled, then merely lead-safe. The first story matching
    the first tier that matches anything is promoted; every other story keeps
    its relative order. If that story is already first, or no tier matches,
    the input list is returned unchanged.
    """

    def readable_and_calm(story: dict) -> bool:
        return safe_to_lead(story) and not is_paywalled(story.get("canonical_url"))

    def illustrated_too(story: dict) -> bool:
        return readable_and_calm(story) and bool(story.get("image_url"))

    for qualifies in (illustrated_too, readable_and_calm, safe_to_lead):
        winner = next((pos for pos, story in enumerate(items) if qualifies(story)), None)
        if winner is None:
            continue  # nothing in this tier; relax the requirement
        if winner == 0:
            return items
        return [items[winner], *items[:winner], *items[winner + 1:]]
    return items
# --- Response models (the companion-app contract) ---------------------------
class Category(BaseModel):
    # One vocabulary entry (a topic or a flavor): its key and its description.
    # Documented with comments rather than a docstring so the generated
    # OpenAPI schema is left untouched.
    key: str
    description: str
class CategoriesResponse(BaseModel):
    # Payload of /api/categories: the topic and flavor vocabularies.
    topics: list[Category]
    flavors: list[Category]
class CategoryCount(BaseModel):
    # Number of articles for one (topic, flavor) pair; either side may be null.
    topic: str | None
    flavor: str | None
    count: int
class Article(BaseModel):
    # One article as served by the feed, brief and replacement endpoints.
    # Documented with comments rather than a docstring so the generated
    # OpenAPI schema is left untouched.
    id: int
    title: str
    description: str | None = None
    url: str
    image_url: str | None = None
    published_at: str | None = None
    source: str
    topic: str | None = None
    flavor: str | None = None
    accepted: bool
    rank_score: int | None = None
    reason_code: str | None = None
    reason_text: str | None = None
    model_name: str | None = None
    rank: int | None = None  # position within a brief, when applicable
    paywalled: bool = False
    tags: list[str] = []

    @classmethod
    def from_row(cls, row: dict) -> "Article":
        """Build an Article from one query-result mapping.

        ``id``, ``title``, ``canonical_url`` and ``source_name`` are required
        keys (a missing one raises KeyError); every other column is optional
        and defaults to None. ``tags`` is read as a comma-separated string
        and empty pieces are dropped; ``paywalled`` is computed from the URL.
        """
        tag_csv = row.get("tags")
        required = {
            "id": row["id"],
            "title": row["title"],
            "url": row["canonical_url"],
            "source": row["source_name"],
        }
        # Columns copied through unchanged under the same name.
        passthrough = {
            column: row.get(column)
            for column in (
                "description",
                "image_url",
                "published_at",
                "topic",
                "flavor",
                "rank_score",
                "reason_code",
                "reason_text",
                "model_name",
                "rank",
            )
        }
        is_accepted = bool(row.get("accepted"))
        behind_paywall = is_paywalled(row.get("canonical_url"))
        pieces = tag_csv.split(",") if tag_csv else []
        return cls(
            **required,
            **passthrough,
            accepted=is_accepted,
            paywalled=behind_paywall,
            tags=[piece for piece in pieces if piece],
        )
class FeedResponse(BaseModel):
    # Payload of /api/feed: the requested topic/flavor echoed back, the number
    # of items in this page, and the items themselves.
    topic: str | None
    flavor: str | None
    count: int
    items: list[Article]
class BriefResponse(BaseModel):
    # Payload of /api/brief: one dated brief and its (possibly topped-up) items.
    brief_date: str | None
    title: str | None
    generated_at: str | None = None  # freshness stamp: changes only when content changes
    items: list[Article]
class RejectedExample(BaseModel):
    # One rejected sample inside a SourcePreview: the title and the reason given.
    title: str
    reason: str
class Candidate(BaseModel):
    # One candidate source as returned by /api/candidates. `preview` holds the
    # decoded contents of the row's preview_json column, or None when absent.
    id: int
    feed_url: str
    homepage_url: str | None = None
    name: str | None = None
    status: str
    preview: dict | None = None
    notes: str | None = None
    last_previewed_at: str | None = None
    created_at: str | None = None
    updated_at: str | None = None
class SourcePreview(BaseModel):
    # Payload of /api/source-preview, built directly from the mapping returned
    # by feeds.preview_feed (its keys must match these field names).
    url: str
    sampled: int
    classified: bool
    accepted: int
    acceptance_rate: float
    avg_cortisol: float
    avg_ragebait: float
    avg_pr_risk: float
    newest_published: str | None
    recent_7d: int
    topic_mix: dict[str, int]
    flavor_mix: dict[str, int]
    examples_accepted: list[str]
    examples_rejected: list[RejectedExample]
class EmailStartRequest(BaseModel):
    # Body of POST /api/auth/email/start: the address to send a magic link to.
    email: str
class TokenVerifyRequest(BaseModel):
    # Body of POST /api/auth/email/verify: the token taken from the magic link.
    token: str
class UserOut(BaseModel):
    # Public view of a signed-in user, returned by the auth endpoints.
    id: int
    email: str
    display_name: str | None = None
class SessionOut(BaseModel):
    # Result of a successful sign-in: the user plus the new session token.
    user: UserOut
    token: str  # for non-browser (app) clients; the web SPA uses the cookie
# --- App --------------------------------------------------------------------
def create_app() -> FastAPI:
    """Build the FastAPI application: CORS, API routes, then the static site.

    Routes are registered before the static mount so /api/* and /healthz take
    precedence over files served from the site root.
    """
    app = FastAPI(
        title="goodNews API",
        version="0.1.0",
        description="Constructive, uplifting news — metadata and links only.",
    )
    # The website and companion app may live on other origins; allow them.
    # Only GET and POST are allowed cross-origin. allow_credentials is not set
    # here — NOTE(review): with wildcard origins, cross-origin browser clients
    # presumably authenticate with the bearer token, not the cookie; confirm.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_methods=["GET", "POST"],
        allow_headers=["*"],
    )
@app.get("/healthz")
def healthz() -> dict:
    """Liveness probe that also reports how many articles have been scored.

    This endpoint only reads. A database error is reported as zero scored
    articles rather than a failed health check, so the status is always "ok".
    """
    try:
        with get_conn() as conn:
            count_row = conn.execute("SELECT COUNT(*) FROM article_scores").fetchone()
            scored = count_row[0]
    except sqlite3.Error:
        scored = 0
    return {"status": "ok", "scored_articles": scored}
# --- Auth: passwordless magic link (Google added in Phase 2) ----------
@app.post("/api/auth/email/start")
def auth_email_start(body: EmailStartRequest) -> dict:
    """Start a passwordless sign-in by emailing a single-use magic link.

    The reply is always ``{"ok": True}`` once the address passes the shape
    check, whether or not a link was issued or the send succeeded, so the
    endpoint cannot be used to enumerate accounts.

    Raises:
        HTTPException: 422 when the normalized address fails the shape check.
    """
    email = auth.normalize_email(body.email)
    if not _EMAIL_RE.match(email):
        raise HTTPException(status_code=422, detail="Please enter a valid email address.")
    link = None
    with get_conn() as conn:
        # Light abuse guard: cap recent tokens per address (still reply OK).
        recent = conn.execute(
            "SELECT COUNT(*) FROM login_tokens WHERE email = ? "
            "AND created_at > datetime('now', '-10 minutes')",
            (email,),
        ).fetchone()[0]
        if recent < 5:
            raw = auth.create_login_token(conn, email)
            conn.commit()
            link = f"{PUBLIC_BASE_URL}/auth/verify?token={raw}"
    # Send after the connection is closed so a slow mail relay never keeps the
    # database connection open.
    if link is not None:
        try:
            email_send.send_magic_link(email, link)
        except Exception:
            # Never leak send failures (or whether the address exists) to the
            # client, but do record them so operators can see a broken relay.
            logging.getLogger(__name__).exception("magic-link email could not be sent")
    # Always identical (no account enumeration).
    return {"ok": True}
@app.post("/api/auth/email/verify", response_model=SessionOut)
def auth_email_verify(body: TokenVerifyRequest, request: Request, response: Response) -> SessionOut:
    """Exchange a magic-link token for a session.

    On success the user is found or created, a session is opened, the session
    cookie is set on the response and the same token is returned in the body.

    Raises:
        HTTPException: 400 when the token is not accepted.
    """
    with get_conn() as conn:
        verified_email = auth.consume_login_token(conn, body.token)
        if not verified_email:
            # Commit before failing so anything consume_login_token changed is kept.
            conn.commit()
            raise HTTPException(status_code=400, detail="This sign-in link is invalid or has expired.")
        uid = auth.find_or_create_user(conn, verified_email, "email", verified_email)
        agent = request.headers.get("User-Agent")
        session_token = auth.create_session(conn, uid, user_agent=agent)
        conn.commit()
        account = auth.get_user(conn, uid)
        _set_session_cookie(response, session_token)
        profile = UserOut(
            id=account["id"],
            email=account["email"],
            display_name=account["display_name"],
        )
        return SessionOut(user=profile, token=session_token)
@app.get("/api/auth/me", response_model=UserOut | None)
def auth_me(request: Request) -> UserOut | None:
    """Return the signed-in user for this request, or null when there is none."""
    with get_conn() as conn:
        account = _current_user(conn, request)
        if account:
            return UserOut(
                id=account["id"],
                email=account["email"],
                display_name=account["display_name"],
            )
        return None
@app.post("/api/auth/logout")
def auth_logout(request: Request, response: Response) -> dict:
    """Revoke the request's session (if any) and clear the session cookie.

    Always answers ``{"ok": True}``, including when no session was presented.
    """
    session_token = _session_token_from_request(request)
    with get_conn() as conn:
        auth.revoke_session(conn, session_token)
        conn.commit()
    response.delete_cookie(SESSION_COOKIE, path="/")
    return {"ok": True}
@app.get("/api/categories", response_model=CategoriesResponse)
def categories() -> CategoriesResponse:
    """List the topic and flavor vocabularies with their descriptions."""

    def as_categories(vocabulary) -> list[Category]:
        return [Category(key=name, description=text) for name, text in vocabulary.items()]

    topic_entries = as_categories(TOPICS)
    flavor_entries = as_categories(FLAVORS)
    return CategoriesResponse(topics=topic_entries, flavors=flavor_entries)
@app.get("/api/moods")
def moods() -> list[dict]:
    """Return the MOODS presets exactly as defined in the moods module."""
    # The humane front door: each mood resolves to a filter preset the
    # client merges with the user's own Calm Filters.
    return MOODS
@app.get("/api/families")
def families() -> list[dict]:
    """List tag families for the Explore UI, each tag with its article count.

    Tags with no entry in the counts get a count of 0.
    """
    with get_conn() as conn:
        counts = queries.tag_counts(conn)
    grouped = []
    for family, spec in FAMILIES.items():
        grouped.append(
            {
                "name": family,
                "description": spec["description"],
                "tags": [{"key": tag, "count": counts.get(tag, 0)} for tag in spec["tags"]],
            }
        )
    return grouped
@app.get("/api/category-counts", response_model=list[CategoryCount])
def category_counts(accepted_only: bool = True, prefs: str | None = Query(None)) -> list[CategoryCount]:
    """Count articles per (topic, flavor), honouring the reader's prefs.

    With empty prefs the counts come straight from the query layer. Otherwise
    the feed rows are fetched, filtered the same way the feed filters them,
    and tallied here, so browse numbers match what the reader actually sees.
    """
    fp = prefs_from_json(prefs)
    with get_conn() as conn:
        if fp.is_empty():
            plain = queries.category_counts(conn, accepted_only=accepted_only)
            return [CategoryCount(**row) for row in plain]

        everything = queries.feed(conn, accepted_only=accepted_only, limit=100000, offset=0)
        visible = filter_articles(everything, fp, datetime.now(timezone.utc))
        tally = Counter((article["topic"], article["flavor"]) for article in visible)

        def by_label(entry):
            # Sort on the string forms so null topics/flavors compare cleanly.
            (topic_key, flavor_key), _ = entry
            return (str(topic_key), str(flavor_key))

        return [
            CategoryCount(topic=topic_key, flavor=flavor_key, count=total)
            for (topic_key, flavor_key), total in sorted(tally.items(), key=by_label)
        ]
@app.get("/api/feed", response_model=FeedResponse)
def feed(
    topic: str | None = Query(None),
    flavor: str | None = Query(None),
    accepted_only: bool = True,
    limit: int = Query(30, ge=1, le=100),
    offset: int = Query(0, ge=0),
    prefs: str | None = Query(None),
    exclude: str = Query("", description="comma-separated article ids the reader has dismissed"),
    tag: str | None = Query(None, description="grouping tag to browse"),
) -> FeedResponse:
    """Return one page of the browse feed, honouring prefs and dismissals.

    Categorical prefs are pushed into the SQL query; avoid-terms and dismissed
    ids are applied in Python over an over-fetched window. Paywalled items are
    sorted below readable ones within the returned page.

    Raises:
        HTTPException: 400 when ``topic`` or ``flavor`` is not a known key.
    """
    # NOTE(review): topic/flavor are validated case-insensitively but passed
    # to queries.feed as given — confirm the query layer normalizes case.
    if topic and topic.lower() not in TOPICS:
        raise HTTPException(400, f"unknown topic: {topic}")
    if flavor and flavor.lower() not in FLAVORS:
        raise HTTPException(400, f"unknown flavor: {flavor}")
    fp = prefs_from_json(prefs)
    now = datetime.now(timezone.utc)
    # Parse dismissed ids leniently: blanks and junk are skipped. Letting
    # int() decide (instead of pre-checking with isdigit) means inputs such as
    # "--5" or "²" are ignored rather than raising and producing a 500.
    excl: set[int] = set()
    for piece in exclude.split(","):
        try:
            excl.add(int(piece))
        except ValueError:
            continue
    # Categorical filters (include/mute topics+flavors incl. active pauses,
    # cortisol ceiling) go to SQL so nothing is truncated by ranking. Only
    # word-boundary avoid-terms and dismissals need a Python pass.
    kw = _prefs_sql_kw(fp, now)
    with get_conn() as conn:
        if fp.avoid_terms or excl:
            # Over-fetch enough to cover what the Python pass might remove.
            fetch_n = min(2000, (offset + limit) * 4 + 50 + len(excl))
            raw = queries.feed(
                conn, topic=topic, flavor=flavor, accepted_only=accepted_only,
                limit=fetch_n, offset=0, tag=tag, **kw,
            )
            kept = [a for a in filter_articles(raw, fp, now) if a["id"] not in excl]
            rows = kept[offset : offset + limit]
        else:
            rows = queries.feed(
                conn, topic=topic, flavor=flavor, accepted_only=accepted_only,
                limit=limit, offset=offset, tag=tag, **kw,
            )
    # Keep the top of a browse view readable: stable-sort paywalled items
    # below readable ones (composite order preserved within each group).
    rows = sorted(rows, key=lambda r: is_paywalled(r["canonical_url"]))
    return FeedResponse(
        topic=topic,
        flavor=flavor,
        count=len(rows),
        items=[Article.from_row(r) for r in rows],
    )
@app.get("/api/brief", response_model=BriefResponse)
def brief(
    date: str | None = Query(None),
    limit: int = Query(10, ge=1, le=50),
    prefs: str | None = Query(None),
    exclude: str = Query("", description="comma-separated article ids the reader has dismissed"),
) -> BriefResponse:
    """Return the brief for ``date``, filtered and topped up to ``limit`` items.

    Dismissed ids and anything hidden by the reader's prefs are removed from
    the curated set; if that leaves fewer than ``limit`` items, the set is
    topped up from the accepted feed. A gentle, readable story is then moved
    to the front.
    """
    fp = prefs_from_json(prefs)
    now = datetime.now(timezone.utc)
    # Parse dismissed ids leniently: blanks and junk are skipped. Letting
    # int() decide (instead of pre-checking with isdigit) means inputs such as
    # "--5" or "²" are ignored rather than raising and producing a 500.
    excl: set[int] = set()
    for piece in exclude.split(","):
        try:
            excl.add(int(piece))
        except ValueError:
            continue
    with get_conn() as conn:
        data = queries.brief(conn, brief_date=date, limit=limit)
        # Drop dismissed (replaced-away) items and anything the reader's
        # boundaries hide; avoid-terms take precedence over curation.
        items = [a for a in data["items"] if a["id"] not in excl]
        if not fp.is_empty():
            items = filter_articles(items, fp, now)
        # Keep the highlights full: if a boundary or a dismissal removed a
        # story, top up with other readable, boundary-respecting good news
        # rather than show fewer.
        if len(items) < limit:
            have = {a["id"] for a in items} | excl
            pool = queries.feed(
                conn, accepted_only=True, limit=limit * 5 + 40, offset=0, **_prefs_sql_kw(fp, now)
            )
            for a in filter_articles(pool, fp, now):
                if len(items) >= limit:
                    break
                if a["id"] not in have:
                    items.append(a)
                    have.add(a["id"])
    # Lead with a gentle, readable story (charged or paywalled stories stay
    # in the set, just not as the first thing seen).
    items = _pick_lead(items)
    return BriefResponse(
        brief_date=data["brief_date"],
        title=data["title"],
        generated_at=data.get("created_at"),
        items=[Article.from_row(r) for r in items],
    )
@app.get("/api/brief-dates", response_model=list[str])
def brief_dates(limit: int = Query(30, ge=1, le=365)) -> list[str]:
    """List up to ``limit`` dates for which a brief is available."""
    with get_conn() as conn:
        dates = queries.available_dates(conn, limit=limit)
        return dates
@app.get("/api/replacement", response_model=Article | None)
def replacement(
    exclude: str = Query("", description="comma-separated article ids already shown"),
    prefs: str | None = Query(None),
    avoid_paywall: bool = True,
    gentle: bool = Query(False, description="also require lead-safe (for replacing the hero)"),
) -> Article | None:
    """Return the next-best article the reader can open, or null if none fits.

    Candidates come from the first 120 accepted feed rows after prefs are
    applied. Already-shown ids are skipped, as are paywalled stories when
    ``avoid_paywall`` is set and non-lead-safe stories when ``gentle`` is set.
    """
    # Swap a read or paywalled item for the next-best one the reader can
    # actually open. The client merges any active mood into `prefs` (same as
    # the feed), so this needs no mood param.
    fp = prefs_from_json(prefs)
    # Parse shown ids leniently: blanks and junk are skipped. Letting int()
    # decide (instead of pre-checking with isdigit) means inputs such as
    # "--5" or "²" are ignored rather than raising and producing a 500.
    excl: set[int] = set()
    for piece in exclude.split(","):
        try:
            excl.add(int(piece))
        except ValueError:
            continue
    now = datetime.now(timezone.utc)
    # Same categorical-prefs translation the feed and brief use.
    kw = _prefs_sql_kw(fp, now)
    with get_conn() as conn:
        rows = queries.feed(conn, accepted_only=True, limit=120, offset=0, **kw)
        for r in filter_articles(rows, fp, now):
            if r["id"] in excl:
                continue
            if avoid_paywall and is_paywalled(r["canonical_url"]):
                continue
            if gentle and not safe_to_lead(r):
                continue
            return Article.from_row(r)
    return None
@app.get("/api/candidates", response_model=list[Candidate])
def candidates(status: str | None = Query(None)) -> list[Candidate]:
    """List candidate sources, optionally restricted to one status.

    Each row's ``preview_json`` column is decoded into ``preview`` (None when
    the column is empty or missing).
    """
    from .sources import list_candidates

    def to_model(record) -> Candidate:
        fields = dict(record)
        preview_blob = fields.pop("preview_json", None)
        fields["preview"] = json.loads(preview_blob) if preview_blob else None
        return Candidate(**fields)

    with get_conn() as conn:
        found = list_candidates(conn, status=status)
        return [to_model(record) for record in found]
@app.get("/api/source-preview", response_model=SourcePreview)
def source_preview(
    url: str = Query(..., max_length=2048),
    sample: int = Query(25, ge=1, le=50),
    classify: bool = Query(False, description="Also classify with the local model (accurate but slower)"),
) -> SourcePreview:
    """Fetch and score a sample of a feed without persisting anything.

    Raises:
        HTTPException: 400 when ``url`` is not http(s); 502 when the feed
            could not be previewed (the underlying error is chained as the
            cause so it shows up in server-side tracebacks).
    """
    # Read-only sample scoring; nothing is persisted. Only http(s) is allowed.
    # NOTE: fetching a user-supplied URL is an SSRF surface — before exposing
    # this publicly, also block private/loopback/link-local address ranges.
    if not re.match(r"^https?://", url, re.IGNORECASE):
        raise HTTPException(400, "url must start with http:// or https://")
    client = LocalModelClient.from_env() if classify else None
    try:
        data = feeds.preview_feed(url, sample=sample, client=client)
    except Exception as exc:
        raise HTTPException(502, f"could not preview feed: {exc}") from exc
    return SourcePreview(**data)
# Static site last, mounted at root, so /api/* and /healthz win.
# The mount is skipped entirely when the chosen static directory is missing,
# in which case only the API routes are served.
if STATIC_DIR.is_dir():
    app.mount("/", StaticFiles(directory=str(STATIC_DIR), html=True), name="site")
return app
# Module-level application instance, so `uvicorn goodnews.api:app` can import it.
app = create_app()