Files
upbeatBytes/goodnews/cli.py
T
thejayman77 2f4bdf2d00 Add FastAPI web/API layer and static site
- queries.py: shared read-only query helpers (feed, brief, category counts)
  returning plain dicts, used by the API and available to the CLI.
- api.py: FastAPI service with Pydantic response models (the companion-app
  contract), CORS, and endpoints for categories, feed, brief, and health;
  mounts a static site at /.
- static/index.html: minimal dependency-free site rendering the daily five
  and topic/flavor category browsing.
- 'goodnews serve' command launches uvicorn (lazy import; core CLI stays
  pure-stdlib). Web deps live behind the optional [web] extra.
- Dockerfile + .dockerignore + build-system metadata so the service installs
  and deploys cleanly, with the DB mounted as a shared volume.
- README: web/API and deployment docs.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-30 13:51:07 +00:00

438 lines
17 KiB
Python

from __future__ import annotations
import argparse
import os
import sqlite3
from pathlib import Path
from .briefs import build_daily_brief, show_brief
from .db import connect, init_db
from .feeds import poll_all_sources, poll_source
from .llm import LocalModelClient, classify_articles
from .scoring import score_article
from .sources import load_sources, upsert_sources
# Parent of the directory that holds this file (index 1 of the resolved path's parents).
ROOT = Path(__file__).resolve().parents[1]
# Default value for the global --db option.
DEFAULT_DB = ROOT.joinpath("data", "goodnews.sqlite3")
# Default value for the --sources option of the import-sources command.
DEFAULT_SOURCES = ROOT.joinpath("config", "sources.toml")
def _build_parser() -> argparse.ArgumentParser:
    """Build the ``goodnews`` argument parser with all of its subcommands."""
    parser = argparse.ArgumentParser(prog="goodnews")
    parser.add_argument("--db", type=Path, default=DEFAULT_DB, help="SQLite database path")
    subparsers = parser.add_subparsers(dest="command", required=True)

    subparsers.add_parser("init-db", help="Create or update the SQLite schema")

    import_parser = subparsers.add_parser("import-sources", help="Load sources from TOML")
    import_parser.add_argument("--sources", type=Path, default=DEFAULT_SOURCES)

    poll_parser = subparsers.add_parser("poll", help="Poll active RSS/Atom sources")
    poll_parser.add_argument("--source", help="Poll one source by exact name")
    poll_parser.add_argument("--limit", type=int, help="Poll only the first N active sources")

    list_parser = subparsers.add_parser("list-recent", help="Show recently discovered articles")
    list_parser.add_argument("--limit", type=int, default=20)
    list_parser.add_argument("--accepted-only", action="store_true")

    source_parser = subparsers.add_parser("list-sources", help="Show configured sources")
    source_parser.add_argument("--active-only", action="store_true")

    cat_parser = subparsers.add_parser("list-category", help="Browse articles by topic and/or flavor")
    cat_parser.add_argument("--topic", help="Filter by topic, e.g. science, environment, animals")
    cat_parser.add_argument("--flavor", help="Filter by flavor, e.g. breakthrough, discovery, feelgood")
    cat_parser.add_argument("--limit", type=int, default=20)
    cat_parser.add_argument("--all", action="store_true", help="Include not-accepted articles")

    subparsers.add_parser("source-report", help="Show source-level ingestion and scoring stats")

    runs_parser = subparsers.add_parser("list-runs", help="Show recent ingest runs")
    runs_parser.add_argument("--limit", type=int, default=20)

    subparsers.add_parser("rescore", help="Re-run heuristic scores for stored articles")

    classify_parser = subparsers.add_parser("classify", help="Classify candidates with a local LLM")
    classify_parser.add_argument("--limit", type=int, default=10)
    classify_parser.add_argument("--include-rejected", action="store_true")
    classify_parser.add_argument("--dry-run", action="store_true")
    classify_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
    classify_parser.add_argument("--model", help="Local model name")

    check_llm_parser = subparsers.add_parser("check-llm", help="Check local OpenAI-compatible model endpoint")
    check_llm_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
    check_llm_parser.add_argument("--model", help="Expected local model name")

    brief_parser = subparsers.add_parser("build-brief", help="Build/freeze a daily brief")
    brief_parser.add_argument("--date", help="Brief date in YYYY-MM-DD format; defaults to today")
    brief_parser.add_argument("--limit", type=int, default=5)
    brief_parser.add_argument("--replace", action="store_true")

    show_brief_parser = subparsers.add_parser("show-brief", help="Show a stored daily brief")
    show_brief_parser.add_argument("--date", help="Brief date in YYYY-MM-DD format; defaults to latest brief")
    show_brief_parser.add_argument("--limit", type=int, default=10)

    serve_parser = subparsers.add_parser("serve", help="Run the web/API server (requires the 'web' extra)")
    serve_parser.add_argument("--host", default="127.0.0.1", help="Bind host; use 0.0.0.0 to expose")
    serve_parser.add_argument("--port", type=int, default=8000)
    serve_parser.add_argument("--reload", action="store_true", help="Auto-reload on code changes (dev)")
    return parser


