from __future__ import annotations import argparse import contextlib import os import sqlite3 from pathlib import Path from .briefs import build_daily_brief, show_brief from .db import connect, init_db from .digest import send_due_digests from .games import generate_daily_puzzles from .localtime import local_today from .dedup import DEFAULT_THRESHOLD, DEFAULT_WINDOW_DAYS, cluster_duplicates, dedup as run_dedup from .geo import tag_articles as tag_geo from . import art, newsimg, onthisday, quote, wotd from .enrich import enrich_brief_images, enrich_read_times, enrich_recent_images, enrich_summarized_images from .summarize import generate_summary, get_summary from .feeds import ( fetch_feed, parse_feed, poll_all_sources, poll_due_sources, poll_source, preview_feed, ) from .llm import LocalModelClient, classify_articles from .scoring import score_article from .sources import ( list_candidates, load_sources, promote_candidate, reject_candidate, review_sources, save_candidate, upsert_sources, ) ROOT = Path(__file__).resolve().parents[1] DEFAULT_DB = ROOT / "data" / "goodnews.sqlite3" DEFAULT_SOURCES = ROOT / "config" / "sources.toml" def _default_db() -> Path: # Honor GOODNEWS_DB like the rest of the app (db.connect) does, so `GOODNEWS_DB=… ` # actually targets that DB instead of being silently ignored — otherwise a copy-DB # maintenance run (e.g. dedup --force-recluster) can land on production by surprise. return Path(os.environ.get("GOODNEWS_DB") or DEFAULT_DB) def main() -> None: parser = argparse.ArgumentParser(prog="goodnews") parser.add_argument("--db", type=Path, default=_default_db(), help="SQLite database path (defaults to $GOODNEWS_DB, else the bundled data/ DB)") subparsers = parser.add_subparsers(dest="command", required=True) subparsers.add_parser("init-db", help="Create or update the SQLite schema") import_parser = subparsers.add_parser("import-sources", help="Load sources from TOML") import_parser.add_argument("--sources", type=Path, default=DEFAULT_SOURCES) poll_parser = subparsers.add_parser("poll", help="Poll active RSS/Atom sources") poll_parser.add_argument("--source", help="Poll one source by exact name") poll_parser.add_argument("--limit", type=int, help="Poll only the first N active sources") list_parser = subparsers.add_parser("list-recent", help="Show recently discovered articles") list_parser.add_argument("--limit", type=int, default=20) list_parser.add_argument("--accepted-only", action="store_true") source_parser = subparsers.add_parser("list-sources", help="Show configured sources") source_parser.add_argument("--active-only", action="store_true") cat_parser = subparsers.add_parser("list-category", help="Browse articles by topic and/or flavor") cat_parser.add_argument("--topic", help="Filter by topic, e.g. science, environment, animals") cat_parser.add_argument("--flavor", help="Filter by flavor, e.g. breakthrough, discovery, feelgood") cat_parser.add_argument("--limit", type=int, default=20) cat_parser.add_argument("--all", action="store_true", help="Include not-accepted articles") subparsers.add_parser("source-report", help="Show source-level ingestion and scoring stats") check_feeds_parser = subparsers.add_parser("check-feeds", help="Fetch and parse each feed, reporting health") check_feeds_parser.add_argument("--all", action="store_true", help="Include inactive sources") preview_parser = subparsers.add_parser("preview-source", help="Score a sample of a feed without adding it") preview_parser.add_argument("url", help="Feed URL to preview") preview_parser.add_argument("--sample", type=int, default=25) preview_parser.add_argument("--classify", action="store_true", help="Also classify with the local model (slower)") preview_parser.add_argument("--base-url", help="OpenAI-compatible base URL (with --classify)") preview_parser.add_argument("--model", help="Local model name (with --classify)") suggest_parser = subparsers.add_parser("suggest-source", help="Preview a feed and stage it as a candidate") suggest_parser.add_argument("url", help="Feed URL to suggest") suggest_parser.add_argument("--name", help="Display name for the source") suggest_parser.add_argument("--homepage", help="Homepage URL") suggest_parser.add_argument("--sample", type=int, default=25) suggest_parser.add_argument("--classify", action="store_true", help="Classify the sample with the local model") suggest_parser.add_argument("--base-url") suggest_parser.add_argument("--model") cand_parser = subparsers.add_parser("list-candidates", help="List staged source candidates") cand_parser.add_argument("--status", help="Filter by status: suggested|quarantined|rejected|promoted") promote_parser = subparsers.add_parser("promote-candidate", help="Copy a candidate into the real sources") promote_parser.add_argument("id", type=int) promote_parser.add_argument("--active", action="store_true", help="Activate immediately (default: inactive)") promote_parser.add_argument("--category", help="default_category for the new source") promote_parser.add_argument("--trust", type=int, default=5) promote_parser.add_argument("--pr-risk", type=int, default=3) reject_parser = subparsers.add_parser("reject-candidate", help="Mark a candidate as rejected") reject_parser.add_argument("id", type=int) review_parser = subparsers.add_parser( "review-sources", help="Recompute advisory review flags (never deactivates anything)" ) review_parser.add_argument("--stale-days", type=int, default=14) runs_parser = subparsers.add_parser("list-runs", help="Show recent ingest runs") runs_parser.add_argument("--limit", type=int, default=20) subparsers.add_parser("rescore", help="Re-run heuristic scores for stored articles") classify_parser = subparsers.add_parser("classify", help="Classify candidates with a local LLM") classify_parser.add_argument("--limit", type=int, default=10) classify_parser.add_argument("--include-rejected", action="store_true") classify_parser.add_argument("--dry-run", action="store_true") classify_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1") classify_parser.add_argument("--model", help="Local model name") cycle_parser = subparsers.add_parser( "cycle", help="Poll due sources, classify new articles, rebuild today's brief (for scheduling)" ) cycle_parser.add_argument("--classify-limit", type=int, default=40) cycle_parser.add_argument("--no-classify", action="store_true", help="Skip the LLM classify step") cycle_parser.add_argument("--no-dedup", action="store_true", help="Skip the embedding dedup step") cycle_parser.add_argument("--no-geo", action="store_true", help="Skip tagging article subject-geography") cycle_parser.add_argument("--geo-limit", type=int, default=60, help="Max articles to geo-tag per cycle") cycle_parser.add_argument("--no-art", action="store_true", help="Skip the Daily Art pick") cycle_parser.add_argument("--no-joys", action="store_true", help="Skip the small-joys picks (On This Day, etc.)") cycle_parser.add_argument("--no-brief", action="store_true", help="Skip rebuilding today's brief") cycle_parser.add_argument("--no-review", action="store_true", help="Skip recomputing source review flags") cycle_parser.add_argument("--no-digest", action="store_true", help="Skip sending due daily digests") cycle_parser.add_argument("--force", action="store_true", help="Poll all active sources, ignoring intervals") cycle_parser.add_argument("--base-url", help="OpenAI-compatible base URL for classify") cycle_parser.add_argument("--model", help="Local model name for classify") digest_parser = subparsers.add_parser("send-digests", help="Send today's digest to opted-in users (morning-gated)") digest_parser.add_argument("--force", action="store_true", help="Ignore the morning send window") enrich_images_parser = subparsers.add_parser( "enrich-images", help="Backfill og:images for already-summarized articles that lack one" ) enrich_images_parser.add_argument("--limit", type=int, default=50, help="Max articles to fetch this batch") art_parser = subparsers.add_parser("art", help="Daily Art: harvest the pool and/or pick today's cached piece") art_parser.add_argument("--harvest", action="store_true", help="(Re)harvest the curated museum pool") art_parser.add_argument("--force", action="store_true", help="Re-pick today's art even if already chosen") geo_parser = subparsers.add_parser("geo", help="Tag article subject-geography (backfill / manual). Cycle-locked.") geo_parser.add_argument("--limit", type=int, default=200, help="Max articles to tag this batch") geo_parser.add_argument("--reclassify", action="store_true", help="Re-tag even rows already at the current geo version") geo_parser.add_argument("--base-url", help="OpenAI-compatible base URL") geo_parser.add_argument("--model", help="Local model name") dedup_parser = subparsers.add_parser("dedup", help="Cluster near-duplicate stories via local embeddings") dedup_parser.add_argument("--threshold", type=float, default=DEFAULT_THRESHOLD, help="Cosine similarity cutoff") dedup_parser.add_argument("--window-days", type=int, default=DEFAULT_WINDOW_DAYS) dedup_parser.add_argument("--embed-limit", type=int, help="Cap how many missing embeddings to compute") dedup_parser.add_argument("--base-url", help="OpenAI-compatible base URL") dedup_parser.add_argument("--model", help="Chat model name (unused for embeddings)") dedup_parser.add_argument("--force-recluster", action="store_true", help="Re-cluster the EXISTING corpus even if no new embeddings " "(re-applies representative policy; cycle-locked, no model needed)") check_llm_parser = subparsers.add_parser("check-llm", help="Check local OpenAI-compatible model endpoint") check_llm_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1") check_llm_parser.add_argument("--model", help="Expected local model name") brief_parser = subparsers.add_parser("build-brief", help="Build/freeze a daily brief") brief_parser.add_argument("--date", help="Brief date in YYYY-MM-DD format; defaults to today") brief_parser.add_argument("--limit", type=int, default=7) brief_parser.add_argument("--replace", action="store_true") show_brief_parser = subparsers.add_parser("show-brief", help="Show a stored daily brief") show_brief_parser.add_argument("--date", help="Brief date in YYYY-MM-DD format; defaults to latest brief") show_brief_parser.add_argument("--limit", type=int, default=10) serve_parser = subparsers.add_parser("serve", help="Run the web/API server (requires the 'web' extra)") serve_parser.add_argument("--host", default="127.0.0.1", help="Bind host; use 0.0.0.0 to expose") serve_parser.add_argument("--port", type=int, default=8000) serve_parser.add_argument("--reload", action="store_true", help="Auto-reload on code changes (dev)") args = parser.parse_args() if args.command == "serve": serve(args) return conn = connect(args.db) if args.command == "init-db": init_db(conn) print(f"Initialized {args.db}") elif args.command == "import-sources": init_db(conn) sources = load_sources(args.sources) count = upsert_sources(conn, sources) print(f"Imported {count} sources from {args.sources}") elif args.command == "poll": init_db(conn) if args.source: source = conn.execute("SELECT * FROM sources WHERE name = ?", (args.source,)).fetchone() if not source: raise SystemExit(f"No source named {args.source!r}") result = poll_source(conn, source) else: result = poll_all_sources(conn, limit=args.limit) print(_format_result(result)) elif args.command == "list-recent": list_recent(conn, limit=args.limit, accepted_only=args.accepted_only) elif args.command == "list-sources": list_sources(conn, active_only=args.active_only) elif args.command == "list-category": list_category(conn, topic=args.topic, flavor=args.flavor, limit=args.limit, accepted_only=not args.all) elif args.command == "source-report": source_report(conn) elif args.command == "check-feeds": check_feeds(conn, include_inactive=args.all) elif args.command == "preview-source": client = llm_client_from_args(args) if args.classify else None preview = preview_feed(args.url, sample=args.sample, client=client) print_preview(preview) elif args.command == "suggest-source": init_db(conn) client = llm_client_from_args(args) if args.classify else None preview = preview_feed(args.url, sample=args.sample, client=client) print_preview(preview) cand = save_candidate(conn, args.url, preview=preview, name=args.name, homepage_url=args.homepage) print(f"Saved as candidate #{cand['id']} (status {cand['status']}). Review with list-candidates.") elif args.command == "list-candidates": init_db(conn) rows = list_candidates(conn, status=args.status) if not rows: print("No candidates.") for r in rows: line = f"[{r['id']}] {r['status']} | {r['name'] or '(unnamed)'} | {r['feed_url']}" if r["preview_json"]: import json as _json p = _json.loads(r["preview_json"]) _rate = p.get("acceptance_rate") _rate_str = f"{round(_rate * 100)}%" if _rate is not None else "—" line += f" (accept {_rate_str}, sampled {p.get('sampled', 0)})" print(line) elif args.command == "promote-candidate": init_db(conn) try: source_id = promote_candidate( conn, args.id, active=args.active, default_category=args.category, trust_score=args.trust, pr_risk_score=args.pr_risk, ) except ValueError as exc: raise SystemExit(str(exc)) state = "active" if args.active else "inactive" print(f"Promoted candidate #{args.id} -> source #{source_id} ({state}).") elif args.command == "reject-candidate": init_db(conn) ok = reject_candidate(conn, args.id) print(f"Rejected candidate #{args.id}." if ok else f"No candidate #{args.id}.") elif args.command == "review-sources": init_db(conn) flagged = review_sources(conn, stale_days=args.stale_days) if not flagged: print("All active sources look healthy.") else: print(f"{len(flagged)} source(s) flagged for review (advisory — none deactivated):") for f in flagged: print(f" [{f['id']}] {f['name']}: {f['reason']}") elif args.command == "list-runs": list_runs(conn, limit=args.limit) elif args.command == "rescore": count = rescore_articles(conn) print(f"Rescored {count} articles") elif args.command == "classify": init_db(conn) client = llm_client_from_args(args) report = classify_articles( conn, client, limit=args.limit, include_rejected=args.include_rejected, dry_run=args.dry_run, ) for article_id, scores in report.results: accepted = "yes" if scores["accepted"] else "no" print( f"[{article_id}] accepted={accepted} {scores['topic']}/{scores['flavor']} " f"reason={scores['reason_code']}" ) print(f" {scores['reason_text']}") print( f"classify: attempted={report.attempted} succeeded={report.succeeded} " f"skipped={report.skipped}" ) if args.dry_run: print("Dry run only; database was not updated.") elif args.command == "cycle": run_cycle(conn, args) elif args.command == "send-digests": init_db(conn) sent = send_due_digests(conn, force=args.force) print(f"send-digests: sent {sent}") elif args.command == "enrich-images": found = enrich_summarized_images(conn, limit=args.limit) print(f"enrich-images: {found} new image(s) for summarized articles") elif args.command == "art": init_db(conn) if args.harvest: h = art.harvest_pool(conn) print(f"art harvest: found={h['found']} added={h['added']} pool={h['pool']} errors={h['errors']}") picked = art.pick_daily(conn, force=args.force, client=LocalModelClient.from_env()) if picked: print(f"art pick: {picked['art_date']} -> #{picked['object_id']} " f"\"{picked['title']}\" — {picked['artist'] or 'Unknown'}") else: print("art pick: nothing fetched (kept the last piece)") elif args.command == "geo": init_db(conn) # Cycle-locked so a manual backfill can't contend with the scheduled cycle. with cycle_lock(args.db) as acquired: if not acquired: print("geo: a cycle is already running; try again after it finishes") return g = tag_geo(conn, llm_client_from_args(args), limit=args.limit, reclassify=args.reclassify) print(f"geo: tagged={g['tagged']} errors={g['errors']} (of {g['candidates']} candidates)") elif args.command == "dedup": init_db(conn) if args.force_recluster: # Re-apply representative policy to the EXISTING corpus. The normal path # fast-skips when no new embeddings exist, so it would NOT pick up a policy # change. Cycle-locked so it can't overlap the scheduled timer; no model # needed (pure re-cluster over stored embeddings). with cycle_lock(args.db) as acquired: if not acquired: print("dedup: a cycle is already running; re-run --force-recluster after it finishes") return stats = cluster_duplicates(conn, threshold=args.threshold, window_days=args.window_days) print( f"dedup (forced recluster): articles={stats['articles']} " f"clusters={stats['clusters']} duplicate_clusters={stats['duplicate_clusters']} " f"duplicates_hidden={stats['duplicates']}" ) else: client = llm_client_from_args(args) stats = run_dedup( conn, client, threshold=args.threshold, window_days=args.window_days, embed_limit=args.embed_limit ) print( f"dedup: embedded={stats['embedded']} articles={stats['articles']} " f"clusters={stats['clusters']} duplicate_clusters={stats['duplicate_clusters']} " f"duplicates_hidden={stats['duplicates']}" ) elif args.command == "check-llm": client = llm_client_from_args(args) try: models = client.list_models() except RuntimeError as exc: raise SystemExit(str(exc)) print(f"Connected to {client.base_url}") if models: print("Models:") for model in models: marker = " *" if model == client.model else "" print(f" {model}{marker}") else: print("Endpoint responded, but no models were listed.") elif args.command == "build-brief": init_db(conn) brief_id = build_daily_brief( conn, brief_date=args.date, limit=args.limit, replace=args.replace, ) print(f"Built brief {brief_id}") bdate = args.date or local_today() found = enrich_brief_images(conn, bdate) if found: print(f"Enriched {found} hero image(s)") print_brief(show_brief(conn, brief_date=args.date, limit=args.limit)) elif args.command == "show-brief": print_brief(show_brief(conn, brief_date=args.date, limit=args.limit)) def list_recent(conn: sqlite3.Connection, limit: int, accepted_only: bool) -> None: where = "WHERE s.accepted = 1" if accepted_only else "" rows = conn.execute( f""" SELECT a.id, a.published_at, src.name AS source_name, a.title, a.canonical_url, s.accepted, s.constructive_score, s.cortisol_score, s.ragebait_score, s.reason_code FROM articles a JOIN sources src ON src.id = a.source_id LEFT JOIN article_scores s ON s.article_id = a.id {where} ORDER BY COALESCE(a.published_at, a.discovered_at) DESC LIMIT ? """, (limit,), ).fetchall() for row in rows: accepted = "yes" if row["accepted"] else "no" print(f"[{row['id']}] {row['published_at'] or 'no date'} | {row['source_name']} | accepted={accepted}") print(f" {row['title']}") print( " scores: " f"constructive={row['constructive_score']} " f"cortisol={row['cortisol_score']} " f"ragebait={row['ragebait_score']} " f"reason={row['reason_code']}" ) print(f" {row['canonical_url']}") def print_preview(p: dict) -> None: mode = "model" if p["classified"] else "heuristic" print(f"Preview of {p['url']} ({mode})") rate = p.get("acceptance_rate") rate_str = f"{rate * 100:.0f}%" if rate is not None else "— (all held)" print(f" sampled={p['sampled']} accepted={p['accepted']} ({rate_str})") print(f" freshness: newest={p['newest_published'] or 'unknown'} in_last_7d={p['recent_7d']}") print(f" averages: cortisol={p['avg_cortisol']} ragebait={p['avg_ragebait']} pr_risk={p['avg_pr_risk']}") if p["topic_mix"]: print(f" topics: {p['topic_mix']}") print(f" flavors: {p['flavor_mix']}") if p["examples_accepted"]: print(" would accept:") for t in p["examples_accepted"]: print(f" + {t[:80]}") if p["examples_rejected"]: print(" would skip:") for ex in p["examples_rejected"]: print(f" - {ex['title'][:70]} ({ex['reason']})") def check_feeds(conn: sqlite3.Connection, include_inactive: bool = False) -> None: where = "" if include_inactive else "WHERE active = 1" rows = conn.execute(f"SELECT name, feed_url FROM sources {where} ORDER BY name").fetchall() ok = 0 for row in rows: try: items = parse_feed(fetch_feed(row["feed_url"])) ok += 1 print(f"OK {row['name']}: {len(items)} items") except Exception as exc: print(f"FAIL {row['name']}: {exc}") print(f"--- {ok}/{len(rows)} feeds healthy ---") @contextlib.contextmanager def cycle_lock(db_path): """Exclusive, non-blocking lock shared by the scheduled cycle and any manual job that mutates the corpus (e.g. a forced dedup re-cluster), so they can never overlap and contend on the database/model. Yields True if acquired, False if already held.""" import fcntl lock_path = Path(db_path).parent / ".goodnews-cycle.lock" lock_file = open(lock_path, "w") try: fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB) except OSError: lock_file.close() yield False return try: yield True finally: fcntl.flock(lock_file, fcntl.LOCK_UN) lock_file.close() def run_cycle(conn: sqlite3.Connection, args: argparse.Namespace) -> None: """One end-to-end pass for a scheduler: poll due sources, classify the new arrivals, dedup, rebuild today's brief. Each step is independent and non-fatal so a down model endpoint or empty day never aborts the cycle. Holds an exclusive lock so a manual run and the systemd timer (or two timer ticks) can never overlap and contend on the database and model. """ with cycle_lock(args.db) as acquired: if not acquired: print("cycle: another cycle is already running; skipping") return _run_cycle_locked(conn, args) def _run_cycle_locked(conn: sqlite3.Connection, args: argparse.Namespace) -> None: init_db(conn) if args.force: poll_result = poll_all_sources(conn) else: poll_result = poll_due_sources(conn) print(f"poll: {_format_result(poll_result)}", flush=True) if not args.no_classify: client = llm_client_from_args(args) def _progress(done: int, total: int, article_id: int) -> None: print(f" classify {done}/{total} (article {article_id})", flush=True) try: report = classify_articles( conn, client, limit=args.classify_limit, include_rejected=True, only_unclassified=True, progress=_progress, ) print( f"classify: attempted={report.attempted} succeeded={report.succeeded} " f"skipped={report.skipped} (model {client.model})", flush=True, ) except Exception as exc: # endpoint down, timeout, etc. — keep going print(f"classify: skipped ({exc})", flush=True) if not args.no_dedup: try: stats = run_dedup(conn, llm_client_from_args(args)) print(f"dedup: embedded={stats['embedded']} duplicates_hidden={stats['duplicates']}") except Exception as exc: print(f"dedup: skipped ({exc})") # Geo: tag newly-accepted, non-duplicate articles with subject geography (its own # LLM pass, decoupled from scoring). Bounded per cycle; idempotent (skips rows # already at the current GEO_VERSION). Non-fatal like every other step. if not args.no_geo: try: g = tag_geo(conn, llm_client_from_args(args), limit=args.geo_limit) print(f"geo: tagged={g['tagged']} errors={g['errors']} (of {g['candidates']} untagged)") except Exception as exc: print(f"geo: skipped ({exc})") # Daily Art: ensure the pool exists, then ensure today has a cached piece. No-ops # once the day is picked; non-fatal like every other step. if not args.no_art: try: a = art.run_daily(conn, client=LocalModelClient.from_env()) # client → the guide blurb print(f"art: pool={a['pool']} picked={a['picked_object']}") except Exception as exc: print(f"art: skipped ({exc})") # Small joys: On This Day (history), Quote of the Day, Word of the Day. Each is # bounded + non-fatal; one shared LLM client for tone/explainer/word proposals. if not args.no_joys: joy_client = LocalModelClient.from_env() try: o = onthisday.run_daily(conn, client=joy_client) print(f"onthisday: md={o['md']} picked={'yes' if o['picked'] else 'no'}") except Exception as exc: print(f"onthisday: skipped ({exc})") try: q = quote.run_daily(conn, client=joy_client) print(f"quote: pool={q['pool']} picked={q['picked']}") except Exception as exc: print(f"quote: skipped ({exc})") try: w = wotd.run_daily(conn, client=joy_client) print(f"word: pool={w['pool']} picked={w['picked']}") except Exception as exc: print(f"word: skipped ({exc})") if not args.no_brief: today = local_today() try: brief_id = build_daily_brief(conn, brief_date=today, limit=7, replace=True) found = enrich_brief_images(conn, today) print(f"brief: rebuilt {today} (id {brief_id}); {found} hero image(s) enriched") except Exception as exc: print(f"brief: skipped ({exc})") # Keep the Latest feed photo-rich: fetch quality og:images for the newest # accepted articles that lack one (bounded per cycle). try: recent = enrich_recent_images(conn) if recent: print(f"recent images: {recent} enriched") except Exception as exc: print(f"recent images: skipped ({exc})") # Cache downscaled display copies of those images on our own origin (so the # hub/feed serve local files, not flaky third-party hotlinks), then enforce the # size ceiling with LRU eviction. Both bounded + non-fatal. try: warmed = newsimg.warm(conn) pr = newsimg.prune() print(f"images: cached={warmed} size={pr['after'] // 1048576}MB/" f"{pr['cap'] // 1048576}MB evicted={pr['removed']}") except Exception as exc: print(f"images: skipped ({exc})") # Full-article read-times: count words for recent accepted articles so the # front door can show "Full story · ~N min" next to our gist (bounded per cycle). try: reads = enrich_read_times(conn) if reads: print(f"read-times: {reads} counted") except Exception as exc: print(f"read-times: skipped ({exc})") # Pre-warm summaries for today's brief so Today reads as a calm briefing. # Idempotent: cached items are skipped, so this only hits the LLM for new ones. try: client = llm_client_from_args(args) ids = [r[0] for r in conn.execute( "SELECT bi.article_id FROM daily_briefs b JOIN daily_brief_items bi " "ON bi.brief_id = b.id WHERE b.brief_date = ?", (today,) )] made = 0 for aid in ids: had = get_summary(conn, aid) generate_summary(conn, aid, client=client) if not had and get_summary(conn, aid): made += 1 print(f"summaries: {made} new (of {len(ids)} brief items)") except Exception as exc: print(f"summaries: skipped ({exc})") if not args.no_review: try: flagged = review_sources(conn) print(f"review: {len(flagged)} source(s) flagged for review (advisory)") except Exception as exc: print(f"review: skipped ({exc})") try: from .queries import reindex_search print(f"search: indexed {reindex_search(conn)} articles") except Exception as exc: # noqa: BLE001 — search index is non-critical print(f"search: skipped ({exc})") if not args.no_digest: try: sent = send_due_digests(conn) # morning-gated + deduped internally if sent: print(f"digest: sent {sent}") except Exception as exc: print(f"digest: skipped ({exc})") # Pre-generate today's daily puzzles (with the LLM 'why'); idempotent. try: made = generate_daily_puzzles(conn, local_today(), client=llm_client_from_args(args)) if made: print(f"puzzles: generated {made}") except Exception as exc: print(f"puzzles: skipped ({exc})") def serve(args: argparse.Namespace) -> None: try: import uvicorn except ModuleNotFoundError: raise SystemExit( "The web server needs the optional 'web' extra. Install it with:\n" " pip install -e '.[web]'" ) # Make sure the API reads the same database the CLI was pointed at. os.environ.setdefault("GOODNEWS_DB", str(args.db)) print(f"Serving goodNews on http://{args.host}:{args.port} (docs at /docs)") uvicorn.run("goodnews.api:app", host=args.host, port=args.port, reload=args.reload) def list_category( conn: sqlite3.Connection, topic: str | None, flavor: str | None, limit: int, accepted_only: bool, ) -> None: clauses = [] params: list = [] if accepted_only: clauses.append("s.accepted = 1") if topic: clauses.append("s.topic = ?") params.append(topic.lower()) if flavor: clauses.append("s.flavor = ?") params.append(flavor.lower()) where = ("WHERE " + " AND ".join(clauses)) if clauses else "" params.append(limit) rows = conn.execute( f""" SELECT a.id, a.title, a.canonical_url, a.published_at, src.name AS source_name, s.topic, s.flavor, s.accepted, s.constructive_score, s.cortisol_score, s.reason_code, (s.constructive_score + s.agency_score + s.human_benefit_score + src.trust_score - s.cortisol_score - s.ragebait_score - s.pr_risk_score) AS rank_score FROM articles a JOIN sources src ON src.id = a.source_id JOIN article_scores s ON s.article_id = a.id {where} ORDER BY rank_score DESC, COALESCE(a.published_at, a.discovered_at) DESC LIMIT ? """, params, ).fetchall() label = " / ".join(filter(None, [topic, flavor])) or "all categories" print(f"{label} ({len(rows)} shown)") for row in rows: accepted = "" if row["accepted"] else " [not accepted]" print(f"[{row['id']}] {row['topic']}/{row['flavor']} | {row['source_name']}{accepted}") print(f" {row['title']}") print(f" score={row['rank_score']} reason={row['reason_code']}") print(f" {row['canonical_url']}") def llm_client_from_args(args: argparse.Namespace) -> LocalModelClient: client = LocalModelClient.from_env() if getattr(args, "base_url", None): client.base_url = args.base_url.rstrip("/") if getattr(args, "model", None): client.model = args.model return client def list_sources(conn: sqlite3.Connection, active_only: bool) -> None: where = "WHERE active = 1" if active_only else "" rows = conn.execute( f""" SELECT id, name, active, default_category, trust_score, pr_risk_score, feed_url FROM sources {where} ORDER BY name """ ).fetchall() for row in rows: state = "active" if row["active"] else "inactive" print( f"[{row['id']}] {row['name']} ({state}, {row['default_category']}, " f"trust={row['trust_score']}, pr={row['pr_risk_score']})" ) print(f" {row['feed_url']}") def source_report(conn: sqlite3.Connection) -> None: rows = conn.execute( """ SELECT src.name, src.default_category, src.trust_score, src.pr_risk_score AS source_pr_risk, src.review_flag, src.review_reason, src.consecutive_failures, COUNT(a.id) AS articles, SUM(CASE WHEN s.accepted = 1 THEN 1 ELSE 0 END) AS accepted, ROUND(AVG(s.constructive_score), 1) AS avg_constructive, ROUND(AVG(s.cortisol_score), 1) AS avg_cortisol, ROUND(AVG(s.ragebait_score), 1) AS avg_ragebait, MAX(a.published_at) AS newest_article FROM sources src LEFT JOIN articles a ON a.source_id = src.id LEFT JOIN article_scores s ON s.article_id = a.id GROUP BY src.id ORDER BY accepted DESC, articles DESC, src.name """ ).fetchall() for row in rows: articles = row["articles"] or 0 accepted = row["accepted"] or 0 rate = (accepted / articles * 100) if articles else 0 print( f"{row['name']} | {row['default_category']} | " f"articles={articles} accepted={accepted} ({rate:.1f}%)" ) print( f" trust={row['trust_score']} pr={row['source_pr_risk']} " f"avg_constructive={row['avg_constructive']} " f"avg_cortisol={row['avg_cortisol']} " f"avg_ragebait={row['avg_ragebait']}" ) print(f" newest={row['newest_article'] or 'none'}") if row["review_flag"]: print(f" ⚑ review: {row['review_reason']}") def list_runs(conn: sqlite3.Connection, limit: int) -> None: rows = conn.execute( """ SELECT r.id, r.started_at, r.finished_at, r.status, src.name AS source_name, r.items_seen, r.items_inserted, r.items_duplicate, r.error FROM ingest_runs r LEFT JOIN sources src ON src.id = r.source_id ORDER BY r.id DESC LIMIT ? """, (limit,), ).fetchall() for row in rows: print( f"[{row['id']}] {row['status']} | {row['source_name'] or 'unknown'} | " f"seen={row['items_seen']} inserted={row['items_inserted']} duplicate={row['items_duplicate']}" ) if row["error"]: print(f" error: {row['error']}") def rescore_articles(conn: sqlite3.Connection) -> int: rows = conn.execute( """ SELECT a.id, a.title, a.description, src.pr_risk_score FROM articles a JOIN sources src ON src.id = a.source_id ORDER BY a.id """ ).fetchall() for row in rows: scores = score_article(row["title"], row["description"], int(row["pr_risk_score"])) conn.execute( """ INSERT INTO article_scores ( article_id, constructive_score, cortisol_score, ragebait_score, agency_score, human_benefit_score, novelty_score, pr_risk_score, accepted, reason_code, reason_text, model_name, scored_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ON CONFLICT(article_id) DO UPDATE SET constructive_score = excluded.constructive_score, cortisol_score = excluded.cortisol_score, ragebait_score = excluded.ragebait_score, agency_score = excluded.agency_score, human_benefit_score = excluded.human_benefit_score, novelty_score = excluded.novelty_score, pr_risk_score = excluded.pr_risk_score, accepted = excluded.accepted, reason_code = excluded.reason_code, reason_text = excluded.reason_text, model_name = excluded.model_name, scored_at = CURRENT_TIMESTAMP """, ( row["id"], scores["constructive_score"], scores["cortisol_score"], scores["ragebait_score"], scores["agency_score"], scores["human_benefit_score"], scores["novelty_score"], scores["pr_risk_score"], scores["accepted"], scores["reason_code"], scores["reason_text"], scores["model_name"], ), ) conn.commit() return len(rows) def print_brief(rows: list[sqlite3.Row]) -> None: if not rows: print("No brief items found.") return date = rows[0]["brief_date"] print(f"Highlights from Today - {date}") for row in rows: print(f"{row['rank']}. {row['title']}") print(f" {row['source_name']} | {row['default_category']} | {row['model_name']}") print(f" reason: {row['reason_code']}") print(f" {row['canonical_url']}") def _format_result(result: dict) -> str: if "sources" in result: return ( f"Polled {result['sources']} sources: seen={result['seen']} " f"inserted={result['inserted']} duplicate={result['duplicate']} failed={result['failed']}" ) if result.get("status") == "failed": return ( f"Poll failed: seen={result['seen']} inserted={result['inserted']} " f"duplicate={result['duplicate']} error={result['error']}" ) return ( f"Poll ok: seen={result['seen']} inserted={result['inserted']} " f"duplicate={result['duplicate']}" ) if __name__ == "__main__": main()