Files
upbeatBytes/goodnews/cli.py
T
thejayman77 1c05554a28 Geo Stage 1-2: subject-geography model + classifier + pipeline wiring
"Closer to Home" foundation (audit greenlit by Codex). Durable geography, kept
decoupled from volatile scoring.

- Schema: article_geo (breadth/confidence/rationale/geo_version) + article_places
  (0..N ISO-coded places), separate from article_scores so re-runs/audits never
  disturb scoring or acceptance. "local" is never stored — it's relative to the
  reader; the UI computes "Near you" later.
- geo.py: LLM proposes place NAMES, code disposes to ISO codes (country alpha-2,
  US state 2-letter); region words like "Europe" can never become a country.
  'global'/placeless is first-class, not failure. Confidence calibrated so 'high'
  needs an explicit location. Geo is its OWN LLM pass, not merged into the scoring
  prompt (durable metadata, re-runnable, keeps the sensitive prompt untouched).
- store_geo replaces places (geo is re-derivable, unlike scores). tag_articles is
  idempotent by geo_version, only touches accepted non-duplicate articles.
- CLI `geo` command (cycle-locked, --limit/--reclassify) for backfill, plus a
  bounded geo step in the cycle (--geo-limit 60, --no-geo). scripts/geo_audit.py
  is the prototype audit tool.

360 tests green; live smoke tagged real articles correctly (Gaza->PS, London->GB,
placeless science->global). No UI / SEO pages yet — ranking/personalization only.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 16:56:49 -04:00

842 lines
36 KiB
Python

