Files
upbeatBytes/goodnews/cli.py
T
thejayman77 50dc2167cd Durable image quality: stop trusting feed thumbnails; cycle enriches Latest
Make "no blurry images" sustainable, not a one-off cleanup. RSS feed thumbnails
(~44% were ~90px) were stored at ingest and upscaled to mush, so new articles
would reintroduce them. Now image_url is filled ONLY by the quality-gated
og:image enrichment:

* insert_article no longer stores the feed image (was canonicalize_url(item...)).
* enrich_recent_images(): the cycle fetches a quality og:image for the newest
  accepted, imageless articles each run (bounded), keeping Latest photo-rich.
* Brief + on-open enrichment unchanged.

Net: every stored image is a validated, ≥450px og:image; the rest are clean
placeholders.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 15:55:57 -04:00

738 lines
31 KiB
Python

from __future__ import annotations
import argparse
import os
import sqlite3
from pathlib import Path
from .briefs import build_daily_brief, show_brief
from .db import connect, init_db
from .localtime import local_today
from .dedup import DEFAULT_THRESHOLD, DEFAULT_WINDOW_DAYS, dedup as run_dedup
from .enrich import enrich_brief_images, enrich_recent_images, enrich_summarized_images
from .summarize import generate_summary, get_summary
from .feeds import (
fetch_feed,
parse_feed,
poll_all_sources,
poll_due_sources,
poll_source,
preview_feed,
)
from .llm import LocalModelClient, classify_articles
from .scoring import score_article
from .sources import (
list_candidates,
load_sources,
promote_candidate,
reject_candidate,
review_sources,
save_candidate,
upsert_sources,
)
# Directory that contains this module's package directory (parents[1] of the
# resolved path of this file).
ROOT = Path(__file__).resolve().parents[1]
# Default SQLite database path; used as the default for the global --db option.
DEFAULT_DB = ROOT / "data" / "goodnews.sqlite3"
# Default TOML source list; used as the default for import-sources --sources.
DEFAULT_SOURCES = ROOT / "config" / "sources.toml"
def _build_parser() -> argparse.ArgumentParser:
    """Build the ``goodnews`` argument parser with every subcommand registered."""
    parser = argparse.ArgumentParser(prog="goodnews")
    parser.add_argument("--db", type=Path, default=DEFAULT_DB, help="SQLite database path")
    subparsers = parser.add_subparsers(dest="command", required=True)

    subparsers.add_parser("init-db", help="Create or update the SQLite schema")

    import_parser = subparsers.add_parser("import-sources", help="Load sources from TOML")
    import_parser.add_argument("--sources", type=Path, default=DEFAULT_SOURCES)

    poll_parser = subparsers.add_parser("poll", help="Poll active RSS/Atom sources")
    poll_parser.add_argument("--source", help="Poll one source by exact name")
    poll_parser.add_argument("--limit", type=int, help="Poll only the first N active sources")

    list_parser = subparsers.add_parser("list-recent", help="Show recently discovered articles")
    list_parser.add_argument("--limit", type=int, default=20)
    list_parser.add_argument("--accepted-only", action="store_true")

    source_parser = subparsers.add_parser("list-sources", help="Show configured sources")
    source_parser.add_argument("--active-only", action="store_true")

    cat_parser = subparsers.add_parser("list-category", help="Browse articles by topic and/or flavor")
    cat_parser.add_argument("--topic", help="Filter by topic, e.g. science, environment, animals")
    cat_parser.add_argument("--flavor", help="Filter by flavor, e.g. breakthrough, discovery, feelgood")
    cat_parser.add_argument("--limit", type=int, default=20)
    cat_parser.add_argument("--all", action="store_true", help="Include not-accepted articles")

    subparsers.add_parser("source-report", help="Show source-level ingestion and scoring stats")

    check_feeds_parser = subparsers.add_parser("check-feeds", help="Fetch and parse each feed, reporting health")
    check_feeds_parser.add_argument("--all", action="store_true", help="Include inactive sources")

    preview_parser = subparsers.add_parser("preview-source", help="Score a sample of a feed without adding it")
    preview_parser.add_argument("url", help="Feed URL to preview")
    preview_parser.add_argument("--sample", type=int, default=25)
    preview_parser.add_argument("--classify", action="store_true", help="Also classify with the local model (slower)")
    preview_parser.add_argument("--base-url", help="OpenAI-compatible base URL (with --classify)")
    preview_parser.add_argument("--model", help="Local model name (with --classify)")

    suggest_parser = subparsers.add_parser("suggest-source", help="Preview a feed and stage it as a candidate")
    suggest_parser.add_argument("url", help="Feed URL to suggest")
    suggest_parser.add_argument("--name", help="Display name for the source")
    suggest_parser.add_argument("--homepage", help="Homepage URL")
    suggest_parser.add_argument("--sample", type=int, default=25)
    suggest_parser.add_argument("--classify", action="store_true", help="Classify the sample with the local model")
    suggest_parser.add_argument("--base-url")
    suggest_parser.add_argument("--model")

    cand_parser = subparsers.add_parser("list-candidates", help="List staged source candidates")
    cand_parser.add_argument("--status", help="Filter by status: suggested|quarantined|rejected|promoted")

    promote_parser = subparsers.add_parser("promote-candidate", help="Copy a candidate into the real sources")
    promote_parser.add_argument("id", type=int)
    promote_parser.add_argument("--active", action="store_true", help="Activate immediately (default: inactive)")
    promote_parser.add_argument("--category", help="default_category for the new source")
    promote_parser.add_argument("--trust", type=int, default=5)
    promote_parser.add_argument("--pr-risk", type=int, default=3)

    reject_parser = subparsers.add_parser("reject-candidate", help="Mark a candidate as rejected")
    reject_parser.add_argument("id", type=int)

    review_parser = subparsers.add_parser(
        "review-sources", help="Recompute advisory review flags (never deactivates anything)"
    )
    review_parser.add_argument("--stale-days", type=int, default=14)

    runs_parser = subparsers.add_parser("list-runs", help="Show recent ingest runs")
    runs_parser.add_argument("--limit", type=int, default=20)

    subparsers.add_parser("rescore", help="Re-run heuristic scores for stored articles")

    classify_parser = subparsers.add_parser("classify", help="Classify candidates with a local LLM")
    classify_parser.add_argument("--limit", type=int, default=10)
    classify_parser.add_argument("--include-rejected", action="store_true")
    classify_parser.add_argument("--dry-run", action="store_true")
    classify_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
    classify_parser.add_argument("--model", help="Local model name")

    cycle_parser = subparsers.add_parser(
        "cycle", help="Poll due sources, classify new articles, rebuild today's brief (for scheduling)"
    )
    cycle_parser.add_argument("--classify-limit", type=int, default=40)
    cycle_parser.add_argument("--no-classify", action="store_true", help="Skip the LLM classify step")
    cycle_parser.add_argument("--no-dedup", action="store_true", help="Skip the embedding dedup step")
    cycle_parser.add_argument("--no-brief", action="store_true", help="Skip rebuilding today's brief")
    cycle_parser.add_argument("--no-review", action="store_true", help="Skip recomputing source review flags")
    cycle_parser.add_argument("--force", action="store_true", help="Poll all active sources, ignoring intervals")
    cycle_parser.add_argument("--base-url", help="OpenAI-compatible base URL for classify")
    cycle_parser.add_argument("--model", help="Local model name for classify")

    enrich_images_parser = subparsers.add_parser(
        "enrich-images", help="Backfill og:images for already-summarized articles that lack one"
    )
    enrich_images_parser.add_argument("--limit", type=int, default=50, help="Max articles to fetch this batch")

    dedup_parser = subparsers.add_parser("dedup", help="Cluster near-duplicate stories via local embeddings")
    dedup_parser.add_argument("--threshold", type=float, default=DEFAULT_THRESHOLD, help="Cosine similarity cutoff")
    dedup_parser.add_argument("--window-days", type=int, default=DEFAULT_WINDOW_DAYS)
    dedup_parser.add_argument("--embed-limit", type=int, help="Cap how many missing embeddings to compute")
    dedup_parser.add_argument("--base-url", help="OpenAI-compatible base URL")
    dedup_parser.add_argument("--model", help="Chat model name (unused for embeddings)")

    check_llm_parser = subparsers.add_parser("check-llm", help="Check local OpenAI-compatible model endpoint")
    check_llm_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
    check_llm_parser.add_argument("--model", help="Expected local model name")

    brief_parser = subparsers.add_parser("build-brief", help="Build/freeze a daily brief")
    brief_parser.add_argument("--date", help="Brief date in YYYY-MM-DD format; defaults to today")
    brief_parser.add_argument("--limit", type=int, default=7)
    brief_parser.add_argument("--replace", action="store_true")

    show_brief_parser = subparsers.add_parser("show-brief", help="Show a stored daily brief")
    show_brief_parser.add_argument("--date", help="Brief date in YYYY-MM-DD format; defaults to latest brief")
    show_brief_parser.add_argument("--limit", type=int, default=10)

    serve_parser = subparsers.add_parser("serve", help="Run the web/API server (requires the 'web' extra)")
    serve_parser.add_argument("--host", default="127.0.0.1", help="Bind host; use 0.0.0.0 to expose")
    serve_parser.add_argument("--port", type=int, default=8000)
    serve_parser.add_argument("--reload", action="store_true", help="Auto-reload on code changes (dev)")
    return parser