def _run_command(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
    """Run the database-backed subcommand named by ``args.command`` on *conn*."""
    if args.command == "init-db":
        init_db(conn)
        print(f"Initialized {args.db}")
    elif args.command == "import-sources":
        init_db(conn)
        sources = load_sources(args.sources)
        count = upsert_sources(conn, sources)
        print(f"Imported {count} sources from {args.sources}")
    elif args.command == "poll":
        init_db(conn)
        if args.source:
            source = conn.execute("SELECT * FROM sources WHERE name = ?", (args.source,)).fetchone()
            if not source:
                raise SystemExit(f"No source named {args.source!r}")
            result = poll_source(conn, source)
        else:
            result = poll_all_sources(conn, limit=args.limit)
        print(_format_result(result))
    elif args.command == "list-recent":
        list_recent(conn, limit=args.limit, accepted_only=args.accepted_only)
    elif args.command == "list-sources":
        list_sources(conn, active_only=args.active_only)
    elif args.command == "list-category":
        # --all widens the listing, so the filter flag is its negation.
        list_category(conn, topic=args.topic, flavor=args.flavor, limit=args.limit, accepted_only=not args.all)
    elif args.command == "source-report":
        source_report(conn)
    elif args.command == "list-runs":
        list_runs(conn, limit=args.limit)
    elif args.command == "rescore":
        count = rescore_articles(conn)
        print(f"Rescored {count} articles")
    elif args.command == "classify":
        init_db(conn)
        client = llm_client_from_args(args)
        results = classify_articles(
            conn,
            client,
            limit=args.limit,
            include_rejected=args.include_rejected,
            dry_run=args.dry_run,
        )
        for article_id, scores in results:
            accepted = "yes" if scores["accepted"] else "no"
            print(
                f"[{article_id}] accepted={accepted} {scores['topic']}/{scores['flavor']} "
                f"reason={scores['reason_code']}"
            )
            print(f" {scores['reason_text']}")
        if args.dry_run:
            print("Dry run only; database was not updated.")
    elif args.command == "check-llm":
        # This branch talks only to the model endpoint; conn is not used here.
        client = llm_client_from_args(args)
        try:
            models = client.list_models()
        except RuntimeError as exc:
            # Turn the endpoint failure into a clean CLI exit carrying its message.
            raise SystemExit(str(exc)) from exc
        print(f"Connected to {client.base_url}")
        if models:
            print("Models:")
            for model in models:
                # Flag the model the client is configured to use.
                marker = " *" if model == client.model else ""
                print(f" {model}{marker}")
        else:
            print("Endpoint responded, but no models were listed.")
    elif args.command == "build-brief":
        init_db(conn)
        brief_id = build_daily_brief(
            conn,
            brief_date=args.date,
            limit=args.limit,
            replace=args.replace,
        )
        print(f"Built brief {brief_id}")
        print_brief(show_brief(conn, brief_date=args.date, limit=args.limit))
    elif args.command == "show-brief":
        print_brief(show_brief(conn, brief_date=args.date, limit=args.limit))


def main() -> None:
    """Parse the command line and run the selected ``goodnews`` subcommand.

    ``serve`` is handled before any database connection is opened. Every
    other subcommand runs against a connection opened on ``--db``; that
    connection is closed before returning, including when the subcommand
    exits early through ``SystemExit``.

    Raises:
        SystemExit: On argument errors (raised by argparse), when ``poll`` is
            given an unknown source name, or when ``check-llm`` cannot reach
            the model endpoint.
    """
    args = _build_parser().parse_args()
    if args.command == "serve":
        serve(args)
        return
    conn = connect(args.db)
    try:
        _run_command(conn, args)
    finally:
        # Release the database handle on every exit path.
        conn.close()
def list_recent(conn: sqlite3.Connection, limit: int, accepted_only: bool) -> None:
    """Print recently dated articles, newest first, with their score summary.

    Articles are ordered by ``published_at``, falling back to
    ``discovered_at`` when no publication date is stored. Score columns come
    from a LEFT JOIN, so an article without a score row prints ``None``
    values and ``accepted=no``.

    Args:
        conn: Open connection whose rows can be indexed by column name.
        limit: Maximum number of articles to print.
        accepted_only: When true, keep only rows with ``s.accepted = 1``.
    """
    if accepted_only:
        accepted_filter = "WHERE s.accepted = 1"
    else:
        accepted_filter = ""
    query = f"""
        SELECT
            a.id,
            a.published_at,
            src.name AS source_name,
            a.title,
            a.canonical_url,
            s.accepted,
            s.constructive_score,
            s.cortisol_score,
            s.ragebait_score,
            s.reason_code
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        {accepted_filter}
        ORDER BY COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT ?
    """
    for article in conn.execute(query, (limit,)).fetchall():
        verdict = "yes" if article["accepted"] else "no"
        when = article["published_at"] or "no date"
        score_line = (
            " scores: "
            f"constructive={article['constructive_score']} "
            f"cortisol={article['cortisol_score']} "
            f"ragebait={article['ragebait_score']} "
            f"reason={article['reason_code']}"
        )
        print(f"[{article['id']}] {when} | {article['source_name']} | accepted={verdict}")
        print(f" {article['title']}")
        print(score_line)
        print(f" {article['canonical_url']}")
def serve(args: argparse.Namespace) -> None:
    """Start the web/API server under uvicorn.

    uvicorn is imported here rather than at module level, so the other
    commands do not need it installed.

    Args:
        args: Parsed CLI arguments; reads ``db``, ``host``, ``port`` and
            ``reload``.

    Raises:
        SystemExit: If uvicorn is not installed; the message explains how to
            install the optional ``web`` extra.
    """
    try:
        import uvicorn
    except ModuleNotFoundError:
        install_hint = (
            "The web server needs the optional 'web' extra. Install it with:\n"
            " pip install -e '.[web]'"
        )
        raise SystemExit(install_hint)

    # Make sure the API reads the same database the CLI was pointed at.
    # An already-set GOODNEWS_DB is left untouched.
    if "GOODNEWS_DB" not in os.environ:
        os.environ["GOODNEWS_DB"] = str(args.db)

    host, port = args.host, args.port
    print(f"Serving goodNews on http://{host}:{port} (docs at /docs)")
    uvicorn.run("goodnews.api:app", host=host, port=port, reload=args.reload)
def list_category(
    conn: sqlite3.Connection,
    topic: str | None,
    flavor: str | None,
    limit: int,
    accepted_only: bool,
) -> None:
    """Print scored articles filtered by topic and/or flavor, best-ranked first.

    The rank is computed in SQL as constructive + agency + human-benefit +
    source trust, minus cortisol, ragebait and PR-risk scores. Ties are
    broken by publication (or discovery) date, newest first.

    Args:
        conn: Open connection whose rows can be indexed by column name.
        topic: Topic to match (lower-cased before comparison), or a falsy
            value for no topic filter.
        flavor: Flavor to match (lower-cased before comparison), or a falsy
            value for no flavor filter.
        limit: Maximum number of articles to print.
        accepted_only: When true, keep only rows with ``s.accepted = 1``.
    """
    conditions: list[str] = []
    bindings: list = []
    if accepted_only:
        conditions.append("s.accepted = 1")
    for column, wanted in (("topic", topic), ("flavor", flavor)):
        if wanted:
            conditions.append(f"s.{column} = ?")
            bindings.append(wanted.lower())
    where_sql = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    bindings.append(limit)

    query = f"""
        SELECT
            a.id, a.title, a.canonical_url, a.published_at,
            src.name AS source_name,
            s.topic, s.flavor, s.accepted,
            s.constructive_score, s.cortisol_score, s.reason_code,
            (s.constructive_score + s.agency_score + s.human_benefit_score + src.trust_score
             - s.cortisol_score - s.ragebait_score - s.pr_risk_score) AS rank_score
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        JOIN article_scores s ON s.article_id = a.id
        {where_sql}
        ORDER BY rank_score DESC, COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT ?
    """
    matches = conn.execute(query, bindings).fetchall()

    # The heading shows the filters exactly as the caller typed them.
    heading = " / ".join(part for part in (topic, flavor) if part) or "all categories"
    print(f"{heading} ({len(matches)} shown)")
    for article in matches:
        suffix = "" if article["accepted"] else " [not accepted]"
        print(f"[{article['id']}] {article['topic']}/{article['flavor']} | {article['source_name']}{suffix}")
        print(f" {article['title']}")
        print(f" score={article['rank_score']} reason={article['reason_code']}")
        print(f" {article['canonical_url']}")
def llm_client_from_args(args: argparse.Namespace) -> LocalModelClient:
    """Create a model client from the environment, then apply CLI overrides.

    Args:
        args: Parsed CLI arguments. ``base_url`` and ``model`` are optional
            attributes; a missing or falsy value leaves the setting from
            ``LocalModelClient.from_env()`` unchanged.

    Returns:
        The configured client.
    """
    client = LocalModelClient.from_env()
    base_url_override = getattr(args, "base_url", None)
    model_override = getattr(args, "model", None)
    if base_url_override:
        # Drop trailing slashes from the supplied URL before storing it.
        client.base_url = base_url_override.rstrip("/")
    if model_override:
        client.model = model_override
    return client
def list_sources(conn: sqlite3.Connection, active_only: bool) -> None:
    """Print every configured source, sorted by name, with its feed URL.

    Args:
        conn: Open connection whose rows can be indexed by column name.
        active_only: When true, list only sources with ``active = 1``.
    """
    if active_only:
        active_filter = "WHERE active = 1"
    else:
        active_filter = ""
    query = f"""
        SELECT id, name, active, default_category, trust_score, pr_risk_score, feed_url
        FROM sources
        {active_filter}
        ORDER BY name
    """
    for source in conn.execute(query).fetchall():
        status = "active" if source["active"] else "inactive"
        summary = (
            f"[{source['id']}] {source['name']} ({status}, {source['default_category']}, "
            f"trust={source['trust_score']}, pr={source['pr_risk_score']})"
        )
        print(summary)
        print(f" {source['feed_url']}")
def source_report(conn: sqlite3.Connection) -> None:
    """Print per-source article counts, acceptance rate and average scores.

    Sources are ordered by accepted count, then article count, then name.
    Both joins are LEFT JOINs, so a source with no articles still appears,
    with zero counts and ``None`` averages.

    Args:
        conn: Open connection whose rows can be indexed by column name.
    """
    query = """
        SELECT
            src.name,
            src.default_category,
            src.trust_score,
            src.pr_risk_score AS source_pr_risk,
            COUNT(a.id) AS articles,
            SUM(CASE WHEN s.accepted = 1 THEN 1 ELSE 0 END) AS accepted,
            ROUND(AVG(s.constructive_score), 1) AS avg_constructive,
            ROUND(AVG(s.cortisol_score), 1) AS avg_cortisol,
            ROUND(AVG(s.ragebait_score), 1) AS avg_ragebait,
            MAX(a.published_at) AS newest_article
        FROM sources src
        LEFT JOIN articles a ON a.source_id = src.id
        LEFT JOIN article_scores s ON s.article_id = a.id
        GROUP BY src.id
        ORDER BY accepted DESC, articles DESC, src.name
    """
    for stats in conn.execute(query).fetchall():
        total = stats["articles"] or 0
        accepted_count = stats["accepted"] or 0
        # Guard the division: a source may have no articles at all.
        if total:
            rate = accepted_count / total * 100
        else:
            rate = 0
        newest = stats["newest_article"] or "none"
        print(
            f"{stats['name']} | {stats['default_category']} | "
            f"articles={total} accepted={accepted_count} ({rate:.1f}%)"
        )
        print(
            f" trust={stats['trust_score']} pr={stats['source_pr_risk']} "
            f"avg_constructive={stats['avg_constructive']} "
            f"avg_cortisol={stats['avg_cortisol']} "
            f"avg_ragebait={stats['avg_ragebait']}"
        )
        print(f" newest={newest}")
def list_runs(conn: sqlite3.Connection, limit: int) -> None:
    """Print the most recent ingest runs, highest run id first.

    A run whose source cannot be joined is labelled ``unknown``. An extra
    ``error:`` line is printed only for runs with a non-empty error value.

    Args:
        conn: Open connection whose rows can be indexed by column name.
        limit: Maximum number of runs to print.
    """
    query = """
        SELECT r.id, r.started_at, r.finished_at, r.status, src.name AS source_name,
               r.items_seen, r.items_inserted, r.items_duplicate, r.error
        FROM ingest_runs r
        LEFT JOIN sources src ON src.id = r.source_id
        ORDER BY r.id DESC
        LIMIT ?
    """
    for run in conn.execute(query, (limit,)).fetchall():
        source_label = run["source_name"] or "unknown"
        print(
            f"[{run['id']}] {run['status']} | {source_label} | "
            f"seen={run['items_seen']} inserted={run['items_inserted']} duplicate={run['items_duplicate']}"
        )
        failure = run["error"]
        if failure:
            print(f" error: {failure}")
def rescore_articles(conn: sqlite3.Connection) -> int:
    """Recompute the heuristic score of every stored article and upsert it.

    Each article is scored with ``score_article`` from its title, its
    description and its source's PR-risk score. The result is inserted into
    ``article_scores``, or overwrites the listed columns of the existing row
    for that article. All writes are committed once, after the loop.

    Args:
        conn: Open connection whose rows can be indexed by column name.

    Returns:
        The number of articles processed.
    """
    # Keys read from the score_article() result, in placeholder order.
    score_fields = (
        "constructive_score",
        "cortisol_score",
        "ragebait_score",
        "agency_score",
        "human_benefit_score",
        "novelty_score",
        "pr_risk_score",
        "accepted",
        "reason_code",
        "reason_text",
        "model_name",
    )
    select_sql = """
        SELECT a.id, a.title, a.description, src.pr_risk_score
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        ORDER BY a.id
    """
    upsert_sql = """
        INSERT INTO article_scores (
            article_id, constructive_score, cortisol_score, ragebait_score,
            agency_score, human_benefit_score, novelty_score, pr_risk_score,
            accepted, reason_code, reason_text, model_name, scored_at
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
        ON CONFLICT(article_id) DO UPDATE SET
            constructive_score = excluded.constructive_score,
            cortisol_score = excluded.cortisol_score,
            ragebait_score = excluded.ragebait_score,
            agency_score = excluded.agency_score,
            human_benefit_score = excluded.human_benefit_score,
            novelty_score = excluded.novelty_score,
            pr_risk_score = excluded.pr_risk_score,
            accepted = excluded.accepted,
            reason_code = excluded.reason_code,
            reason_text = excluded.reason_text,
            model_name = excluded.model_name,
            scored_at = CURRENT_TIMESTAMP
    """
    articles = conn.execute(select_sql).fetchall()
    for article in articles:
        scores = score_article(
            article["title"],
            article["description"],
            int(article["pr_risk_score"]),
        )
        values = (article["id"], *(scores[field] for field in score_fields))
        conn.execute(upsert_sql, values)
    conn.commit()
    return len(articles)
def print_brief(rows: list[sqlite3.Row]) -> None:
    """Print a daily brief: a dated heading, then one entry per item.

    Args:
        rows: Brief items indexable by column name. The heading date is
            taken from the first row's ``brief_date``. An empty list prints
            a "no items" notice instead.
    """
    if rows:
        print(f"Five Good Things Today - {rows[0]['brief_date']}")
        for item in rows:
            print(
                f"{item['rank']}. {item['title']}",
                f" {item['source_name']} | {item['default_category']} | {item['model_name']}",
                f" reason: {item['reason_code']}",
                f" {item['canonical_url']}",
                sep="\n",
            )
    else:
        print("No brief items found.")
def _format_result(result: dict) -> str:
    """Render a poll result dict as a one-line summary.

    Three shapes are recognised, checked in this order: a result with a
    ``sources`` key (multi-source summary), a result whose ``status`` is
    ``"failed"``, and anything else (reported as a successful poll).

    Args:
        result: Poll result with ``seen``, ``inserted`` and ``duplicate``
            counters, plus ``sources``/``failed`` or ``error`` depending on
            the shape.

    Returns:
        The formatted summary line.
    """
    is_aggregate = "sources" in result
    # The three counters appear in every variant, always in this order.
    counters = (
        f"seen={result['seen']} inserted={result['inserted']} "
        f"duplicate={result['duplicate']}"
    )
    if is_aggregate:
        summary = f"Polled {result['sources']} sources: {counters} failed={result['failed']}"
    elif result.get("status") == "failed":
        summary = f"Poll failed: {counters} error={result['error']}"
    else:
        summary = f"Poll ok: {counters}"
    return summary
# Run the CLI when this module is executed as a script rather than imported.
if __name__ == "__main__":
    main()