from __future__ import annotations
import argparse
import contextlib
import os
import sqlite3
from pathlib import Path
from .briefs import build_daily_brief, show_brief
from .db import connect, init_db
from .digest import send_due_digests
from .games import generate_daily_puzzles
from .localtime import local_today
from .dedup import DEFAULT_THRESHOLD, DEFAULT_WINDOW_DAYS, cluster_duplicates, dedup as run_dedup
from .geo import tag_articles as tag_geo
from .enrich import enrich_brief_images, enrich_recent_images, enrich_summarized_images
from .summarize import generate_summary, get_summary
from .feeds import (
fetch_feed,
parse_feed,
poll_all_sources,
poll_due_sources,
poll_source,
preview_feed,
)
from .llm import LocalModelClient, classify_articles
from .scoring import score_article
from .sources import (
list_candidates,
load_sources,
promote_candidate,
reject_candidate,
review_sources,
save_candidate,
upsert_sources,
)
# Parent of the directory that contains this file.
ROOT = Path(__file__).resolve().parents[1]
# Bundled SQLite database; the fallback when $GOODNEWS_DB is unset (see _default_db).
DEFAULT_DB = ROOT / "data" / "goodnews.sqlite3"
# Default TOML file read by the `import-sources` command.
DEFAULT_SOURCES = ROOT / "config" / "sources.toml"
def _default_db() -> Path:
# Honor GOODNEWS_DB like the rest of the app (db.connect) does, so `GOODNEWS_DB=… `
# actually targets that DB instead of being silently ignored — otherwise a copy-DB
# maintenance run (e.g. dedup --force-recluster) can land on production by surprise.
return Path(os.environ.get("GOODNEWS_DB") or DEFAULT_DB)
def _build_parser() -> argparse.ArgumentParser:
    """Build the ``goodnews`` argument parser with every subcommand registered."""
    parser = argparse.ArgumentParser(prog="goodnews")
    parser.add_argument(
        "--db", type=Path, default=_default_db(),
        help="SQLite database path (defaults to $GOODNEWS_DB, else the bundled data/ DB)",
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    subparsers.add_parser("init-db", help="Create or update the SQLite schema")

    import_parser = subparsers.add_parser("import-sources", help="Load sources from TOML")
    import_parser.add_argument("--sources", type=Path, default=DEFAULT_SOURCES)

    poll_parser = subparsers.add_parser("poll", help="Poll active RSS/Atom sources")
    poll_parser.add_argument("--source", help="Poll one source by exact name")
    poll_parser.add_argument("--limit", type=int, help="Poll only the first N active sources")

    list_parser = subparsers.add_parser("list-recent", help="Show recently discovered articles")
    list_parser.add_argument("--limit", type=int, default=20)
    list_parser.add_argument("--accepted-only", action="store_true")

    source_parser = subparsers.add_parser("list-sources", help="Show configured sources")
    source_parser.add_argument("--active-only", action="store_true")

    cat_parser = subparsers.add_parser("list-category", help="Browse articles by topic and/or flavor")
    cat_parser.add_argument("--topic", help="Filter by topic, e.g. science, environment, animals")
    cat_parser.add_argument("--flavor", help="Filter by flavor, e.g. breakthrough, discovery, feelgood")
    cat_parser.add_argument("--limit", type=int, default=20)
    cat_parser.add_argument("--all", action="store_true", help="Include not-accepted articles")

    subparsers.add_parser("source-report", help="Show source-level ingestion and scoring stats")

    check_feeds_parser = subparsers.add_parser("check-feeds", help="Fetch and parse each feed, reporting health")
    check_feeds_parser.add_argument("--all", action="store_true", help="Include inactive sources")

    preview_parser = subparsers.add_parser("preview-source", help="Score a sample of a feed without adding it")
    preview_parser.add_argument("url", help="Feed URL to preview")
    preview_parser.add_argument("--sample", type=int, default=25)
    preview_parser.add_argument("--classify", action="store_true", help="Also classify with the local model (slower)")
    preview_parser.add_argument("--base-url", help="OpenAI-compatible base URL (with --classify)")
    preview_parser.add_argument("--model", help="Local model name (with --classify)")

    suggest_parser = subparsers.add_parser("suggest-source", help="Preview a feed and stage it as a candidate")
    suggest_parser.add_argument("url", help="Feed URL to suggest")
    suggest_parser.add_argument("--name", help="Display name for the source")
    suggest_parser.add_argument("--homepage", help="Homepage URL")
    suggest_parser.add_argument("--sample", type=int, default=25)
    suggest_parser.add_argument("--classify", action="store_true", help="Classify the sample with the local model")
    suggest_parser.add_argument("--base-url")
    suggest_parser.add_argument("--model")

    cand_parser = subparsers.add_parser("list-candidates", help="List staged source candidates")
    cand_parser.add_argument("--status", help="Filter by status: suggested|quarantined|rejected|promoted")

    promote_parser = subparsers.add_parser("promote-candidate", help="Copy a candidate into the real sources")
    promote_parser.add_argument("id", type=int)
    promote_parser.add_argument("--active", action="store_true", help="Activate immediately (default: inactive)")
    promote_parser.add_argument("--category", help="default_category for the new source")
    promote_parser.add_argument("--trust", type=int, default=5)
    promote_parser.add_argument("--pr-risk", type=int, default=3)

    reject_parser = subparsers.add_parser("reject-candidate", help="Mark a candidate as rejected")
    reject_parser.add_argument("id", type=int)

    review_parser = subparsers.add_parser(
        "review-sources", help="Recompute advisory review flags (never deactivates anything)"
    )
    review_parser.add_argument("--stale-days", type=int, default=14)

    runs_parser = subparsers.add_parser("list-runs", help="Show recent ingest runs")
    runs_parser.add_argument("--limit", type=int, default=20)

    subparsers.add_parser("rescore", help="Re-run heuristic scores for stored articles")

    classify_parser = subparsers.add_parser("classify", help="Classify candidates with a local LLM")
    classify_parser.add_argument("--limit", type=int, default=10)
    classify_parser.add_argument("--include-rejected", action="store_true")
    classify_parser.add_argument("--dry-run", action="store_true")
    classify_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
    classify_parser.add_argument("--model", help="Local model name")

    cycle_parser = subparsers.add_parser(
        "cycle", help="Poll due sources, classify new articles, rebuild today's brief (for scheduling)"
    )
    cycle_parser.add_argument("--classify-limit", type=int, default=40)
    cycle_parser.add_argument("--no-classify", action="store_true", help="Skip the LLM classify step")
    cycle_parser.add_argument("--no-dedup", action="store_true", help="Skip the embedding dedup step")
    cycle_parser.add_argument("--no-geo", action="store_true", help="Skip tagging article subject-geography")
    cycle_parser.add_argument("--geo-limit", type=int, default=60, help="Max articles to geo-tag per cycle")
    cycle_parser.add_argument("--no-brief", action="store_true", help="Skip rebuilding today's brief")
    cycle_parser.add_argument("--no-review", action="store_true", help="Skip recomputing source review flags")
    cycle_parser.add_argument("--no-digest", action="store_true", help="Skip sending due daily digests")
    cycle_parser.add_argument("--force", action="store_true", help="Poll all active sources, ignoring intervals")
    cycle_parser.add_argument("--base-url", help="OpenAI-compatible base URL for classify")
    cycle_parser.add_argument("--model", help="Local model name for classify")

    digest_parser = subparsers.add_parser("send-digests", help="Send today's digest to opted-in users (morning-gated)")
    digest_parser.add_argument("--force", action="store_true", help="Ignore the morning send window")

    enrich_images_parser = subparsers.add_parser(
        "enrich-images", help="Backfill og:images for already-summarized articles that lack one"
    )
    enrich_images_parser.add_argument("--limit", type=int, default=50, help="Max articles to fetch this batch")

    geo_parser = subparsers.add_parser("geo", help="Tag article subject-geography (backfill / manual). Cycle-locked.")
    geo_parser.add_argument("--limit", type=int, default=200, help="Max articles to tag this batch")
    geo_parser.add_argument("--reclassify", action="store_true", help="Re-tag even rows already at the current geo version")
    geo_parser.add_argument("--base-url", help="OpenAI-compatible base URL")
    geo_parser.add_argument("--model", help="Local model name")

    dedup_parser = subparsers.add_parser("dedup", help="Cluster near-duplicate stories via local embeddings")
    dedup_parser.add_argument("--threshold", type=float, default=DEFAULT_THRESHOLD, help="Cosine similarity cutoff")
    dedup_parser.add_argument("--window-days", type=int, default=DEFAULT_WINDOW_DAYS)
    dedup_parser.add_argument("--embed-limit", type=int, help="Cap how many missing embeddings to compute")
    dedup_parser.add_argument("--base-url", help="OpenAI-compatible base URL")
    dedup_parser.add_argument("--model", help="Chat model name (unused for embeddings)")
    dedup_parser.add_argument(
        "--force-recluster", action="store_true",
        help="Re-cluster the EXISTING corpus even if no new embeddings "
             "(re-applies representative policy; cycle-locked, no model needed)",
    )

    check_llm_parser = subparsers.add_parser("check-llm", help="Check local OpenAI-compatible model endpoint")
    check_llm_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
    check_llm_parser.add_argument("--model", help="Expected local model name")

    brief_parser = subparsers.add_parser("build-brief", help="Build/freeze a daily brief")
    brief_parser.add_argument("--date", help="Brief date in YYYY-MM-DD format; defaults to today")
    brief_parser.add_argument("--limit", type=int, default=7)
    brief_parser.add_argument("--replace", action="store_true")

    show_brief_parser = subparsers.add_parser("show-brief", help="Show a stored daily brief")
    show_brief_parser.add_argument("--date", help="Brief date in YYYY-MM-DD format; defaults to latest brief")
    show_brief_parser.add_argument("--limit", type=int, default=10)

    serve_parser = subparsers.add_parser("serve", help="Run the web/API server (requires the 'web' extra)")
    serve_parser.add_argument("--host", default="127.0.0.1", help="Bind host; use 0.0.0.0 to expose")
    serve_parser.add_argument("--port", type=int, default=8000)
    serve_parser.add_argument("--reload", action="store_true", help="Auto-reload on code changes (dev)")

    return parser


def main() -> None:
    """Parse the command line and run the selected ``goodnews`` subcommand.

    ``serve`` is dispatched before any database connection is opened; every
    other command runs against ``connect(args.db)``.

    Raises:
        SystemExit: on argparse errors, an unknown ``poll --source`` name, a
            ``ValueError`` from ``promote_candidate``, or a ``RuntimeError``
            from the ``check-llm`` model listing.
    """
    args = _build_parser().parse_args()

    if args.command == "serve":
        serve(args)
        return

    conn = connect(args.db)

    if args.command == "init-db":
        init_db(conn)
        print(f"Initialized {args.db}")
    elif args.command == "import-sources":
        init_db(conn)
        sources = load_sources(args.sources)
        count = upsert_sources(conn, sources)
        print(f"Imported {count} sources from {args.sources}")
    elif args.command == "poll":
        init_db(conn)
        if args.source:
            source = conn.execute("SELECT * FROM sources WHERE name = ?", (args.source,)).fetchone()
            if not source:
                raise SystemExit(f"No source named {args.source!r}")
            result = poll_source(conn, source)
        else:
            result = poll_all_sources(conn, limit=args.limit)
        print(_format_result(result))
    elif args.command == "list-recent":
        list_recent(conn, limit=args.limit, accepted_only=args.accepted_only)
    elif args.command == "list-sources":
        list_sources(conn, active_only=args.active_only)
    elif args.command == "list-category":
        list_category(conn, topic=args.topic, flavor=args.flavor, limit=args.limit, accepted_only=not args.all)
    elif args.command == "source-report":
        source_report(conn)
    elif args.command == "check-feeds":
        check_feeds(conn, include_inactive=args.all)
    elif args.command == "preview-source":
        client = llm_client_from_args(args) if args.classify else None
        preview = preview_feed(args.url, sample=args.sample, client=client)
        print_preview(preview)
    elif args.command == "suggest-source":
        init_db(conn)
        client = llm_client_from_args(args) if args.classify else None
        preview = preview_feed(args.url, sample=args.sample, client=client)
        print_preview(preview)
        cand = save_candidate(conn, args.url, preview=preview, name=args.name, homepage_url=args.homepage)
        print(f"Saved as candidate #{cand['id']} (status {cand['status']}). Review with list-candidates.")
    elif args.command == "list-candidates":
        # Imported once per invocation rather than once per candidate row.
        import json as _json

        init_db(conn)
        rows = list_candidates(conn, status=args.status)
        if not rows:
            print("No candidates.")
        for r in rows:
            line = f"[{r['id']}] {r['status']} | {r['name'] or '(unnamed)'} | {r['feed_url']}"
            if r["preview_json"]:
                p = _json.loads(r["preview_json"])
                _rate = p.get("acceptance_rate")
                _rate_str = f"{round(_rate * 100)}%" if _rate is not None else ""
                line += f" (accept {_rate_str}, sampled {p.get('sampled', 0)})"
            print(line)
    elif args.command == "promote-candidate":
        init_db(conn)
        try:
            source_id = promote_candidate(
                conn, args.id, active=args.active, default_category=args.category,
                trust_score=args.trust, pr_risk_score=args.pr_risk,
            )
        except ValueError as exc:
            # Surface the validation message as the exit message, keeping the cause.
            raise SystemExit(str(exc)) from exc
        state = "active" if args.active else "inactive"
        print(f"Promoted candidate #{args.id} -> source #{source_id} ({state}).")
    elif args.command == "reject-candidate":
        init_db(conn)
        ok = reject_candidate(conn, args.id)
        print(f"Rejected candidate #{args.id}." if ok else f"No candidate #{args.id}.")
    elif args.command == "review-sources":
        init_db(conn)
        flagged = review_sources(conn, stale_days=args.stale_days)
        if not flagged:
            print("All active sources look healthy.")
        else:
            print(f"{len(flagged)} source(s) flagged for review (advisory — none deactivated):")
            for f in flagged:
                print(f" [{f['id']}] {f['name']}: {f['reason']}")
    elif args.command == "list-runs":
        list_runs(conn, limit=args.limit)
    elif args.command == "rescore":
        count = rescore_articles(conn)
        print(f"Rescored {count} articles")
    elif args.command == "classify":
        init_db(conn)
        client = llm_client_from_args(args)
        report = classify_articles(
            conn,
            client,
            limit=args.limit,
            include_rejected=args.include_rejected,
            dry_run=args.dry_run,
        )
        for article_id, scores in report.results:
            accepted = "yes" if scores["accepted"] else "no"
            print(
                f"[{article_id}] accepted={accepted} {scores['topic']}/{scores['flavor']} "
                f"reason={scores['reason_code']}"
            )
            print(f" {scores['reason_text']}")
        print(
            f"classify: attempted={report.attempted} succeeded={report.succeeded} "
            f"skipped={report.skipped}"
        )
        if args.dry_run:
            print("Dry run only; database was not updated.")
    elif args.command == "cycle":
        run_cycle(conn, args)
    elif args.command == "send-digests":
        init_db(conn)
        sent = send_due_digests(conn, force=args.force)
        print(f"send-digests: sent {sent}")
    elif args.command == "enrich-images":
        found = enrich_summarized_images(conn, limit=args.limit)
        print(f"enrich-images: {found} new image(s) for summarized articles")
    elif args.command == "geo":
        init_db(conn)
        # Cycle-locked so a manual backfill can't contend with the scheduled cycle.
        with cycle_lock(args.db) as acquired:
            if not acquired:
                print("geo: a cycle is already running; try again after it finishes")
                return
            g = tag_geo(conn, llm_client_from_args(args), limit=args.limit, reclassify=args.reclassify)
            print(f"geo: tagged={g['tagged']} errors={g['errors']} (of {g['candidates']} candidates)")
    elif args.command == "dedup":
        init_db(conn)
        if args.force_recluster:
            # Re-apply representative policy to the EXISTING corpus. The normal path
            # fast-skips when no new embeddings exist, so it would NOT pick up a policy
            # change. Cycle-locked so it can't overlap the scheduled timer; no model
            # needed (pure re-cluster over stored embeddings).
            with cycle_lock(args.db) as acquired:
                if not acquired:
                    print("dedup: a cycle is already running; re-run --force-recluster after it finishes")
                    return
                stats = cluster_duplicates(conn, threshold=args.threshold, window_days=args.window_days)
                print(
                    f"dedup (forced recluster): articles={stats['articles']} "
                    f"clusters={stats['clusters']} duplicate_clusters={stats['duplicate_clusters']} "
                    f"duplicates_hidden={stats['duplicates']}"
                )
        else:
            client = llm_client_from_args(args)
            stats = run_dedup(
                conn, client, threshold=args.threshold, window_days=args.window_days, embed_limit=args.embed_limit
            )
            print(
                f"dedup: embedded={stats['embedded']} articles={stats['articles']} "
                f"clusters={stats['clusters']} duplicate_clusters={stats['duplicate_clusters']} "
                f"duplicates_hidden={stats['duplicates']}"
            )
    elif args.command == "check-llm":
        client = llm_client_from_args(args)
        try:
            models = client.list_models()
        except RuntimeError as exc:
            raise SystemExit(str(exc)) from exc
        print(f"Connected to {client.base_url}")
        if models:
            print("Models:")
            for model in models:
                # Star the model this client is configured to use.
                marker = " *" if model == client.model else ""
                print(f" {model}{marker}")
        else:
            print("Endpoint responded, but no models were listed.")
    elif args.command == "build-brief":
        init_db(conn)
        brief_id = build_daily_brief(
            conn,
            brief_date=args.date,
            limit=args.limit,
            replace=args.replace,
        )
        print(f"Built brief {brief_id}")
        bdate = args.date or local_today()
        found = enrich_brief_images(conn, bdate)
        if found:
            print(f"Enriched {found} hero image(s)")
        print_brief(show_brief(conn, brief_date=args.date, limit=args.limit))
    elif args.command == "show-brief":
        print_brief(show_brief(conn, brief_date=args.date, limit=args.limit))
def list_recent(conn: sqlite3.Connection, limit: int, accepted_only: bool) -> None:
    """Print the newest articles together with their scores.

    Args:
        conn: Open database connection whose rows support access by column name.
        limit: Maximum number of articles to print.
        accepted_only: When true, only articles whose score row has
            ``accepted = 1`` are listed.
    """
    accepted_filter = ""
    if accepted_only:
        accepted_filter = "WHERE s.accepted = 1"
    query = f"""
        SELECT
            a.id,
            a.published_at,
            src.name AS source_name,
            a.title,
            a.canonical_url,
            s.accepted,
            s.constructive_score,
            s.cortisol_score,
            s.ragebait_score,
            s.reason_code
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        {accepted_filter}
        ORDER BY COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT ?
    """
    for article in conn.execute(query, (limit,)).fetchall():
        # Articles without a score row come back with NULL scores (LEFT JOIN),
        # which reads as "not accepted" here.
        verdict = "yes" if article["accepted"] else "no"
        published = article["published_at"] or "no date"
        score_line = (
            " scores: "
            f"constructive={article['constructive_score']} "
            f"cortisol={article['cortisol_score']} "
            f"ragebait={article['ragebait_score']} "
            f"reason={article['reason_code']}"
        )
        print(f"[{article['id']}] {published} | {article['source_name']} | accepted={verdict}")
        print(f" {article['title']}")
        print(score_line)
        print(f" {article['canonical_url']}")
def print_preview(p: dict) -> None:
    """Print a human-readable report for a feed preview dict.

    Args:
        p: Preview mapping. This function reads the keys ``classified``,
            ``url``, ``acceptance_rate`` (optional), ``sampled``, ``accepted``,
            ``newest_published``, ``recent_7d``, ``avg_cortisol``,
            ``avg_ragebait``, ``avg_pr_risk``, ``topic_mix``, ``flavor_mix``,
            ``examples_accepted`` and ``examples_rejected``.
    """
    scorer = "model" if p["classified"] else "heuristic"
    print(f"Preview of {p['url']} ({scorer})")

    acceptance = p.get("acceptance_rate")
    if acceptance is None:
        # No rate was computed for this sample.
        acceptance_text = "— (all held)"
    else:
        acceptance_text = f"{acceptance * 100:.0f}%"
    print(f" sampled={p['sampled']} accepted={p['accepted']} ({acceptance_text})")

    newest = p["newest_published"] or "unknown"
    print(f" freshness: newest={newest} in_last_7d={p['recent_7d']}")
    print(f" averages: cortisol={p['avg_cortisol']} ragebait={p['avg_ragebait']} pr_risk={p['avg_pr_risk']}")

    if p["topic_mix"]:
        print(f" topics: {p['topic_mix']}")
        print(f" flavors: {p['flavor_mix']}")

    accepted_titles = p["examples_accepted"]
    if accepted_titles:
        print(" would accept:")
        for title in accepted_titles:
            print(f" + {title[:80]}")

    rejected = p["examples_rejected"]
    if rejected:
        print(" would skip:")
        for example in rejected:
            print(f" - {example['title'][:70]} ({example['reason']})")
def check_feeds(conn: sqlite3.Connection, include_inactive: bool = False) -> None:
    """Fetch and parse every configured feed, printing one status line each.

    Args:
        conn: Open database connection whose rows support access by column name.
        include_inactive: When true, sources with ``active != 1`` are checked too.

    A feed that raises while being fetched or parsed is reported as ``FAIL``
    and does not stop the run; a final line summarizes how many succeeded.
    """
    scope = "" if include_inactive else "WHERE active = 1"
    feeds = conn.execute(f"SELECT name, feed_url FROM sources {scope} ORDER BY name").fetchall()

    healthy = 0
    for feed in feeds:
        try:
            entries = parse_feed(fetch_feed(feed["feed_url"]))
            healthy += 1
            print(f"OK {feed['name']}: {len(entries)} items")
        except Exception as exc:
            # Deliberately broad: this is a health report, any failure is a FAIL line.
            print(f"FAIL {feed['name']}: {exc}")

    print(f"--- {healthy}/{len(feeds)} feeds healthy ---")
@contextlib.contextmanager
def cycle_lock(db_path):
    """Try to take the exclusive, non-blocking cycle lock next to the database.

    The lock file ``.goodnews-cycle.lock`` lives in the directory of
    ``db_path``. It is shared by the scheduled cycle and by manual jobs that
    mutate the corpus (such as a forced dedup re-cluster) so they cannot run at
    the same time.

    Yields:
        ``True`` when the lock was acquired (released again on exit), ``False``
        when the lock attempt failed with ``OSError``, e.g. because another
        holder has it.
    """
    import fcntl

    handle = open(Path(db_path).parent / ".goodnews-cycle.lock", "w")
    try:
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        held = True
    except OSError:
        # Could not lock without blocking: give the handle back immediately.
        held = False
        handle.close()

    if not held:
        yield False
        return

    try:
        yield True
    finally:
        fcntl.flock(handle, fcntl.LOCK_UN)
        handle.close()
def run_cycle(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
    """Run one scheduler pass under the exclusive cycle lock.

    The actual work (poll, classify, dedup, brief rebuild and the follow-up
    steps) lives in ``_run_cycle_locked``. This wrapper only guarantees that a
    manual run and a timer tick, or two timer ticks, never execute at the same
    time: if the lock is already held the pass is skipped with a message.

    Args:
        conn: Open database connection handed through to the locked pass.
        args: Parsed ``cycle`` arguments; ``args.db`` locates the lock file.
    """
    with cycle_lock(args.db) as acquired:
        if acquired:
            _run_cycle_locked(conn, args)
        else:
            print("cycle: another cycle is already running; skipping")
def _run_cycle_locked(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
    """Run every step of one cycle; the caller must already hold the cycle lock.

    Steps, in order: poll, classify, dedup, geo tagging, brief rebuild, recent
    image enrichment, summary pre-warm, source review, search reindex, digest
    sending and puzzle generation. Apart from schema init and polling, each
    step is wrapped so that a failure (model endpoint down, empty day, ...) is
    printed as ``<step>: skipped (...)`` and the cycle carries on.

    Args:
        conn: Open database connection.
        args: Parsed ``cycle`` arguments (the ``--no-*`` switches, limits,
            ``--force`` and the optional model overrides).
    """
    init_db(conn)

    if args.force:
        poll_result = poll_all_sources(conn)
    else:
        poll_result = poll_due_sources(conn)
    print(f"poll: {_format_result(poll_result)}", flush=True)

    if not args.no_classify:
        client = llm_client_from_args(args)

        def _progress(done: int, total: int, article_id: int) -> None:
            print(f" classify {done}/{total} (article {article_id})", flush=True)

        try:
            report = classify_articles(
                conn,
                client,
                limit=args.classify_limit,
                include_rejected=True,
                only_unclassified=True,
                progress=_progress,
            )
            print(
                f"classify: attempted={report.attempted} succeeded={report.succeeded} "
                f"skipped={report.skipped} (model {client.model})",
                flush=True,
            )
        except Exception as exc:  # endpoint down, timeout, etc. — keep going
            print(f"classify: skipped ({exc})", flush=True)

    if not args.no_dedup:
        try:
            stats = run_dedup(conn, llm_client_from_args(args))
            print(f"dedup: embedded={stats['embedded']} duplicates_hidden={stats['duplicates']}")
        except Exception as exc:
            print(f"dedup: skipped ({exc})")

    # Geo: tag newly-accepted, non-duplicate articles with subject geography (its own
    # LLM pass, decoupled from scoring). Bounded per cycle; idempotent (skips rows
    # already at the current GEO_VERSION). Non-fatal like every other step.
    if not args.no_geo:
        try:
            g = tag_geo(conn, llm_client_from_args(args), limit=args.geo_limit)
            print(f"geo: tagged={g['tagged']} errors={g['errors']} (of {g['candidates']} untagged)")
        except Exception as exc:
            print(f"geo: skipped ({exc})")

    # Bound once, up front: the summary pre-warm below reads `today` as well,
    # so it must exist even when the brief rebuild is skipped with --no-brief.
    today = local_today()

    if not args.no_brief:
        try:
            brief_id = build_daily_brief(conn, brief_date=today, limit=7, replace=True)
            found = enrich_brief_images(conn, today)
            print(f"brief: rebuilt {today} (id {brief_id}); {found} hero image(s) enriched")
        except Exception as exc:
            print(f"brief: skipped ({exc})")

    # NOTE(review): the nesting of the next two steps was not recoverable from
    # the flattened source; they are placed at function level because --no-brief
    # only promises to skip the rebuild — confirm against the repository.

    # Keep the Latest feed photo-rich: fetch quality og:images for the newest
    # accepted articles that lack one (bounded per cycle).
    try:
        recent = enrich_recent_images(conn)
        if recent:
            print(f"recent images: {recent} enriched")
    except Exception as exc:
        print(f"recent images: skipped ({exc})")

    # Pre-warm summaries for today's brief so Today reads as a calm briefing.
    # Idempotent: cached items are skipped, so this only hits the LLM for new ones.
    try:
        client = llm_client_from_args(args)
        ids = [r[0] for r in conn.execute(
            "SELECT bi.article_id FROM daily_briefs b JOIN daily_brief_items bi "
            "ON bi.brief_id = b.id WHERE b.brief_date = ?", (today,)
        )]
        made = 0
        for aid in ids:
            # Count an item as new only if it had no summary before this call.
            had = get_summary(conn, aid)
            generate_summary(conn, aid, client=client)
            if not had and get_summary(conn, aid):
                made += 1
        print(f"summaries: {made} new (of {len(ids)} brief items)")
    except Exception as exc:
        print(f"summaries: skipped ({exc})")

    if not args.no_review:
        try:
            flagged = review_sources(conn)
            print(f"review: {len(flagged)} source(s) flagged for review (advisory)")
        except Exception as exc:
            print(f"review: skipped ({exc})")

    try:
        # Imported inside the guard so an import failure is non-fatal too.
        from .queries import reindex_search
        print(f"search: indexed {reindex_search(conn)} articles")
    except Exception as exc:  # noqa: BLE001 — search index is non-critical
        print(f"search: skipped ({exc})")

    if not args.no_digest:
        try:
            sent = send_due_digests(conn)  # morning-gated + deduped internally
            if sent:
                print(f"digest: sent {sent}")
        except Exception as exc:
            print(f"digest: skipped ({exc})")

    # Pre-generate today's daily puzzles (with the LLM 'why'); idempotent.
    try:
        made = generate_daily_puzzles(conn, local_today(), client=llm_client_from_args(args))
        if made:
            print(f"puzzles: generated {made}")
    except Exception as exc:
        print(f"puzzles: skipped ({exc})")
def serve(args: argparse.Namespace) -> None:
    """Start the web/API server with uvicorn.

    Args:
        args: Parsed ``serve`` arguments; reads ``db``, ``host``, ``port`` and
            ``reload``.

    Raises:
        SystemExit: with install instructions when ``uvicorn`` is not importable.
    """
    try:
        import uvicorn
    except ModuleNotFoundError:
        raise SystemExit(
            "The web server needs the optional 'web' extra. Install it with:\n"
            " pip install -e '.[web]'"
        ) from None

    # Make sure the API reads the same database the CLI was pointed at.
    # Assigned unconditionally: args.db already falls back to $GOODNEWS_DB when
    # --db is omitted, so this never loses the environment setting, whereas
    # setdefault() would let a pre-set variable override an explicit --db.
    os.environ["GOODNEWS_DB"] = str(args.db)

    print(f"Serving goodNews on http://{args.host}:{args.port} (docs at /docs)")
    uvicorn.run("goodnews.api:app", host=args.host, port=args.port, reload=args.reload)
def list_category(
    conn: sqlite3.Connection,
    topic: str | None,
    flavor: str | None,
    limit: int,
    accepted_only: bool,
) -> None:
    """Print scored articles filtered by topic and/or flavor, best-ranked first.

    Args:
        conn: Open database connection whose rows support access by column name.
        topic: Topic to match (compared lower-cased); falsy means no topic filter.
        flavor: Flavor to match (compared lower-cased); falsy means no flavor filter.
        limit: Maximum number of articles to print.
        accepted_only: When true, only rows with ``accepted = 1`` are shown.
    """
    conditions: list[str] = []
    bindings: list = []
    if accepted_only:
        conditions.append("s.accepted = 1")
    for column, wanted in (("s.topic", topic), ("s.flavor", flavor)):
        if wanted:
            conditions.append(f"{column} = ?")
            bindings.append(wanted.lower())
    where = ""
    if conditions:
        where = "WHERE " + " AND ".join(conditions)
    bindings.append(limit)

    # rank_score adds the positive signals plus source trust and subtracts the
    # negative ones; ties fall back to recency.
    query = f"""
        SELECT
            a.id, a.title, a.canonical_url, a.published_at,
            src.name AS source_name,
            s.topic, s.flavor, s.accepted,
            s.constructive_score, s.cortisol_score, s.reason_code,
            (s.constructive_score + s.agency_score + s.human_benefit_score + src.trust_score
             - s.cortisol_score - s.ragebait_score - s.pr_risk_score) AS rank_score
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        JOIN article_scores s ON s.article_id = a.id
        {where}
        ORDER BY rank_score DESC, COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT ?
    """
    matches = conn.execute(query, bindings).fetchall()

    heading = " / ".join(part for part in (topic, flavor) if part) or "all categories"
    print(f"{heading} ({len(matches)} shown)")
    for article in matches:
        suffix = "" if article["accepted"] else " [not accepted]"
        print(f"[{article['id']}] {article['topic']}/{article['flavor']} | {article['source_name']}{suffix}")
        print(f" {article['title']}")
        print(f" score={article['rank_score']} reason={article['reason_code']}")
        print(f" {article['canonical_url']}")
def llm_client_from_args(args: argparse.Namespace) -> LocalModelClient:
    """Create the model client from the environment, then apply CLI overrides.

    Args:
        args: Parsed arguments. ``base_url`` and ``model`` are optional
            attributes; a missing or empty one leaves the environment-derived
            setting untouched.

    Returns:
        The configured ``LocalModelClient``.
    """
    client = LocalModelClient.from_env()

    base_url = getattr(args, "base_url", None)
    if base_url:
        # Stored without trailing slashes.
        client.base_url = base_url.rstrip("/")

    model = getattr(args, "model", None)
    if model:
        client.model = model

    return client
def list_sources(conn: sqlite3.Connection, active_only: bool) -> None:
    """Print every configured source, ordered by name.

    Args:
        conn: Open database connection whose rows support access by column name.
        active_only: When true, only sources with ``active = 1`` are listed.
    """
    where = ""
    if active_only:
        where = "WHERE active = 1"
    query = f"""
        SELECT id, name, active, default_category, trust_score, pr_risk_score, feed_url
        FROM sources
        {where}
        ORDER BY name
    """
    for source in conn.execute(query).fetchall():
        status = "active" if source["active"] else "inactive"
        summary = (
            f"[{source['id']}] {source['name']} ({status}, {source['default_category']}, "
            f"trust={source['trust_score']}, pr={source['pr_risk_score']})"
        )
        print(summary)
        print(f" {source['feed_url']}")
def source_report(conn: sqlite3.Connection) -> None:
    """Print per-source ingestion and scoring statistics.

    Sources are ordered by accepted count, then article count, then name. For
    each one the report shows article/accepted totals with the acceptance
    percentage, trust and PR-risk settings, average scores, the newest
    publication date and, if set, the advisory review flag.

    Args:
        conn: Open database connection whose rows support access by column name.
    """
    query = """
        SELECT
            src.name,
            src.default_category,
            src.trust_score,
            src.pr_risk_score AS source_pr_risk,
            src.review_flag,
            src.review_reason,
            src.consecutive_failures,
            COUNT(a.id) AS articles,
            SUM(CASE WHEN s.accepted = 1 THEN 1 ELSE 0 END) AS accepted,
            ROUND(AVG(s.constructive_score), 1) AS avg_constructive,
            ROUND(AVG(s.cortisol_score), 1) AS avg_cortisol,
            ROUND(AVG(s.ragebait_score), 1) AS avg_ragebait,
            MAX(a.published_at) AS newest_article
        FROM sources src
        LEFT JOIN articles a ON a.source_id = src.id
        LEFT JOIN article_scores s ON s.article_id = a.id
        GROUP BY src.id
        ORDER BY accepted DESC, articles DESC, src.name
    """
    for source in conn.execute(query).fetchall():
        # The aggregates can be NULL/zero for sources without articles.
        total = source["articles"] or 0
        kept = source["accepted"] or 0
        if total:
            percent = kept / total * 100
        else:
            percent = 0
        newest = source["newest_article"] or "none"

        print(
            f"{source['name']} | {source['default_category']} | "
            f"articles={total} accepted={kept} ({percent:.1f}%)"
        )
        print(
            f" trust={source['trust_score']} pr={source['source_pr_risk']} "
            f"avg_constructive={source['avg_constructive']} "
            f"avg_cortisol={source['avg_cortisol']} "
            f"avg_ragebait={source['avg_ragebait']}"
        )
        print(f" newest={newest}")
        if source["review_flag"]:
            print(f" ⚑ review: {source['review_reason']}")
def list_runs(conn: sqlite3.Connection, limit: int) -> None:
    """Print the most recent ingest runs, newest first.

    Args:
        conn: Open database connection whose rows support access by column name.
        limit: Maximum number of runs to print.
    """
    query = """
        SELECT r.id, r.started_at, r.finished_at, r.status, src.name AS source_name,
               r.items_seen, r.items_inserted, r.items_duplicate, r.error
        FROM ingest_runs r
        LEFT JOIN sources src ON src.id = r.source_id
        ORDER BY r.id DESC
        LIMIT ?
    """
    for run in conn.execute(query, (limit,)).fetchall():
        # A run with no matching source row is shown as "unknown".
        source_label = run["source_name"] or "unknown"
        print(
            f"[{run['id']}] {run['status']} | {source_label} | "
            f"seen={run['items_seen']} inserted={run['items_inserted']} duplicate={run['items_duplicate']}"
        )
        failure = run["error"]
        if failure:
            print(f" error: {failure}")
def rescore_articles(conn: sqlite3.Connection) -> int:
    """Recompute heuristic scores for every stored article and upsert them.

    Each article is scored from its title, description and its source's
    PR-risk score; the result replaces any existing ``article_scores`` row for
    that article. All writes are committed together at the end.

    Args:
        conn: Open database connection whose rows support access by column name.

    Returns:
        The number of articles processed.
    """
    # Keys of the score dict, in the column order of the INSERT below.
    score_fields = (
        "constructive_score",
        "cortisol_score",
        "ragebait_score",
        "agency_score",
        "human_benefit_score",
        "novelty_score",
        "pr_risk_score",
        "accepted",
        "reason_code",
        "reason_text",
        "model_name",
    )
    upsert_sql = """
        INSERT INTO article_scores (
            article_id, constructive_score, cortisol_score, ragebait_score,
            agency_score, human_benefit_score, novelty_score, pr_risk_score,
            accepted, reason_code, reason_text, model_name, scored_at
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
        ON CONFLICT(article_id) DO UPDATE SET
            constructive_score = excluded.constructive_score,
            cortisol_score = excluded.cortisol_score,
            ragebait_score = excluded.ragebait_score,
            agency_score = excluded.agency_score,
            human_benefit_score = excluded.human_benefit_score,
            novelty_score = excluded.novelty_score,
            pr_risk_score = excluded.pr_risk_score,
            accepted = excluded.accepted,
            reason_code = excluded.reason_code,
            reason_text = excluded.reason_text,
            model_name = excluded.model_name,
            scored_at = CURRENT_TIMESTAMP
    """
    articles = conn.execute(
        """
        SELECT a.id, a.title, a.description, src.pr_risk_score
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        ORDER BY a.id
        """
    ).fetchall()

    for article in articles:
        scores = score_article(article["title"], article["description"], int(article["pr_risk_score"]))
        values = [article["id"]]
        values.extend(scores[field] for field in score_fields)
        conn.execute(upsert_sql, tuple(values))

    conn.commit()
    return len(articles)
def print_brief(rows: list[sqlite3.Row]) -> None:
    """Print a stored daily brief, one numbered entry per item.

    Args:
        rows: Brief items supporting access by column name; the header date is
            taken from the first row's ``brief_date``. An empty list prints a
            "nothing found" message instead.
    """
    if len(rows) == 0:
        print("No brief items found.")
        return

    print(f"Highlights from Today - {rows[0]['brief_date']}")
    for item in rows:
        entry = [
            f"{item['rank']}. {item['title']}",
            f" {item['source_name']} | {item['default_category']} | {item['model_name']}",
            f" reason: {item['reason_code']}",
            f" {item['canonical_url']}",
        ]
        for text in entry:
            print(text)
def _format_result(result: dict) -> str:
if "sources" in result:
return (
f"Polled {result['sources']} sources: seen={result['seen']} "
f"inserted={result['inserted']} duplicate={result['duplicate']} failed={result['failed']}"
)
if result.get("status") == "failed":
return (
f"Poll failed: seen={result['seen']} inserted={result['inserted']} "
f"duplicate={result['duplicate']} error={result['error']}"
)
return (
f"Poll ok: seen={result['seen']} inserted={result['inserted']} "
f"duplicate={result['duplicate']}"
)
# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()