def _dispatch(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
    """Run the database-backed subcommand selected by ``args.command``."""
    import json  # only list-candidates needs it; imported once, not per row

    if args.command == "init-db":
        init_db(conn)
        print(f"Initialized {args.db}")
    elif args.command == "import-sources":
        init_db(conn)
        sources = load_sources(args.sources)
        count = upsert_sources(conn, sources)
        print(f"Imported {count} sources from {args.sources}")
    elif args.command == "poll":
        init_db(conn)
        if args.source:
            source = conn.execute("SELECT * FROM sources WHERE name = ?", (args.source,)).fetchone()
            if not source:
                raise SystemExit(f"No source named {args.source!r}")
            result = poll_source(conn, source)
        else:
            result = poll_all_sources(conn, limit=args.limit)
        print(_format_result(result))
    elif args.command == "list-recent":
        list_recent(conn, limit=args.limit, accepted_only=args.accepted_only)
    elif args.command == "list-sources":
        list_sources(conn, active_only=args.active_only)
    elif args.command == "list-category":
        list_category(conn, topic=args.topic, flavor=args.flavor, limit=args.limit, accepted_only=not args.all)
    elif args.command == "source-report":
        source_report(conn)
    elif args.command == "check-feeds":
        check_feeds(conn, include_inactive=args.all)
    elif args.command == "preview-source":
        client = llm_client_from_args(args) if args.classify else None
        preview = preview_feed(args.url, sample=args.sample, client=client)
        print_preview(preview)
    elif args.command == "suggest-source":
        init_db(conn)
        client = llm_client_from_args(args) if args.classify else None
        preview = preview_feed(args.url, sample=args.sample, client=client)
        print_preview(preview)
        cand = save_candidate(conn, args.url, preview=preview, name=args.name, homepage_url=args.homepage)
        print(f"Saved as candidate #{cand['id']} (status {cand['status']}). Review with list-candidates.")
    elif args.command == "list-candidates":
        init_db(conn)
        rows = list_candidates(conn, status=args.status)
        if not rows:
            print("No candidates.")
        for r in rows:
            line = f"[{r['id']}] {r['status']} | {r['name'] or '(unnamed)'} | {r['feed_url']}"
            if r["preview_json"]:
                p = json.loads(r["preview_json"])
                line += f" (accept {round(p.get('acceptance_rate', 0) * 100)}%, sampled {p.get('sampled', 0)})"
            print(line)
    elif args.command == "promote-candidate":
        init_db(conn)
        try:
            source_id = promote_candidate(
                conn, args.id, active=args.active, default_category=args.category,
                trust_score=args.trust, pr_risk_score=args.pr_risk,
            )
        except ValueError as exc:
            # Surface the validation message as the process exit message.
            raise SystemExit(str(exc)) from exc
        state = "active" if args.active else "inactive"
        print(f"Promoted candidate #{args.id} -> source #{source_id} ({state}).")
    elif args.command == "reject-candidate":
        init_db(conn)
        ok = reject_candidate(conn, args.id)
        print(f"Rejected candidate #{args.id}." if ok else f"No candidate #{args.id}.")
    elif args.command == "review-sources":
        init_db(conn)
        flagged = review_sources(conn, stale_days=args.stale_days)
        if not flagged:
            print("All active sources look healthy.")
        else:
            print(f"{len(flagged)} source(s) flagged for review (advisory — none deactivated):")
            for f in flagged:
                print(f" [{f['id']}] {f['name']}: {f['reason']}")
    elif args.command == "list-runs":
        list_runs(conn, limit=args.limit)
    elif args.command == "rescore":
        count = rescore_articles(conn)
        print(f"Rescored {count} articles")
    elif args.command == "classify":
        init_db(conn)
        client = llm_client_from_args(args)
        report = classify_articles(
            conn,
            client,
            limit=args.limit,
            include_rejected=args.include_rejected,
            dry_run=args.dry_run,
        )
        for article_id, scores in report.results:
            accepted = "yes" if scores["accepted"] else "no"
            print(
                f"[{article_id}] accepted={accepted} {scores['topic']}/{scores['flavor']} "
                f"reason={scores['reason_code']}"
            )
            print(f" {scores['reason_text']}")
        print(
            f"classify: attempted={report.attempted} succeeded={report.succeeded} "
            f"skipped={report.skipped}"
        )
        if args.dry_run:
            print("Dry run only; database was not updated.")
    elif args.command == "cycle":
        run_cycle(conn, args)
    elif args.command == "enrich-images":
        found = enrich_summarized_images(conn, limit=args.limit)
        print(f"enrich-images: {found} new image(s) for summarized articles")
    elif args.command == "dedup":
        init_db(conn)
        client = llm_client_from_args(args)
        stats = run_dedup(
            conn, client, threshold=args.threshold, window_days=args.window_days, embed_limit=args.embed_limit
        )
        print(
            f"dedup: embedded={stats['embedded']} articles={stats['articles']} "
            f"clusters={stats['clusters']} duplicate_clusters={stats['duplicate_clusters']} "
            f"duplicates_hidden={stats['duplicates']}"
        )
    elif args.command == "check-llm":
        client = llm_client_from_args(args)
        try:
            models = client.list_models()
        except RuntimeError as exc:
            raise SystemExit(str(exc)) from exc
        print(f"Connected to {client.base_url}")
        if models:
            print("Models:")
            for model in models:
                # Mark the model the client is configured to use.
                marker = " *" if model == client.model else ""
                print(f" {model}{marker}")
        else:
            print("Endpoint responded, but no models were listed.")
    elif args.command == "build-brief":
        init_db(conn)
        brief_id = build_daily_brief(
            conn,
            brief_date=args.date,
            limit=args.limit,
            replace=args.replace,
        )
        print(f"Built brief {brief_id}")
        bdate = args.date or local_today()
        found = enrich_brief_images(conn, bdate)
        if found:
            print(f"Enriched {found} hero image(s)")
        print_brief(show_brief(conn, brief_date=args.date, limit=args.limit))
    elif args.command == "show-brief":
        print_brief(show_brief(conn, brief_date=args.date, limit=args.limit))


def main(argv: list[str] | None = None) -> None:
    """Entry point for the ``goodnews`` command-line interface.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, so existing zero-argument callers are
            unaffected.

    Raises:
        SystemExit: On argparse errors, and when a subcommand reports a
            user-facing failure (unknown source, invalid candidate, model
            endpoint error, missing web extra).
    """
    args = _build_parser().parse_args(argv)

    # ``serve`` is handled before the connection is opened; it only hands the
    # database path to the server through the environment.
    if args.command == "serve":
        serve(args)
        return

    conn = connect(args.db)
    try:
        _dispatch(conn, args)
    finally:
        # Release the SQLite handle even when a subcommand exits via SystemExit.
        conn.close()
def list_recent(conn: sqlite3.Connection, limit: int, accepted_only: bool) -> None:
    """Print up to ``limit`` articles, newest first.

    Articles are ordered by ``published_at``, falling back to
    ``discovered_at``. With ``accepted_only`` set, only rows whose score is
    marked accepted are shown. Articles without a score row are still listed
    (the join is a LEFT JOIN) and print ``None`` for each score.
    """
    accepted_filter = "WHERE s.accepted = 1" if accepted_only else ""
    query = f"""
        SELECT
            a.id,
            a.published_at,
            src.name AS source_name,
            a.title,
            a.canonical_url,
            s.accepted,
            s.constructive_score,
            s.cortisol_score,
            s.ragebait_score,
            s.reason_code
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        {accepted_filter}
        ORDER BY COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT ?
    """
    for article in conn.execute(query, (limit,)).fetchall():
        verdict = "yes" if article["accepted"] else "no"
        when = article["published_at"] or "no date"
        print(f"[{article['id']}] {when} | {article['source_name']} | accepted={verdict}")
        print(f" {article['title']}")
        score_line = (
            " scores: "
            f"constructive={article['constructive_score']} "
            f"cortisol={article['cortisol_score']} "
            f"ragebait={article['ragebait_score']} "
            f"reason={article['reason_code']}"
        )
        print(score_line)
        print(f" {article['canonical_url']}")
def print_preview(p: dict) -> None:
    """Print a human-readable summary of a feed preview dict.

    Reads the keys ``classified``, ``url``, ``sampled``, ``accepted``,
    ``acceptance_rate``, ``newest_published``, ``recent_7d``, the three
    ``avg_*`` values, ``topic_mix``/``flavor_mix`` and the two ``examples_*``
    lists from ``p``.
    """
    scoring_mode = "heuristic" if not p["classified"] else "model"
    print(f"Preview of {p['url']} ({scoring_mode})")
    print(f" sampled={p['sampled']} accepted={p['accepted']} ({p['acceptance_rate']*100:.0f}%)")
    newest = p["newest_published"] or "unknown"
    print(f" freshness: newest={newest} in_last_7d={p['recent_7d']}")
    print(f" averages: cortisol={p['avg_cortisol']} ragebait={p['avg_ragebait']} pr_risk={p['avg_pr_risk']}")

    # NOTE(review): the flavor mix is printed only together with the topic mix.
    if p["topic_mix"]:
        print(f" topics: {p['topic_mix']}")
        print(f" flavors: {p['flavor_mix']}")

    accepted_titles = p["examples_accepted"]
    if accepted_titles:
        print(" would accept:")
        for title in accepted_titles:
            # Titles are truncated to 80 characters.
            print(f" + {title[:80]}")

    rejected = p["examples_rejected"]
    if rejected:
        print(" would skip:")
        for example in rejected:
            # Rejected titles are cut to 70 characters to leave room for the reason.
            print(f" - {example['title'][:70]} ({example['reason']})")
def check_feeds(conn: sqlite3.Connection, include_inactive: bool = False) -> None:
    """Fetch and parse every source's feed, printing one OK/FAIL line each.

    Only active sources are checked unless ``include_inactive`` is true. Any
    exception raised while fetching or parsing a feed is reported as a FAIL
    line rather than propagated, so one bad feed never stops the report. A
    final line summarises how many feeds were healthy.
    """
    active_filter = "WHERE active = 1" if not include_inactive else ""
    sources = conn.execute(f"SELECT name, feed_url FROM sources {active_filter} ORDER BY name").fetchall()
    healthy = 0
    for source in sources:
        try:
            entries = parse_feed(fetch_feed(source["feed_url"]))
            healthy += 1
            print(f"OK {source['name']}: {len(entries)} items")
        except Exception as err:
            print(f"FAIL {source['name']}: {err}")
    print(f"--- {healthy}/{len(sources)} feeds healthy ---")
def run_cycle(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
    """Run one scheduler pass: poll due sources, classify, dedup, rebuild the brief.

    Every step is independent and non-fatal, so a down model endpoint or an
    empty day never aborts the cycle.

    The pass runs under an exclusive, non-blocking file lock stored next to
    the database, so a manual run and the systemd timer (or two timer ticks)
    can never overlap and contend on the database and model. If the lock is
    already held, the cycle prints a notice and returns without doing any work.
    """
    import fcntl

    lock_path = Path(args.db).parent / ".goodnews-cycle.lock"
    # The context manager closes the lock file on every exit path.
    with open(lock_path, "w") as lock_file:
        try:
            fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            print("cycle: another cycle is already running; skipping")
            return
        try:
            _run_cycle_locked(conn, args)
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)
def _cycle_classify(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
    """Cycle step: classify not-yet-classified articles with the local model."""
    # The client is built outside the try block, so a failure to construct it
    # is not reported as a skipped step.
    model_client = llm_client_from_args(args)

    def report_progress(done: int, total: int, article_id: int) -> None:
        print(f" classify {done}/{total} (article {article_id})", flush=True)

    try:
        outcome = classify_articles(
            conn,
            model_client,
            limit=args.classify_limit,
            include_rejected=True,
            only_unclassified=True,
            progress=report_progress,
        )
        print(
            f"classify: attempted={outcome.attempted} succeeded={outcome.succeeded} "
            f"skipped={outcome.skipped} (model {model_client.model})",
            flush=True,
        )
    except Exception as err:  # endpoint down, timeout, etc. — keep going
        print(f"classify: skipped ({err})", flush=True)


def _cycle_prewarm_summaries(conn: sqlite3.Connection, args: argparse.Namespace, today) -> None:
    """Cycle step: generate summaries for the items of the brief dated ``today``.

    Pre-warming lets Today read as a calm briefing. The step is idempotent:
    cached items are skipped, so only new ones hit the LLM.
    """
    try:
        model_client = llm_client_from_args(args)
        brief_rows = conn.execute(
            "SELECT bi.article_id FROM daily_briefs b JOIN daily_brief_items bi "
            "ON bi.brief_id = b.id WHERE b.brief_date = ?", (today,)
        )
        article_ids = [record[0] for record in brief_rows]
        created = 0
        for article_id in article_ids:
            already_cached = get_summary(conn, article_id)
            generate_summary(conn, article_id, client=model_client)
            # Count only summaries that did not exist before this call.
            if not already_cached and get_summary(conn, article_id):
                created += 1
        print(f"summaries: {created} new (of {len(article_ids)} brief items)")
    except Exception as err:
        print(f"summaries: skipped ({err})")


def _run_cycle_locked(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
    """Run the cycle steps in order; the caller already holds the cycle lock.

    Steps: init schema, poll, classify, dedup, brief (plus image enrichment
    and summary pre-warm), source review. Each optional step is skipped by
    its ``--no-*`` flag and reports failures as "skipped" instead of raising.
    """
    init_db(conn)

    # --force polls every active source; otherwise only the sources that are due.
    poller = poll_all_sources if args.force else poll_due_sources
    poll_result = poller(conn)
    print(f"poll: {_format_result(poll_result)}", flush=True)

    if not args.no_classify:
        _cycle_classify(conn, args)

    if not args.no_dedup:
        try:
            dedup_stats = run_dedup(conn, llm_client_from_args(args))
            print(f"dedup: embedded={dedup_stats['embedded']} duplicates_hidden={dedup_stats['duplicates']}")
        except Exception as err:
            print(f"dedup: skipped ({err})")

    # NOTE(review): image enrichment and the summary pre-warm are treated as
    # part of the brief step (skipped by --no-brief) because the pre-warm
    # reads `today`, which is only bound here — confirm the intended nesting.
    if not args.no_brief:
        today = local_today()
        try:
            brief_id = build_daily_brief(conn, brief_date=today, limit=7, replace=True)
            hero_count = enrich_brief_images(conn, today)
            print(f"brief: rebuilt {today} (id {brief_id}); {hero_count} hero image(s) enriched")
        except Exception as err:
            print(f"brief: skipped ({err})")

        # Keep the Latest feed photo-rich: fetch quality og:images for the
        # newest accepted articles that lack one (bounded per cycle).
        try:
            enriched = enrich_recent_images(conn)
            if enriched:
                print(f"recent images: {enriched} enriched")
        except Exception as err:
            print(f"recent images: skipped ({err})")

        _cycle_prewarm_summaries(conn, args, today)

    if not args.no_review:
        try:
            flagged_sources = review_sources(conn)
            print(f"review: {len(flagged_sources)} source(s) flagged for review (advisory)")
        except Exception as err:
            print(f"review: skipped ({err})")
def serve(args: argparse.Namespace) -> None:
    """Start the web/API server with uvicorn.

    Reads ``args.db``, ``args.host``, ``args.port`` and ``args.reload``.

    Raises:
        SystemExit: With install instructions when ``uvicorn`` is not
            importable (the optional 'web' extra is missing).
    """
    try:
        import uvicorn
    except ModuleNotFoundError:
        install_hint = (
            "The web server needs the optional 'web' extra. Install it with:\n"
            " pip install -e '.[web]'"
        )
        raise SystemExit(install_hint)

    # Make sure the API reads the same database the CLI was pointed at.
    # setdefault leaves an already-exported GOODNEWS_DB untouched.
    os.environ.setdefault("GOODNEWS_DB", str(args.db))
    print(f"Serving goodNews on http://{args.host}:{args.port} (docs at /docs)")
    uvicorn.run(
        "goodnews.api:app",
        host=args.host,
        port=args.port,
        reload=args.reload,
    )
def list_category(
    conn: sqlite3.Connection,
    topic: str | None,
    flavor: str | None,
    limit: int,
    accepted_only: bool,
) -> None:
    """Print scored articles filtered by topic and/or flavor, best-ranked first.

    ``topic`` and ``flavor`` are lower-cased before being matched. Rows are
    ordered by a rank score (constructive + agency + human benefit + source
    trust, minus cortisol, ragebait and PR risk) and then by recency. Only
    articles that have a score row appear, because the join is an inner join.
    """
    conditions: list[str] = []
    bindings: list = []
    if accepted_only:
        conditions.append("s.accepted = 1")
    for column, wanted in (("s.topic", topic), ("s.flavor", flavor)):
        if wanted:
            conditions.append(f"{column} = ?")
            bindings.append(wanted.lower())

    where = ""
    if conditions:
        where = "WHERE " + " AND ".join(conditions)

    query = f"""
        SELECT
            a.id, a.title, a.canonical_url, a.published_at,
            src.name AS source_name,
            s.topic, s.flavor, s.accepted,
            s.constructive_score, s.cortisol_score, s.reason_code,
            (s.constructive_score + s.agency_score + s.human_benefit_score + src.trust_score
             - s.cortisol_score - s.ragebait_score - s.pr_risk_score) AS rank_score
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        JOIN article_scores s ON s.article_id = a.id
        {where}
        ORDER BY rank_score DESC, COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT ?
    """
    matches = conn.execute(query, [*bindings, limit]).fetchall()

    # The heading echoes the filters as typed, not their lower-cased form.
    heading = " / ".join(part for part in (topic, flavor) if part) or "all categories"
    print(f"{heading} ({len(matches)} shown)")
    for article in matches:
        suffix = " [not accepted]" if not article["accepted"] else ""
        print(f"[{article['id']}] {article['topic']}/{article['flavor']} | {article['source_name']}{suffix}")
        print(f" {article['title']}")
        print(f" score={article['rank_score']} reason={article['reason_code']}")
        print(f" {article['canonical_url']}")
def llm_client_from_args(args: argparse.Namespace) -> LocalModelClient:
    """Build a model client from the environment, then apply CLI overrides.

    ``--base-url`` (with any trailing slashes removed) and ``--model`` replace
    the environment-derived values when they are present and non-empty.
    Subcommands that do not define these options are handled, because the
    attributes are looked up with a ``None`` default.
    """
    client = LocalModelClient.from_env()

    base_url_override = getattr(args, "base_url", None)
    if base_url_override:
        client.base_url = base_url_override.rstrip("/")

    model_override = getattr(args, "model", None)
    if model_override:
        client.model = model_override

    return client
def list_sources(conn: sqlite3.Connection, active_only: bool) -> None:
    """Print every configured source (or only active ones), ordered by name."""
    active_filter = "" if not active_only else "WHERE active = 1"
    query = f"""
        SELECT id, name, active, default_category, trust_score, pr_risk_score, feed_url
        FROM sources
        {active_filter}
        ORDER BY name
    """
    for source in conn.execute(query).fetchall():
        status = "inactive" if not source["active"] else "active"
        summary = (
            f"[{source['id']}] {source['name']} ({status}, {source['default_category']}, "
            f"trust={source['trust_score']}, pr={source['pr_risk_score']})"
        )
        print(summary)
        print(f" {source['feed_url']}")
def source_report(conn: sqlite3.Connection) -> None:
    """Print per-source ingestion and scoring statistics.

    For each source: article count, accepted count and acceptance rate,
    average constructive/cortisol/ragebait scores, the newest publication
    date, and the review reason when the source is flagged. Sources are
    ordered by accepted count, then article count, then name.
    """
    query = """
        SELECT
            src.name,
            src.default_category,
            src.trust_score,
            src.pr_risk_score AS source_pr_risk,
            src.review_flag,
            src.review_reason,
            src.consecutive_failures,
            COUNT(a.id) AS articles,
            SUM(CASE WHEN s.accepted = 1 THEN 1 ELSE 0 END) AS accepted,
            ROUND(AVG(s.constructive_score), 1) AS avg_constructive,
            ROUND(AVG(s.cortisol_score), 1) AS avg_cortisol,
            ROUND(AVG(s.ragebait_score), 1) AS avg_ragebait,
            MAX(a.published_at) AS newest_article
        FROM sources src
        LEFT JOIN articles a ON a.source_id = src.id
        LEFT JOIN article_scores s ON s.article_id = a.id
        GROUP BY src.id
        ORDER BY accepted DESC, articles DESC, src.name
    """
    for source in conn.execute(query).fetchall():
        # SUM() is NULL for a source without articles; treat that as zero.
        total = source["articles"] or 0
        accepted_count = source["accepted"] or 0
        if total:
            acceptance_pct = accepted_count / total * 100
        else:
            acceptance_pct = 0
        print(
            f"{source['name']} | {source['default_category']} | "
            f"articles={total} accepted={accepted_count} ({acceptance_pct:.1f}%)"
        )
        print(
            f" trust={source['trust_score']} pr={source['source_pr_risk']} "
            f"avg_constructive={source['avg_constructive']} "
            f"avg_cortisol={source['avg_cortisol']} "
            f"avg_ragebait={source['avg_ragebait']}"
        )
        newest = source["newest_article"] or "none"
        print(f" newest={newest}")
        if source["review_flag"]:
            print(f" ⚑ review: {source['review_reason']}")
def list_runs(conn: sqlite3.Connection, limit: int) -> None:
    """Print the most recent ``limit`` ingest runs, newest (highest id) first.

    Runs whose source row is missing are shown with the source name
    ``unknown``; a run's error text is printed on its own line when present.
    """
    query = """
        SELECT r.id, r.started_at, r.finished_at, r.status, src.name AS source_name,
               r.items_seen, r.items_inserted, r.items_duplicate, r.error
        FROM ingest_runs r
        LEFT JOIN sources src ON src.id = r.source_id
        ORDER BY r.id DESC
        LIMIT ?
    """
    for run in conn.execute(query, (limit,)).fetchall():
        source_label = run["source_name"] or "unknown"
        print(
            f"[{run['id']}] {run['status']} | {source_label} | "
            f"seen={run['items_seen']} inserted={run['items_inserted']} duplicate={run['items_duplicate']}"
        )
        failure = run["error"]
        if failure:
            print(f" error: {failure}")
def rescore_articles(conn: sqlite3.Connection) -> int:
    """Re-run the heuristic scorer over every stored article.

    Each article's title and description are scored together with its
    source's ``pr_risk_score``, and the result is upserted into
    ``article_scores`` (insert, or overwrite on an ``article_id`` conflict).
    The work is committed once, after all rows are processed.

    Returns:
        The number of articles that were rescored.
    """
    # Order of the score keys bound after article_id in the upsert below.
    score_keys = (
        "constructive_score",
        "cortisol_score",
        "ragebait_score",
        "agency_score",
        "human_benefit_score",
        "novelty_score",
        "pr_risk_score",
        "accepted",
        "reason_code",
        "reason_text",
        "model_name",
    )
    upsert_sql = """
        INSERT INTO article_scores (
            article_id, constructive_score, cortisol_score, ragebait_score,
            agency_score, human_benefit_score, novelty_score, pr_risk_score,
            accepted, reason_code, reason_text, model_name, scored_at
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
        ON CONFLICT(article_id) DO UPDATE SET
            constructive_score = excluded.constructive_score,
            cortisol_score = excluded.cortisol_score,
            ragebait_score = excluded.ragebait_score,
            agency_score = excluded.agency_score,
            human_benefit_score = excluded.human_benefit_score,
            novelty_score = excluded.novelty_score,
            pr_risk_score = excluded.pr_risk_score,
            accepted = excluded.accepted,
            reason_code = excluded.reason_code,
            reason_text = excluded.reason_text,
            model_name = excluded.model_name,
            scored_at = CURRENT_TIMESTAMP
    """
    articles = conn.execute(
        """
        SELECT a.id, a.title, a.description, src.pr_risk_score
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        ORDER BY a.id
        """
    ).fetchall()

    for article in articles:
        verdict = score_article(
            article["title"],
            article["description"],
            int(article["pr_risk_score"]),
        )
        bindings = (article["id"], *(verdict[key] for key in score_keys))
        conn.execute(upsert_sql, bindings)

    conn.commit()
    return len(articles)
def print_brief(rows: list[sqlite3.Row]) -> None:
    """Print a daily brief: a dated heading, then four lines per ranked item.

    Prints ``No brief items found.`` and nothing else when ``rows`` is empty.
    The heading date is taken from the first row.
    """
    if not rows:
        print("No brief items found.")
        return

    brief_date = rows[0]["brief_date"]
    print(f"Highlights from Today - {brief_date}")
    for item in rows:
        provenance = f"{item['source_name']} | {item['default_category']} | {item['model_name']}"
        print(f"{item['rank']}. {item['title']}")
        print(f" {provenance}")
        print(f" reason: {item['reason_code']}")
        print(f" {item['canonical_url']}")
def _format_result(result: dict) -> str:
if "sources" in result:
return (
f"Polled {result['sources']} sources: seen={result['seen']} "
f"inserted={result['inserted']} duplicate={result['duplicate']} failed={result['failed']}"
)
if result.get("status") == "failed":
return (
f"Poll failed: seen={result['seen']} inserted={result['inserted']} "
f"duplicate={result['duplicate']} error={result['error']}"
)
return (
f"Poll ok: seen={result['seen']} inserted={result['inserted']} "
f"duplicate={result['duplicate']}"
)
# Run the CLI when this module is executed as a script.
if __name__ == "__main__":
    main()