Files
upbeatBytes/goodnews/api.py
T
thejayman77 c25e14ed6a Add a permanent "Latest" lane beside "Highlights"
Restructure the nav around two permanent lanes, then the reader's chosen ones:
"Highlights" (the curated daily brief — formerly "Today") and "Latest" (the
freshest accepted stories, newest-first). Now that the gate is tight, a
chronological "incoming" feed is safe to expose.

* feed(): new sort="latest" (pure recency) alongside the default best-first
  rank; /api/feed exposes sort=ranked|latest (validated). Still accepted-only
  and boundary-respecting either way.
* lanes.py: two pinned lanes (Highlights + Latest) instead of one.
* Home: "Latest" view + "Load more" pagination for every feed view (offset-
  paged, de-duped). Mobile bottom bar gains a Latest tab.
* LanePicker shows both pinned lanes; nav rail renders them first.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 15:56:48 -04:00

1081 lines
42 KiB
Python

"""FastAPI service for goodNews.
A read-only JSON API over the ingestion database, plus a small static site that
consumes it. The same endpoints back both the website and any future companion
app; the auto-generated OpenAPI docs at /docs are that shared contract.
Run with the bundled CLI: goodnews serve
Or directly: uvicorn goodnews.api:app --host 0.0.0.0 --port 8000
The database path comes from GOODNEWS_DB (falling back to the repo's data dir),
so the API and CLI always read the same file.
"""
from __future__ import annotations
import hashlib
import hmac
import json
import os
import re
import secrets
import sqlite3
from collections import Counter
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from fastapi import BackgroundTasks, FastAPI, HTTPException, Query, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from . import auth, email_send, feeds, oauth_google, queries, share, summarize
from .db import connect
from .filters import filter_articles, prefs_from_json
from .hero import safe_to_lead
from .llm import LocalModelClient
from .moods import MOODS, mood_filter
from .lanes import build_lane_pool
from .paywall import is_paywalled
from .taxonomy import FAMILIES, FLAVORS, TOPICS
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_DB = ROOT / "data" / "goodnews.sqlite3"
# Prefer the built SvelteKit site; fall back to the legacy single-page harness.
FRONTEND_DIR = ROOT / "frontend" / "build"
LEGACY_STATIC = Path(__file__).resolve().parent / "static"
STATIC_DIR = FRONTEND_DIR if FRONTEND_DIR.is_dir() else LEGACY_STATIC
def db_path() -> Path:
return Path(os.environ.get("GOODNEWS_DB", str(DEFAULT_DB)))
# --- Auth helpers -----------------------------------------------------------
PUBLIC_BASE_URL = os.environ.get("GOODNEWS_PUBLIC_BASE_URL", "https://upbeatbytes.com").rstrip("/")
SESSION_COOKIE = "ub_session"
OAUTH_COOKIE = "ub_oauth"
SESSION_MAX_AGE = int(auth.SESSION_TTL.total_seconds())
SESSION_SECRET = os.environ.get("GOODNEWS_SESSION_SECRET", "dev-insecure-secret")
# Emails that are always admins (normalized), in addition to users.is_admin.
ADMIN_EMAILS = {e.strip().lower() for e in os.environ.get("GOODNEWS_ADMIN_EMAILS", "").split(",") if e.strip()}
# Secure cookies in production (https); off for http (local/test) so they round-trip.
_COOKIE_SECURE = PUBLIC_BASE_URL.startswith("https")
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
def _sign(value: str) -> str:
sig = hmac.new(SESSION_SECRET.encode(), value.encode(), hashlib.sha256).hexdigest()
return f"{value}.{sig}"
def _unsign(signed: str | None) -> str | None:
if not signed or "." not in signed:
return None
value, _, sig = signed.rpartition(".")
expected = hmac.new(SESSION_SECRET.encode(), value.encode(), hashlib.sha256).hexdigest()
return value if hmac.compare_digest(sig, expected) else None
def _google_redirect_uri() -> str:
return f"{PUBLIC_BASE_URL}/api/auth/google/callback"
def _session_token_from_request(request: Request) -> str | None:
"""Web sends the session as an httpOnly cookie; the app sends a bearer token."""
cookie = request.cookies.get(SESSION_COOKIE)
if cookie:
return cookie
authz = request.headers.get("Authorization", "")
return authz[7:].strip() if authz.startswith("Bearer ") else None
def _current_user(conn: sqlite3.Connection, request: Request) -> sqlite3.Row | None:
user = auth.resolve_session(conn, _session_token_from_request(request))
if user:
conn.commit() # persist the last_seen touch
return user
def _require_user(conn: sqlite3.Connection, request: Request) -> sqlite3.Row:
user = _current_user(conn, request)
if not user:
raise HTTPException(status_code=401, detail="Sign in to do that.")
return user
def _is_admin(user: sqlite3.Row) -> bool:
return bool(user["is_admin"]) or auth.normalize_email(user["email"]) in ADMIN_EMAILS
def _require_admin(conn: sqlite3.Connection, request: Request) -> sqlite3.Row:
user = _require_user(conn, request)
if not _is_admin(user):
raise HTTPException(status_code=403, detail="Admins only.")
return user
def _user_out(user: sqlite3.Row) -> dict:
return {
"id": user["id"],
"email": user["email"],
"display_name": user["display_name"],
"avatar_url": user["avatar_url"],
"is_admin": _is_admin(user),
}
# Articles whose summary is being generated right now — so concurrent pollers /
# scrapers don't each kick off a duplicate LLM call.
_summarizing: set[int] = set()
def _run_summary(article_id: int) -> None:
try:
with get_conn() as conn:
summarize.generate_summary(conn, article_id)
except Exception:
pass
finally:
_summarizing.discard(article_id)
def _kick_summary(article_id: int, background_tasks: BackgroundTasks) -> None:
if article_id in _summarizing:
return
_summarizing.add(article_id)
background_tasks.add_task(_run_summary, article_id)
def _feedback_email_safe(addr: str, category: str, message: str, contact: str | None, who: str) -> None:
try:
email_send.send_feedback(addr, category, message, contact, who)
except Exception:
pass
def _send_link_safe(email: str, link: str) -> None:
"""Send the magic link, swallowing failures (runs off the request path)."""
try:
email_send.send_magic_link(email, link)
except Exception:
pass # don't crash the worker; never surfaced to the caller anyway
def _set_session_cookie(response: Response, token: str) -> None:
response.set_cookie(
SESSION_COOKIE, token, max_age=SESSION_MAX_AGE,
httponly=True, secure=_COOKIE_SECURE, samesite="lax", path="/",
)
@contextmanager
def get_conn():
conn = connect(db_path())
try:
yield conn
finally:
conn.close()
def _prefs_sql_kw(fp, now) -> dict:
"""Categorical prefs → queries.feed keyword filters (avoid-terms stay Python)."""
return dict(
include_topics=fp.include_topics or None,
include_flavors=fp.include_flavors or None,
mute_topics=list(fp.muted_topics(now)) or None,
mute_flavors=list(fp.muted_flavors(now)) or None,
max_cortisol=fp.max_cortisol,
max_ragebait=fp.max_ragebait,
)
def _pick_lead(items: list[dict]) -> list[dict]:
"""Lead with a gentle, readable, ideally illustrated story.
Preference order: gentle + readable + has an image, then gentle + readable,
then gentle, then leave the order alone. Charged/paywalled/imageless stories
still appear in the set — they just don't lead.
"""
def gentle(a: dict) -> bool:
return safe_to_lead(a) and not is_paywalled(a.get("canonical_url"))
for ok in (
lambda a: gentle(a) and bool(a.get("image_url")),
gentle,
safe_to_lead,
):
for i, a in enumerate(items):
if ok(a):
return items if i == 0 else [a, *items[:i], *items[i + 1:]]
return items
# --- Response models (the companion-app contract) ---------------------------
class Category(BaseModel):
key: str
description: str
class CategoriesResponse(BaseModel):
topics: list[Category]
flavors: list[Category]
class CategoryCount(BaseModel):
topic: str | None
flavor: str | None
count: int
class Article(BaseModel):
id: int
title: str
description: str | None = None
url: str
image_url: str | None = None
published_at: str | None = None
source: str
topic: str | None = None
flavor: str | None = None
accepted: bool
rank_score: int | None = None
reason_code: str | None = None
reason_text: str | None = None
model_name: str | None = None
rank: int | None = None # position within a brief, when applicable
paywalled: bool = False
tags: list[str] = []
summary: str | None = None # our own cached summary (present on the brief)
@classmethod
def from_row(cls, row: dict) -> "Article":
raw_tags = row.get("tags")
return cls(
summary=row.get("summary"),
id=row["id"],
title=row["title"],
description=row.get("description"),
url=row["canonical_url"],
image_url=row.get("image_url"),
published_at=row.get("published_at"),
source=row["source_name"],
topic=row.get("topic"),
flavor=row.get("flavor"),
accepted=bool(row.get("accepted")),
rank_score=row.get("rank_score"),
reason_code=row.get("reason_code"),
reason_text=row.get("reason_text"),
model_name=row.get("model_name"),
rank=row.get("rank"),
paywalled=is_paywalled(row.get("canonical_url")),
tags=[t for t in (raw_tags.split(",") if raw_tags else []) if t],
)
class FeedResponse(BaseModel):
topic: str | None
flavor: str | None
count: int
items: list[Article]
class BriefResponse(BaseModel):
brief_date: str | None
title: str | None
generated_at: str | None = None # freshness stamp: changes only when content changes
items: list[Article]
class RejectedExample(BaseModel):
title: str
reason: str
class Candidate(BaseModel):
id: int
feed_url: str
homepage_url: str | None = None
name: str | None = None
status: str
preview: dict | None = None
notes: str | None = None
last_previewed_at: str | None = None
created_at: str | None = None
updated_at: str | None = None
class SourcePreview(BaseModel):
url: str
sampled: int
classified: bool
accepted: int
acceptance_rate: float
avg_cortisol: float
avg_ragebait: float
avg_pr_risk: float
newest_published: str | None
recent_7d: int
topic_mix: dict[str, int]
flavor_mix: dict[str, int]
examples_accepted: list[str]
examples_rejected: list[RejectedExample]
class EmailStartRequest(BaseModel):
email: str
class TokenVerifyRequest(BaseModel):
token: str
class UserOut(BaseModel):
id: int
email: str
display_name: str | None = None
avatar_url: str | None = None
is_admin: bool = False
class SessionOut(BaseModel):
user: UserOut
token: str # for non-browser (app) clients; the web SPA uses the cookie
class IdsBody(BaseModel):
ids: list[int] = []
class ImportBody(BaseModel):
seen: list[int] = []
saved: list[int] = []
class PrefsBody(BaseModel):
prefs: dict = {}
class EventBody(BaseModel):
kind: str
article_id: int | None = None
visitor: str | None = None
class FeedbackBody(BaseModel):
category: str = "other"
message: str = ""
email: str | None = None
visitor: str | None = None
hp: str | None = None # honeypot — bots fill it, humans don't
_FEEDBACK_CATEGORIES = {"idea", "concern", "bug", "praise", "other"}
# The only event kinds we record. All aggregate, non-personal.
_EVENT_KINDS = {
"visit", "open", "summary_viewed", "full_story", "source_click",
"share_ub", "copy_source", "native_share",
"not_today", "less_like_this", "hide_topic",
"replace_used", "replace_none", "paywall_replace", "paywalled_source_open",
}
def _visitor_hash(token: str | None) -> str:
token = (token or "").strip()[:200]
if not token:
return ""
return hashlib.sha256(f"{SESSION_SECRET}:{token}".encode()).hexdigest()
# --- App --------------------------------------------------------------------
def create_app() -> FastAPI:
app = FastAPI(
title="goodNews API",
version="0.1.0",
description="Constructive, uplifting news — metadata and links only.",
)
# The website and companion app may live on other origins; allow them.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
@app.get("/healthz")
def healthz() -> dict:
# Read-only: the schema is owned by the ingestion CLI, so the API never
# writes (it can run as a read-only replica against a shared DB).
try:
with get_conn() as conn:
scored = conn.execute("SELECT COUNT(*) FROM article_scores").fetchone()[0]
except sqlite3.Error:
scored = 0
return {"status": "ok", "scored_articles": scored}
# --- Auth: passwordless magic link (Google added in Phase 2) ----------
@app.post("/api/auth/email/start")
def auth_email_start(body: EmailStartRequest, background_tasks: BackgroundTasks) -> dict:
email = auth.normalize_email(body.email)
if not _EMAIL_RE.match(email):
raise HTTPException(status_code=422, detail="Please enter a valid email address.")
link = None
with get_conn() as conn:
# Light abuse guard: cap recent tokens per address (still reply OK).
recent = conn.execute(
"SELECT COUNT(*) FROM login_tokens WHERE email = ? "
"AND created_at > datetime('now', '-10 minutes')",
(email,),
).fetchone()[0]
if recent < 5:
raw = auth.create_login_token(conn, email)
conn.commit()
link = f"{PUBLIC_BASE_URL}/auth/verify?token={raw}"
# Hand the (slow) SMTP send to a background task so the request returns
# immediately. Reply is always identical (no account enumeration).
if link:
background_tasks.add_task(_send_link_safe, email, link)
return {"ok": True}
@app.post("/api/auth/email/verify", response_model=SessionOut)
def auth_email_verify(body: TokenVerifyRequest, request: Request, response: Response) -> SessionOut:
with get_conn() as conn:
email = auth.consume_login_token(conn, body.token)
if not email:
conn.commit()
raise HTTPException(status_code=400, detail="This sign-in link is invalid or has expired.")
user_id = auth.find_or_create_user(conn, email, "email", email)
token = auth.create_session(conn, user_id, user_agent=request.headers.get("User-Agent"))
conn.commit()
user = auth.get_user(conn, user_id)
_set_session_cookie(response, token)
return SessionOut(user=UserOut(**_user_out(user)), token=token)
@app.get("/api/auth/me", response_model=UserOut | None)
def auth_me(request: Request) -> UserOut | None:
with get_conn() as conn:
user = _current_user(conn, request)
return UserOut(**_user_out(user)) if user else None
@app.post("/api/auth/logout")
def auth_logout(request: Request, response: Response) -> dict:
with get_conn() as conn:
auth.revoke_session(conn, _session_token_from_request(request))
conn.commit()
response.delete_cookie(SESSION_COOKIE, path="/")
return {"ok": True}
# --- Auth: Google (OAuth 2.0 / OIDC) ----------------------------------
@app.get("/api/auth/google/start")
def google_start() -> RedirectResponse:
if not oauth_google.configured():
raise HTTPException(status_code=503, detail="Google sign-in isn't configured.")
state = secrets.token_urlsafe(24)
verifier, challenge = oauth_google.new_pkce()
url = oauth_google.auth_url(_google_redirect_uri(), state, challenge)
resp = RedirectResponse(url, status_code=302)
# Bind the flow to this browser; read back (and CSRF-checked) on callback.
resp.set_cookie(
OAUTH_COOKIE, _sign(f"{state}:{verifier}"), max_age=600,
httponly=True, secure=_COOKIE_SECURE, samesite="lax", path="/",
)
return resp
@app.get("/api/auth/google/callback")
def google_callback(
request: Request,
code: str | None = None,
state: str | None = None,
error: str | None = None,
) -> RedirectResponse:
fail = RedirectResponse(f"{PUBLIC_BASE_URL}/auth/verify?error=google", status_code=302)
if error or not code or not state:
return fail
saved = _unsign(request.cookies.get(OAUTH_COOKIE))
if not saved:
return fail
saved_state, _, verifier = saved.partition(":")
if not hmac.compare_digest(saved_state, state):
return fail
try:
tokens = oauth_google.exchange_code(code, _google_redirect_uri(), verifier)
info = oauth_google.verify_id_token(tokens["id_token"])
if not info.get("picture") and tokens.get("access_token"):
info["picture"] = oauth_google.fetch_userinfo(tokens["access_token"]).get("picture")
except Exception:
return fail
with get_conn() as conn:
user_id = auth.find_or_create_user(
conn, info["email"], "google", info["sub"],
display_name=info.get("name"), avatar_url=info.get("picture"),
)
token = auth.create_session(conn, user_id, user_agent=request.headers.get("User-Agent"))
conn.commit()
ok = RedirectResponse(f"{PUBLIC_BASE_URL}/", status_code=302)
_set_session_cookie(ok, token)
ok.delete_cookie(OAUTH_COOKIE, path="/")
return ok
# --- Saved articles, history, and one-time import (all require sign-in) ---
@app.get("/api/saved", response_model=FeedResponse)
def saved_list(request: Request) -> FeedResponse:
with get_conn() as conn:
user = _require_user(conn, request)
rows = queries.saved(conn, user["id"])
items = [Article.from_row(r) for r in rows]
return FeedResponse(topic=None, flavor=None, count=len(items), items=items)
@app.get("/api/saved/ids")
def saved_id_list(request: Request) -> list[int]:
with get_conn() as conn:
user = _require_user(conn, request)
return queries.saved_ids(conn, user["id"])
@app.post("/api/saved/{article_id}")
def save_article(article_id: int, request: Request) -> dict:
with get_conn() as conn:
user = _require_user(conn, request)
if not conn.execute("SELECT 1 FROM articles WHERE id = ?", (article_id,)).fetchone():
raise HTTPException(status_code=404, detail="No such article.")
conn.execute(
"INSERT OR IGNORE INTO saved_articles (user_id, article_id) VALUES (?, ?)",
(user["id"], article_id),
)
conn.commit()
return {"saved": True}
@app.delete("/api/saved/{article_id}")
def unsave_article(article_id: int, request: Request) -> dict:
with get_conn() as conn:
user = _require_user(conn, request)
conn.execute(
"DELETE FROM saved_articles WHERE user_id = ? AND article_id = ?",
(user["id"], article_id),
)
conn.commit()
return {"saved": False}
@app.get("/api/history", response_model=FeedResponse)
def history_list(request: Request) -> FeedResponse:
with get_conn() as conn:
user = _require_user(conn, request)
rows = queries.history(conn, user["id"])
items = [Article.from_row(r) for r in rows]
return FeedResponse(topic=None, flavor=None, count=len(items), items=items)
@app.post("/api/history")
def record_history(body: IdsBody, request: Request) -> dict:
with get_conn() as conn:
user = _require_user(conn, request)
for aid in queries.existing_article_ids(conn, body.ids):
conn.execute(
"INSERT OR IGNORE INTO user_history (user_id, article_id, event) "
"VALUES (?, ?, 'seen')",
(user["id"], aid),
)
conn.commit()
return {"ok": True}
@app.delete("/api/history")
def clear_history(request: Request) -> dict:
with get_conn() as conn:
user = _require_user(conn, request)
conn.execute("DELETE FROM user_history WHERE user_id = ?", (user["id"],))
conn.commit()
return {"ok": True}
@app.delete("/api/history/{article_id}")
def remove_history(article_id: int, request: Request) -> dict:
with get_conn() as conn:
user = _require_user(conn, request)
conn.execute(
"DELETE FROM user_history WHERE user_id = ? AND article_id = ?",
(user["id"], article_id),
)
conn.commit()
return {"ok": True}
# --- Prefs sync (Calm Filters / Boundaries follow the account) --------
@app.get("/api/prefs")
def get_prefs(request: Request) -> dict:
with get_conn() as conn:
user = _require_user(conn, request)
row = conn.execute(
"SELECT prefs_json FROM user_prefs WHERE user_id = ?", (user["id"],)
).fetchone()
if not row:
return {"prefs": None} # no row yet → caller seeds from the device
try:
return {"prefs": json.loads(row["prefs_json"])}
except (ValueError, TypeError):
return {"prefs": None}
@app.put("/api/prefs")
def put_prefs(body: PrefsBody, request: Request) -> dict:
blob = json.dumps(body.prefs)[:20000]
with get_conn() as conn:
user = _require_user(conn, request)
conn.execute(
"INSERT INTO user_prefs (user_id, prefs_json, updated_at) "
"VALUES (?, ?, CURRENT_TIMESTAMP) "
"ON CONFLICT(user_id) DO UPDATE SET prefs_json = excluded.prefs_json, "
"updated_at = CURRENT_TIMESTAMP",
(user["id"], blob),
)
conn.commit()
return {"ok": True}
# --- Account: profile, sessions, export, delete -----------------------
@app.get("/api/account")
def account_info(request: Request) -> dict:
with get_conn() as conn:
user = _require_user(conn, request)
providers = [r["provider"] for r in conn.execute(
"SELECT provider FROM identities WHERE user_id = ?", (user["id"],)
)]
sessions = conn.execute(
"SELECT COUNT(*) FROM sessions WHERE user_id = ?", (user["id"],)
).fetchone()[0]
saved = conn.execute(
"SELECT COUNT(*) FROM saved_articles WHERE user_id = ?", (user["id"],)
).fetchone()[0]
return {
"user": {"id": user["id"], "email": user["email"], "display_name": user["display_name"]},
"providers": providers,
"sessions": sessions,
"saved_count": saved,
}
@app.post("/api/account/logout-all")
def logout_all(request: Request, response: Response) -> dict:
with get_conn() as conn:
user = _require_user(conn, request)
conn.execute("DELETE FROM sessions WHERE user_id = ?", (user["id"],))
conn.commit()
response.delete_cookie(SESSION_COOKIE, path="/")
return {"ok": True}
@app.get("/api/account/export")
def export_account(request: Request) -> Response:
with get_conn() as conn:
user = _require_user(conn, request)
uid = user["id"]
providers = [r["provider"] for r in conn.execute(
"SELECT provider FROM identities WHERE user_id = ?", (uid,)
)]
saved = queries.saved(conn, uid, limit=10000)
hist = queries.history(conn, uid, limit=10000)
prow = conn.execute(
"SELECT prefs_json FROM user_prefs WHERE user_id = ?", (uid,)
).fetchone()
slim = lambda a: {"id": a["id"], "title": a["title"], "url": a["canonical_url"]}
data = {
"account": {"id": uid, "email": user["email"],
"display_name": user["display_name"], "created_at": user["created_at"]},
"sign_in_methods": providers,
"saved": [slim(a) for a in saved],
"history": [slim(a) for a in hist],
"preferences": json.loads(prow["prefs_json"]) if prow else None,
}
return Response(
content=json.dumps(data, indent=2),
media_type="application/json",
headers={"Content-Disposition": "attachment; filename=upbeatbytes-data.json"},
)
@app.delete("/api/account")
def delete_account(request: Request, response: Response) -> dict:
with get_conn() as conn:
user = _require_user(conn, request)
conn.execute("DELETE FROM users WHERE id = ?", (user["id"],)) # cascades to all account data
conn.commit()
response.delete_cookie(SESSION_COOKIE, path="/")
return {"ok": True}
# --- Public share/landing page for an article -------------------------
@app.get("/a/{article_id}", response_class=HTMLResponse)
def share_page(article_id: str, background_tasks: BackgroundTasks) -> HTMLResponse:
not_found = HTMLResponse(share.render_not_found(PUBLIC_BASE_URL), status_code=404)
try:
aid = int(article_id)
except (TypeError, ValueError):
return not_found # malformed id → calm 404, no stack trace
with get_conn() as conn:
row = conn.execute(
"SELECT a.id, a.title, a.description, a.image_url, a.canonical_url, "
"a.duplicate_of, src.name AS source_name, s.reason_text, s.accepted, "
"(SELECT group_concat(t.tag) FROM article_tags t WHERE t.article_id = a.id) AS tags "
"FROM articles a JOIN sources src ON src.id = a.source_id "
"LEFT JOIN article_scores s ON s.article_id = a.id WHERE a.id = ?",
(aid,),
).fetchone()
# Only render real, accepted, non-duplicate stories.
if not row or row["duplicate_of"] is not None or not row["accepted"]:
return not_found
summary = summarize.get_summary(conn, aid)
if not summary:
_kick_summary(aid, background_tasks) # generate for next time; page polls
return HTMLResponse(share.render_share_page(dict(row), PUBLIC_BASE_URL, summary=summary))
# --- Privacy-respecting first-party analytics -------------------------
@app.post("/api/events")
def record_event(body: EventBody) -> dict:
if body.kind in _EVENT_KINDS:
with get_conn() as conn:
conn.execute(
"INSERT OR IGNORE INTO events (kind, article_id, visitor_hash, day) "
"VALUES (?, ?, ?, date('now'))",
(body.kind, body.article_id or 0, _visitor_hash(body.visitor)),
)
conn.commit()
return {"ok": True} # always identical; dedup'd by the unique key
@app.post("/api/feedback")
def submit_feedback(body: FeedbackBody, request: Request, background_tasks: BackgroundTasks) -> dict:
if body.hp: # honeypot tripped → accept silently, store nothing
return {"ok": True}
message = (body.message or "").strip()[:4000]
if not message:
raise HTTPException(status_code=422, detail="Please add a short message.")
category = body.category if body.category in _FEEDBACK_CATEGORIES else "other"
email = ((body.email or "").strip()[:200]) or None
vh = _visitor_hash(body.visitor)
with get_conn() as conn:
if vh: # light flood cap per anonymous token per day
recent = conn.execute(
"SELECT COUNT(*) FROM feedback WHERE visitor_hash = ? AND day = date('now')", (vh,)
).fetchone()[0]
if recent >= 8:
return {"ok": True}
user = _current_user(conn, request)
conn.execute(
"INSERT INTO feedback (category, message, contact_email, user_id, visitor_hash, day) "
"VALUES (?, ?, ?, ?, ?, date('now'))",
(category, message, email, user["id"] if user else None, vh),
)
conn.commit()
who = user["email"] if user else "anonymous visitor"
for addr in ADMIN_EMAILS:
background_tasks.add_task(_feedback_email_safe, addr, category, message, email, who)
return {"ok": True}
@app.get("/api/admin/feedback")
def admin_feedback(request: Request) -> list[dict]:
with get_conn() as conn:
_require_admin(conn, request)
rows = conn.execute(
"SELECT f.id, f.category, f.message, f.contact_email, f.created_at, "
"u.email AS user_email FROM feedback f LEFT JOIN users u ON u.id = f.user_id "
"ORDER BY f.created_at DESC LIMIT 100"
).fetchall()
return [dict(r) for r in rows]
@app.get("/api/admin/stats")
def admin_stats(request: Request) -> dict:
with get_conn() as conn:
_require_admin(conn, request)
return queries.admin_stats(conn)
@app.get("/api/summary/{article_id}")
def article_summary(article_id: int, background_tasks: BackgroundTasks) -> dict:
with get_conn() as conn:
summary = summarize.get_summary(conn, article_id)
if summary:
return {"status": "ready", "summary": summary}
_kick_summary(article_id, background_tasks)
return {"status": "pending", "summary": None}
@app.get("/today", response_class=HTMLResponse)
def today_digest() -> HTMLResponse:
with get_conn() as conn:
b = queries.brief(conn)
items = b.get("items") or []
if not items:
return HTMLResponse(share.render_not_found(PUBLIC_BASE_URL), status_code=404)
return HTMLResponse(share.render_digest(items, PUBLIC_BASE_URL, b.get("brief_date")))
@app.get("/sitemap.xml")
def sitemap() -> Response:
with get_conn() as conn:
rows = conn.execute(
"SELECT a.id, COALESCE(a.published_at, a.discovered_at) AS lm "
"FROM articles a JOIN article_scores s ON s.article_id = a.id "
"WHERE s.accepted = 1 AND a.duplicate_of IS NULL "
"ORDER BY lm DESC LIMIT 5000"
).fetchall()
base = PUBLIC_BASE_URL
urls = [
f"<url><loc>{base}/</loc><changefreq>hourly</changefreq><priority>1.0</priority></url>",
f"<url><loc>{base}/today</loc><changefreq>daily</changefreq><priority>0.9</priority></url>",
]
for r in rows:
lm = (r["lm"] or "")[:10]
lastmod = f"<lastmod>{lm}</lastmod>" if lm else ""
urls.append(f"<url><loc>{base}/a/{r['id']}</loc>{lastmod}</url>")
xml = (
'<?xml version="1.0" encoding="UTF-8"?>'
'<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">'
+ "".join(urls) + "</urlset>"
)
return Response(content=xml, media_type="application/xml")
@app.post("/api/import")
def import_local(body: ImportBody, request: Request) -> dict:
"""Fold this device's anonymous history/saved into the account (one-time)."""
with get_conn() as conn:
user = _require_user(conn, request)
for aid in queries.existing_article_ids(conn, body.seen):
conn.execute(
"INSERT OR IGNORE INTO user_history (user_id, article_id, event) "
"VALUES (?, ?, 'seen')",
(user["id"], aid),
)
for aid in queries.existing_article_ids(conn, body.saved):
conn.execute(
"INSERT OR IGNORE INTO saved_articles (user_id, article_id) VALUES (?, ?)",
(user["id"], aid),
)
conn.commit()
return {"ok": True}
@app.get("/api/categories", response_model=CategoriesResponse)
def categories() -> CategoriesResponse:
return CategoriesResponse(
topics=[Category(key=k, description=v) for k, v in TOPICS.items()],
flavors=[Category(key=k, description=v) for k, v in FLAVORS.items()],
)
@app.get("/api/moods")
def moods() -> list[dict]:
# The humane front door: each mood resolves to a filter preset the
# client merges with the user's own Calm Filters.
return MOODS
@app.get("/api/lanes")
def lanes() -> dict:
# The customizable quick-access rail: 'today' is always pinned, and the
# reader pins any subset of these moods / topics / Discovery tags. Live
# counts let the client gate empty lanes and show volume.
with get_conn() as conn:
tagc = queries.tag_counts(conn)
topicc: dict[str, int] = {}
for row in queries.category_counts(conn):
topicc[row["topic"]] = topicc.get(row["topic"], 0) + int(row["count"])
return build_lane_pool(topicc, tagc)
@app.get("/api/families")
def families() -> list[dict]:
# Grouping vocabulary organised into calm families for the Explore UI.
with get_conn() as conn:
counts = queries.tag_counts(conn)
return [
{
"name": name,
"description": d["description"],
"tags": [{"key": t, "count": counts.get(t, 0)} for t in d["tags"]],
}
for name, d in FAMILIES.items()
]
@app.get("/api/category-counts", response_model=list[CategoryCount])
def category_counts(accepted_only: bool = True, prefs: str | None = Query(None)) -> list[CategoryCount]:
fp = prefs_from_json(prefs)
with get_conn() as conn:
if fp.is_empty():
rows = queries.category_counts(conn, accepted_only=accepted_only)
else:
# Count over the SAME filtered set the feed would return, so the
# browse numbers always match what the user actually sees.
allrows = queries.feed(conn, accepted_only=accepted_only, limit=100000, offset=0)
kept = filter_articles(allrows, fp, datetime.now(timezone.utc))
counts = Counter((r["topic"], r["flavor"]) for r in kept)
rows = [
{"topic": t, "flavor": f, "count": n}
for (t, f), n in sorted(counts.items(), key=lambda kv: (str(kv[0][0]), str(kv[0][1])))
]
return [CategoryCount(**row) for row in rows]
@app.get("/api/feed", response_model=FeedResponse)
def feed(
topic: str | None = Query(None),
flavor: str | None = Query(None),
accepted_only: bool = True,
limit: int = Query(30, ge=1, le=100),
offset: int = Query(0, ge=0),
prefs: str | None = Query(None),
exclude: str = Query("", description="comma-separated article ids the reader has dismissed"),
tag: str | None = Query(None, description="grouping tag to browse"),
sort: str = Query("ranked", pattern="^(ranked|latest)$", description="ranked (best-first) or latest (newest-first)"),
) -> FeedResponse:
if topic and topic.lower() not in TOPICS:
raise HTTPException(400, f"unknown topic: {topic}")
if flavor and flavor.lower() not in FLAVORS:
raise HTTPException(400, f"unknown flavor: {flavor}")
fp = prefs_from_json(prefs)
now = datetime.now(timezone.utc)
excl = {int(x) for x in exclude.split(",") if x.strip().lstrip("-").isdigit()}
# Categorical filters (include/mute topics+flavors incl. active pauses,
# cortisol ceiling) go to SQL so nothing is truncated by ranking. Only
# word-boundary avoid-terms and dismissals need a Python pass.
kw = _prefs_sql_kw(fp, now)
with get_conn() as conn:
if fp.avoid_terms or excl:
# Over-fetch enough to cover what the Python pass might remove.
fetch_n = min(2000, (offset + limit) * 4 + 50 + len(excl))
raw = queries.feed(
conn, topic=topic, flavor=flavor, accepted_only=accepted_only,
limit=fetch_n, offset=0, tag=tag, sort=sort, **kw,
)
kept = [a for a in filter_articles(raw, fp, now) if a["id"] not in excl]
rows = kept[offset : offset + limit]
else:
rows = queries.feed(
conn, topic=topic, flavor=flavor, accepted_only=accepted_only,
limit=limit, offset=offset, tag=tag, sort=sort, **kw,
)
# Keep the top of a browse view readable: stable-sort paywalled items
# below readable ones (composite order preserved within each group).
rows = sorted(rows, key=lambda r: is_paywalled(r["canonical_url"]))
return FeedResponse(
topic=topic,
flavor=flavor,
count=len(rows),
items=[Article.from_row(r) for r in rows],
)
@app.get("/api/brief", response_model=BriefResponse)
def brief(
date: str | None = Query(None),
limit: int = Query(10, ge=1, le=50),
prefs: str | None = Query(None),
exclude: str = Query("", description="comma-separated article ids the reader has dismissed"),
) -> BriefResponse:
fp = prefs_from_json(prefs)
now = datetime.now(timezone.utc)
excl = {int(x) for x in exclude.split(",") if x.strip().lstrip("-").isdigit()}
with get_conn() as conn:
data = queries.brief(conn, brief_date=date, limit=limit)
# Drop dismissed (replaced-away) items and anything the reader's
# boundaries hide; avoid-terms take precedence over curation.
items = [a for a in data["items"] if a["id"] not in excl]
if not fp.is_empty():
items = filter_articles(items, fp, now)
# Keep the highlights full: if a boundary or a dismissal removed a
# story, top up with other readable, boundary-respecting good news
# rather than show fewer.
if len(items) < limit:
have = {a["id"] for a in items} | excl
pool = queries.feed(
conn, accepted_only=True, limit=limit * 5 + 40, offset=0, **_prefs_sql_kw(fp, now)
)
for a in filter_articles(pool, fp, now):
if len(items) >= limit:
break
if a["id"] not in have:
items.append(a)
have.add(a["id"])
# Lead with a gentle, readable story (charged or paywalled stories stay
# in the set, just not as the first thing seen).
items = _pick_lead(items)
return BriefResponse(
brief_date=data["brief_date"],
title=data["title"],
generated_at=data.get("created_at"),
items=[Article.from_row(r) for r in items],
)
@app.get("/api/brief-dates", response_model=list[str])
def brief_dates(limit: int = Query(30, ge=1, le=365)) -> list[str]:
with get_conn() as conn:
return queries.available_dates(conn, limit=limit)
@app.get("/api/replacement", response_model=Article | None)
def replacement(
exclude: str = Query("", description="comma-separated article ids already shown"),
prefs: str | None = Query(None),
avoid_paywall: bool = True,
gentle: bool = Query(False, description="also require lead-safe (for replacing the hero)"),
) -> Article | None:
# Swap a read or paywalled item for the next-best one the reader can
# actually open. The client merges any active mood into `prefs` (same as
# the feed), so this needs no mood param.
fp = prefs_from_json(prefs)
excl = {int(x) for x in exclude.split(",") if x.strip().lstrip("-").isdigit()}
now = datetime.now(timezone.utc)
kw = dict(
include_topics=fp.include_topics or None,
include_flavors=fp.include_flavors or None,
mute_topics=list(fp.muted_topics(now)) or None,
mute_flavors=list(fp.muted_flavors(now)) or None,
max_cortisol=fp.max_cortisol,
max_ragebait=fp.max_ragebait,
)
with get_conn() as conn:
rows = queries.feed(conn, accepted_only=True, limit=120, offset=0, **kw)
for r in filter_articles(rows, fp, now):
if r["id"] in excl:
continue
if avoid_paywall and is_paywalled(r["canonical_url"]):
continue
if gentle and not safe_to_lead(r):
continue
return Article.from_row(r)
return None
@app.get("/api/candidates", response_model=list[Candidate])
def candidates(status: str | None = Query(None)) -> list[Candidate]:
from .sources import list_candidates
with get_conn() as conn:
rows = list_candidates(conn, status=status)
out = []
for r in rows:
d = dict(r)
pj = d.pop("preview_json", None)
d["preview"] = json.loads(pj) if pj else None
out.append(Candidate(**d))
return out
@app.get("/api/source-preview", response_model=SourcePreview)
def source_preview(
url: str = Query(..., max_length=2048),
sample: int = Query(25, ge=1, le=50),
classify: bool = Query(False, description="Also classify with the local model (accurate but slower)"),
) -> SourcePreview:
# Read-only sample scoring; nothing is persisted. Only http(s) is allowed.
# NOTE: fetching a user-supplied URL is an SSRF surface — before exposing
# this publicly, also block private/loopback/link-local address ranges.
if not re.match(r"^https?://", url, re.IGNORECASE):
raise HTTPException(400, "url must start with http:// or https://")
client = LocalModelClient.from_env() if classify else None
try:
data = feeds.preview_feed(url, sample=sample, client=client)
except Exception as exc:
raise HTTPException(502, f"could not preview feed: {exc}")
return SourcePreview(**data)
# Static site last, mounted at root, so /api/* and /healthz win.
if STATIC_DIR.is_dir():
app.mount("/", StaticFiles(directory=str(STATIC_DIR), html=True), name="site")
return app
app = create_app()