ed814c97b9
- daily_art gains blurb + palette columns (idempotent migration). - art._palette: Pillow median-cut to ~5 hex colors from the cached image (best-effort → [] on any failure). art._blurb: a warm 2-3 sentence "what you're looking at" note grounded in the Met catalogue (title/artist/bio/date/medium/ classification/culture/tags). Prompt leans on context/significance and the title+tags for subject — explicitly NOT asserting literal composition (figure counts/poses) it can't see, since the model can't view the image. Markdown stripped from the output. - pick_daily generates both (client optional → blurb skipped when absent); cycle + art CLI pass an LLM client. /api/art/today exposes blurb + palette. - Backfilled the last 3 days on host (Veteran / Magnolia Vase / Bierstadt). - scripts/art_blurb_palette_backfill.py for in-place backfill (no re-pick). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
898 lines
39 KiB
Python
898 lines
39 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import contextlib
|
|
import os
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
from .briefs import build_daily_brief, show_brief
|
|
from .db import connect, init_db
|
|
from .digest import send_due_digests
|
|
from .games import generate_daily_puzzles
|
|
from .localtime import local_today
|
|
from .dedup import DEFAULT_THRESHOLD, DEFAULT_WINDOW_DAYS, cluster_duplicates, dedup as run_dedup
|
|
from .geo import tag_articles as tag_geo
|
|
from . import art, onthisday, quote, wotd
|
|
from .enrich import enrich_brief_images, enrich_read_times, enrich_recent_images, enrich_summarized_images
|
|
from .summarize import generate_summary, get_summary
|
|
from .feeds import (
|
|
fetch_feed,
|
|
parse_feed,
|
|
poll_all_sources,
|
|
poll_due_sources,
|
|
poll_source,
|
|
preview_feed,
|
|
)
|
|
from .llm import LocalModelClient, classify_articles
|
|
from .scoring import score_article
|
|
from .sources import (
|
|
list_candidates,
|
|
load_sources,
|
|
promote_candidate,
|
|
reject_candidate,
|
|
review_sources,
|
|
save_candidate,
|
|
upsert_sources,
|
|
)
|
|
|
|
|
|
# Project root: the directory one level above the directory holding this module.
ROOT = Path(__file__).resolve().parents[1]
# SQLite database used when neither --db nor $GOODNEWS_DB names another one.
DEFAULT_DB = ROOT.joinpath("data", "goodnews.sqlite3")
# TOML source list loaded by `import-sources` unless --sources is given.
DEFAULT_SOURCES = ROOT.joinpath("config", "sources.toml")
|
|
|
|
|
|
def _default_db() -> Path:
|
|
# Honor GOODNEWS_DB like the rest of the app (db.connect) does, so `GOODNEWS_DB=… `
|
|
# actually targets that DB instead of being silently ignored — otherwise a copy-DB
|
|
# maintenance run (e.g. dedup --force-recluster) can land on production by surprise.
|
|
return Path(os.environ.get("GOODNEWS_DB") or DEFAULT_DB)
|
|
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Build the ``goodnews`` argument parser with one sub-parser per command."""
    parser = argparse.ArgumentParser(prog="goodnews")
    parser.add_argument("--db", type=Path, default=_default_db(),
                        help="SQLite database path (defaults to $GOODNEWS_DB, else the bundled data/ DB)")
    subparsers = parser.add_subparsers(dest="command", required=True)

    subparsers.add_parser("init-db", help="Create or update the SQLite schema")

    import_parser = subparsers.add_parser("import-sources", help="Load sources from TOML")
    import_parser.add_argument("--sources", type=Path, default=DEFAULT_SOURCES)

    poll_parser = subparsers.add_parser("poll", help="Poll active RSS/Atom sources")
    poll_parser.add_argument("--source", help="Poll one source by exact name")
    poll_parser.add_argument("--limit", type=int, help="Poll only the first N active sources")

    list_parser = subparsers.add_parser("list-recent", help="Show recently discovered articles")
    list_parser.add_argument("--limit", type=int, default=20)
    list_parser.add_argument("--accepted-only", action="store_true")

    source_parser = subparsers.add_parser("list-sources", help="Show configured sources")
    source_parser.add_argument("--active-only", action="store_true")

    cat_parser = subparsers.add_parser("list-category", help="Browse articles by topic and/or flavor")
    cat_parser.add_argument("--topic", help="Filter by topic, e.g. science, environment, animals")
    cat_parser.add_argument("--flavor", help="Filter by flavor, e.g. breakthrough, discovery, feelgood")
    cat_parser.add_argument("--limit", type=int, default=20)
    cat_parser.add_argument("--all", action="store_true", help="Include not-accepted articles")

    subparsers.add_parser("source-report", help="Show source-level ingestion and scoring stats")

    check_feeds_parser = subparsers.add_parser("check-feeds", help="Fetch and parse each feed, reporting health")
    check_feeds_parser.add_argument("--all", action="store_true", help="Include inactive sources")

    preview_parser = subparsers.add_parser("preview-source", help="Score a sample of a feed without adding it")
    preview_parser.add_argument("url", help="Feed URL to preview")
    preview_parser.add_argument("--sample", type=int, default=25)
    preview_parser.add_argument("--classify", action="store_true", help="Also classify with the local model (slower)")
    preview_parser.add_argument("--base-url", help="OpenAI-compatible base URL (with --classify)")
    preview_parser.add_argument("--model", help="Local model name (with --classify)")

    suggest_parser = subparsers.add_parser("suggest-source", help="Preview a feed and stage it as a candidate")
    suggest_parser.add_argument("url", help="Feed URL to suggest")
    suggest_parser.add_argument("--name", help="Display name for the source")
    suggest_parser.add_argument("--homepage", help="Homepage URL")
    suggest_parser.add_argument("--sample", type=int, default=25)
    suggest_parser.add_argument("--classify", action="store_true", help="Classify the sample with the local model")
    suggest_parser.add_argument("--base-url")
    suggest_parser.add_argument("--model")

    cand_parser = subparsers.add_parser("list-candidates", help="List staged source candidates")
    cand_parser.add_argument("--status", help="Filter by status: suggested|quarantined|rejected|promoted")

    promote_parser = subparsers.add_parser("promote-candidate", help="Copy a candidate into the real sources")
    promote_parser.add_argument("id", type=int)
    promote_parser.add_argument("--active", action="store_true", help="Activate immediately (default: inactive)")
    promote_parser.add_argument("--category", help="default_category for the new source")
    promote_parser.add_argument("--trust", type=int, default=5)
    promote_parser.add_argument("--pr-risk", type=int, default=3)

    reject_parser = subparsers.add_parser("reject-candidate", help="Mark a candidate as rejected")
    reject_parser.add_argument("id", type=int)

    review_parser = subparsers.add_parser(
        "review-sources", help="Recompute advisory review flags (never deactivates anything)"
    )
    review_parser.add_argument("--stale-days", type=int, default=14)

    runs_parser = subparsers.add_parser("list-runs", help="Show recent ingest runs")
    runs_parser.add_argument("--limit", type=int, default=20)

    subparsers.add_parser("rescore", help="Re-run heuristic scores for stored articles")

    classify_parser = subparsers.add_parser("classify", help="Classify candidates with a local LLM")
    classify_parser.add_argument("--limit", type=int, default=10)
    classify_parser.add_argument("--include-rejected", action="store_true")
    classify_parser.add_argument("--dry-run", action="store_true")
    classify_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
    classify_parser.add_argument("--model", help="Local model name")

    cycle_parser = subparsers.add_parser(
        "cycle", help="Poll due sources, classify new articles, rebuild today's brief (for scheduling)"
    )
    cycle_parser.add_argument("--classify-limit", type=int, default=40)
    cycle_parser.add_argument("--no-classify", action="store_true", help="Skip the LLM classify step")
    cycle_parser.add_argument("--no-dedup", action="store_true", help="Skip the embedding dedup step")
    cycle_parser.add_argument("--no-geo", action="store_true", help="Skip tagging article subject-geography")
    cycle_parser.add_argument("--geo-limit", type=int, default=60, help="Max articles to geo-tag per cycle")
    cycle_parser.add_argument("--no-art", action="store_true", help="Skip the Daily Art pick")
    cycle_parser.add_argument("--no-joys", action="store_true", help="Skip the small-joys picks (On This Day, etc.)")
    cycle_parser.add_argument("--no-brief", action="store_true", help="Skip rebuilding today's brief")
    cycle_parser.add_argument("--no-review", action="store_true", help="Skip recomputing source review flags")
    cycle_parser.add_argument("--no-digest", action="store_true", help="Skip sending due daily digests")
    cycle_parser.add_argument("--force", action="store_true", help="Poll all active sources, ignoring intervals")
    cycle_parser.add_argument("--base-url", help="OpenAI-compatible base URL for classify")
    cycle_parser.add_argument("--model", help="Local model name for classify")

    digest_parser = subparsers.add_parser("send-digests", help="Send today's digest to opted-in users (morning-gated)")
    digest_parser.add_argument("--force", action="store_true", help="Ignore the morning send window")

    enrich_images_parser = subparsers.add_parser(
        "enrich-images", help="Backfill og:images for already-summarized articles that lack one"
    )
    enrich_images_parser.add_argument("--limit", type=int, default=50, help="Max articles to fetch this batch")

    art_parser = subparsers.add_parser("art", help="Daily Art: harvest the pool and/or pick today's cached piece")
    art_parser.add_argument("--harvest", action="store_true", help="(Re)harvest the curated museum pool")
    art_parser.add_argument("--force", action="store_true", help="Re-pick today's art even if already chosen")

    geo_parser = subparsers.add_parser("geo", help="Tag article subject-geography (backfill / manual). Cycle-locked.")
    geo_parser.add_argument("--limit", type=int, default=200, help="Max articles to tag this batch")
    geo_parser.add_argument("--reclassify", action="store_true", help="Re-tag even rows already at the current geo version")
    geo_parser.add_argument("--base-url", help="OpenAI-compatible base URL")
    geo_parser.add_argument("--model", help="Local model name")

    dedup_parser = subparsers.add_parser("dedup", help="Cluster near-duplicate stories via local embeddings")
    dedup_parser.add_argument("--threshold", type=float, default=DEFAULT_THRESHOLD, help="Cosine similarity cutoff")
    dedup_parser.add_argument("--window-days", type=int, default=DEFAULT_WINDOW_DAYS)
    dedup_parser.add_argument("--embed-limit", type=int, help="Cap how many missing embeddings to compute")
    dedup_parser.add_argument("--base-url", help="OpenAI-compatible base URL")
    dedup_parser.add_argument("--model", help="Chat model name (unused for embeddings)")
    dedup_parser.add_argument("--force-recluster", action="store_true",
                              help="Re-cluster the EXISTING corpus even if no new embeddings "
                                   "(re-applies representative policy; cycle-locked, no model needed)")

    check_llm_parser = subparsers.add_parser("check-llm", help="Check local OpenAI-compatible model endpoint")
    check_llm_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
    check_llm_parser.add_argument("--model", help="Expected local model name")

    brief_parser = subparsers.add_parser("build-brief", help="Build/freeze a daily brief")
    brief_parser.add_argument("--date", help="Brief date in YYYY-MM-DD format; defaults to today")
    brief_parser.add_argument("--limit", type=int, default=7)
    brief_parser.add_argument("--replace", action="store_true")

    show_brief_parser = subparsers.add_parser("show-brief", help="Show a stored daily brief")
    show_brief_parser.add_argument("--date", help="Brief date in YYYY-MM-DD format; defaults to latest brief")
    show_brief_parser.add_argument("--limit", type=int, default=10)

    serve_parser = subparsers.add_parser("serve", help="Run the web/API server (requires the 'web' extra)")
    serve_parser.add_argument("--host", default="127.0.0.1", help="Bind host; use 0.0.0.0 to expose")
    serve_parser.add_argument("--port", type=int, default=8000)
    serve_parser.add_argument("--reload", action="store_true", help="Auto-reload on code changes (dev)")

    return parser


def _dispatch(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
    """Run the database-backed sub-command selected in ``args.command``.

    Raises:
        SystemExit: for user-facing failures (unknown source name, a candidate
            that cannot be promoted, an unreachable model endpoint).
    """
    if args.command == "init-db":
        init_db(conn)
        print(f"Initialized {args.db}")
    elif args.command == "import-sources":
        init_db(conn)
        sources = load_sources(args.sources)
        count = upsert_sources(conn, sources)
        print(f"Imported {count} sources from {args.sources}")
    elif args.command == "poll":
        init_db(conn)
        if args.source:
            source = conn.execute("SELECT * FROM sources WHERE name = ?", (args.source,)).fetchone()
            if not source:
                raise SystemExit(f"No source named {args.source!r}")
            result = poll_source(conn, source)
        else:
            result = poll_all_sources(conn, limit=args.limit)
        print(_format_result(result))
    elif args.command == "list-recent":
        list_recent(conn, limit=args.limit, accepted_only=args.accepted_only)
    elif args.command == "list-sources":
        list_sources(conn, active_only=args.active_only)
    elif args.command == "list-category":
        list_category(conn, topic=args.topic, flavor=args.flavor, limit=args.limit, accepted_only=not args.all)
    elif args.command == "source-report":
        source_report(conn)
    elif args.command == "check-feeds":
        check_feeds(conn, include_inactive=args.all)
    elif args.command == "preview-source":
        client = llm_client_from_args(args) if args.classify else None
        preview = preview_feed(args.url, sample=args.sample, client=client)
        print_preview(preview)
    elif args.command == "suggest-source":
        init_db(conn)
        client = llm_client_from_args(args) if args.classify else None
        preview = preview_feed(args.url, sample=args.sample, client=client)
        print_preview(preview)
        cand = save_candidate(conn, args.url, preview=preview, name=args.name, homepage_url=args.homepage)
        print(f"Saved as candidate #{cand['id']} (status {cand['status']}). Review with list-candidates.")
    elif args.command == "list-candidates":
        # Imported once per invocation rather than once per candidate row.
        import json

        init_db(conn)
        rows = list_candidates(conn, status=args.status)
        if not rows:
            print("No candidates.")
        for r in rows:
            line = f"[{r['id']}] {r['status']} | {r['name'] or '(unnamed)'} | {r['feed_url']}"
            if r["preview_json"]:
                p = json.loads(r["preview_json"])
                _rate = p.get("acceptance_rate")
                _rate_str = f"{round(_rate * 100)}%" if _rate is not None else "—"
                line += f" (accept {_rate_str}, sampled {p.get('sampled', 0)})"
            print(line)
    elif args.command == "promote-candidate":
        init_db(conn)
        try:
            source_id = promote_candidate(
                conn, args.id, active=args.active, default_category=args.category,
                trust_score=args.trust, pr_risk_score=args.pr_risk,
            )
        except ValueError as exc:
            raise SystemExit(str(exc)) from exc
        state = "active" if args.active else "inactive"
        print(f"Promoted candidate #{args.id} -> source #{source_id} ({state}).")
    elif args.command == "reject-candidate":
        init_db(conn)
        ok = reject_candidate(conn, args.id)
        print(f"Rejected candidate #{args.id}." if ok else f"No candidate #{args.id}.")
    elif args.command == "review-sources":
        init_db(conn)
        flagged = review_sources(conn, stale_days=args.stale_days)
        if not flagged:
            print("All active sources look healthy.")
        else:
            print(f"{len(flagged)} source(s) flagged for review (advisory — none deactivated):")
            for f in flagged:
                print(f" [{f['id']}] {f['name']}: {f['reason']}")
    elif args.command == "list-runs":
        list_runs(conn, limit=args.limit)
    elif args.command == "rescore":
        count = rescore_articles(conn)
        print(f"Rescored {count} articles")
    elif args.command == "classify":
        init_db(conn)
        client = llm_client_from_args(args)
        report = classify_articles(
            conn,
            client,
            limit=args.limit,
            include_rejected=args.include_rejected,
            dry_run=args.dry_run,
        )
        for article_id, scores in report.results:
            accepted = "yes" if scores["accepted"] else "no"
            print(
                f"[{article_id}] accepted={accepted} {scores['topic']}/{scores['flavor']} "
                f"reason={scores['reason_code']}"
            )
            print(f" {scores['reason_text']}")
        print(
            f"classify: attempted={report.attempted} succeeded={report.succeeded} "
            f"skipped={report.skipped}"
        )
        if args.dry_run:
            print("Dry run only; database was not updated.")
    elif args.command == "cycle":
        run_cycle(conn, args)
    elif args.command == "send-digests":
        init_db(conn)
        sent = send_due_digests(conn, force=args.force)
        print(f"send-digests: sent {sent}")
    elif args.command == "enrich-images":
        found = enrich_summarized_images(conn, limit=args.limit)
        print(f"enrich-images: {found} new image(s) for summarized articles")
    elif args.command == "art":
        init_db(conn)
        if args.harvest:
            h = art.harvest_pool(conn)
            print(f"art harvest: found={h['found']} added={h['added']} pool={h['pool']} errors={h['errors']}")
        picked = art.pick_daily(conn, force=args.force, client=LocalModelClient.from_env())
        if picked:
            print(f"art pick: {picked['art_date']} -> #{picked['object_id']} "
                  f"\"{picked['title']}\" — {picked['artist'] or 'Unknown'}")
        else:
            print("art pick: nothing fetched (kept the last piece)")
    elif args.command == "geo":
        init_db(conn)
        # Cycle-locked so a manual backfill can't contend with the scheduled cycle.
        with cycle_lock(args.db) as acquired:
            if not acquired:
                print("geo: a cycle is already running; try again after it finishes")
                return
            g = tag_geo(conn, llm_client_from_args(args), limit=args.limit, reclassify=args.reclassify)
            print(f"geo: tagged={g['tagged']} errors={g['errors']} (of {g['candidates']} candidates)")
    elif args.command == "dedup":
        init_db(conn)
        if args.force_recluster:
            # Re-apply representative policy to the EXISTING corpus. The normal path
            # fast-skips when no new embeddings exist, so it would NOT pick up a policy
            # change. Cycle-locked so it can't overlap the scheduled timer; no model
            # needed (pure re-cluster over stored embeddings).
            with cycle_lock(args.db) as acquired:
                if not acquired:
                    print("dedup: a cycle is already running; re-run --force-recluster after it finishes")
                    return
                stats = cluster_duplicates(conn, threshold=args.threshold, window_days=args.window_days)
                print(
                    f"dedup (forced recluster): articles={stats['articles']} "
                    f"clusters={stats['clusters']} duplicate_clusters={stats['duplicate_clusters']} "
                    f"duplicates_hidden={stats['duplicates']}"
                )
        else:
            client = llm_client_from_args(args)
            stats = run_dedup(
                conn, client, threshold=args.threshold, window_days=args.window_days, embed_limit=args.embed_limit
            )
            print(
                f"dedup: embedded={stats['embedded']} articles={stats['articles']} "
                f"clusters={stats['clusters']} duplicate_clusters={stats['duplicate_clusters']} "
                f"duplicates_hidden={stats['duplicates']}"
            )
    elif args.command == "check-llm":
        client = llm_client_from_args(args)
        try:
            models = client.list_models()
        except RuntimeError as exc:
            raise SystemExit(str(exc)) from exc
        print(f"Connected to {client.base_url}")
        if models:
            print("Models:")
            for model in models:
                marker = " *" if model == client.model else ""
                print(f" {model}{marker}")
        else:
            print("Endpoint responded, but no models were listed.")
    elif args.command == "build-brief":
        init_db(conn)
        brief_id = build_daily_brief(
            conn,
            brief_date=args.date,
            limit=args.limit,
            replace=args.replace,
        )
        print(f"Built brief {brief_id}")
        bdate = args.date or local_today()
        found = enrich_brief_images(conn, bdate)
        if found:
            print(f"Enriched {found} hero image(s)")
        print_brief(show_brief(conn, brief_date=args.date, limit=args.limit))
    elif args.command == "show-brief":
        print_brief(show_brief(conn, brief_date=args.date, limit=args.limit))


def main() -> None:
    """CLI entry point: parse ``sys.argv`` and run the selected sub-command.

    ``serve`` is handled before any database connection is opened. Every other
    command gets one connection to ``args.db`` that is closed when the command
    finishes, including on early return or ``SystemExit``.
    """
    args = _build_parser().parse_args()

    if args.command == "serve":
        serve(args)
        return

    # closing() guarantees the connection is released even when a command
    # bails out early or raises.
    with contextlib.closing(connect(args.db)) as conn:
        _dispatch(conn, args)
|
|
|
|
|
|
def list_recent(conn: sqlite3.Connection, limit: int, accepted_only: bool) -> None:
    """Print the newest articles with their scores, four lines per article.

    Args:
        conn: Connection whose rows support access by column name.
        limit: Maximum number of articles to print.
        accepted_only: When true, restrict to rows with ``accepted = 1``.
    """
    # The filter is one of two fixed strings, never user input.
    if accepted_only:
        where = "WHERE s.accepted = 1"
    else:
        where = ""
    query = f"""
        SELECT
            a.id,
            a.published_at,
            src.name AS source_name,
            a.title,
            a.canonical_url,
            s.accepted,
            s.constructive_score,
            s.cortisol_score,
            s.ragebait_score,
            s.reason_code
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        {where}
        ORDER BY COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT ?
        """
    records = conn.execute(query, (limit,)).fetchall()
    for record in records:
        # Unscored articles (NULL from the LEFT JOIN) read as "no".
        verdict = "yes" if record["accepted"] else "no"
        when = record["published_at"] or "no date"
        print(f"[{record['id']}] {when} | {record['source_name']} | accepted={verdict}")
        print(f" {record['title']}")
        score_line = (
            " scores: "
            f"constructive={record['constructive_score']} "
            f"cortisol={record['cortisol_score']} "
            f"ragebait={record['ragebait_score']} "
            f"reason={record['reason_code']}"
        )
        print(score_line)
        print(f" {record['canonical_url']}")
|
|
|
|
|
|
def print_preview(p: dict) -> None:
    """Print a readable report of a feed preview.

    ``p`` is the preview dict; every key except ``acceptance_rate`` is read
    with ``p[...]`` and must be present.
    """
    scoring_mode = "heuristic"
    if p["classified"]:
        scoring_mode = "model"
    print(f"Preview of {p['url']} ({scoring_mode})")

    acceptance = p.get("acceptance_rate")
    if acceptance is None:
        rate_str = "— (all held)"
    else:
        rate_str = f"{acceptance * 100:.0f}%"
    print(f" sampled={p['sampled']} accepted={p['accepted']} ({rate_str})")

    newest = p["newest_published"] or "unknown"
    print(f" freshness: newest={newest} in_last_7d={p['recent_7d']}")
    print(f" averages: cortisol={p['avg_cortisol']} ragebait={p['avg_ragebait']} pr_risk={p['avg_pr_risk']}")

    # The topic/flavor mixes are printed together, and only when a topic mix exists.
    if p["topic_mix"]:
        print(f" topics: {p['topic_mix']}")
        print(f" flavors: {p['flavor_mix']}")

    kept_titles = p["examples_accepted"]
    if kept_titles:
        print(" would accept:")
        for title in kept_titles:
            print(f" + {title[:80]}")  # titles truncated to 80 chars

    skipped = p["examples_rejected"]
    if skipped:
        print(" would skip:")
        for example in skipped:
            print(f" - {example['title'][:70]} ({example['reason']})")
|
|
|
|
|
|
def check_feeds(conn: sqlite3.Connection, include_inactive: bool = False) -> None:
    """Fetch and parse every source feed, printing one OK/FAIL line per source.

    Args:
        conn: Connection whose rows support access by column name.
        include_inactive: When true, also check sources with ``active != 1``.

    A final summary line reports how many feeds parsed without error.
    """
    if include_inactive:
        where = ""
    else:
        where = "WHERE active = 1"
    feeds = conn.execute(f"SELECT name, feed_url FROM sources {where} ORDER BY name").fetchall()

    healthy = 0
    for feed in feeds:
        # Any failure on one feed is reported and the loop moves on to the next.
        try:
            entries = parse_feed(fetch_feed(feed["feed_url"]))
            healthy += 1
            print(f"OK {feed['name']}: {len(entries)} items")
        except Exception as exc:
            print(f"FAIL {feed['name']}: {exc}")
    print(f"--- {healthy}/{len(feeds)} feeds healthy ---")
|
|
|
|
|
|
@contextlib.contextmanager
def cycle_lock(db_path):
    """Try to take the exclusive cycle lock without blocking.

    The lock file ``.goodnews-cycle.lock`` lives next to the database, so the
    scheduled cycle and any manual corpus-mutating job (e.g. a forced dedup
    re-cluster) exclude each other. Yields ``True`` when the lock was taken
    and ``False`` when it is already held; the lock is released and the file
    closed when the ``with`` body ends.
    """
    import fcntl

    handle = open(Path(db_path).parent / ".goodnews-cycle.lock", "w")
    try:
        # LOCK_NB: fail immediately instead of waiting for the current holder.
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        locked = True
    except OSError:
        locked = False

    if not locked:
        handle.close()
        yield False
        return

    try:
        yield True
    finally:
        fcntl.flock(handle, fcntl.LOCK_UN)
        handle.close()
|
|
|
|
|
|
def run_cycle(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
    """Run one scheduler pass under the exclusive cycle lock.

    The pass itself (poll due sources, classify new arrivals, dedup, rebuild
    today's brief, ...) is done by ``_run_cycle_locked``; each of its steps is
    independent and non-fatal so a down model endpoint or an empty day never
    aborts the cycle.

    The lock keeps a manual run and the systemd timer (or two timer ticks)
    from overlapping and contending on the database and model. If it is
    already held, a message is printed and nothing else happens.
    """
    with cycle_lock(args.db) as got_lock:
        if got_lock:
            _run_cycle_locked(conn, args)
        else:
            print("cycle: another cycle is already running; skipping")
|
|
|
|
|
|
def _run_cycle_locked(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
    """Run every step of one cycle; the caller must already hold the cycle lock.

    Steps, in order: poll, classify, dedup, geo-tag, daily art, small joys,
    rebuild today's brief, enrich recent images, count read-times, pre-warm
    brief summaries, review sources, reindex search, send digests, generate
    puzzles. Apart from the initial poll, each step catches its own
    exceptions and prints ``"<step>: skipped (...)"`` so one failure never
    stops the rest. Steps with a ``--no-*`` flag are skipped when it is set.

    Args:
        conn: Open database connection.
        args: Parsed ``cycle`` arguments (the ``no_*`` flags, ``force``,
            ``classify_limit``, ``geo_limit``, plus optional ``base_url`` /
            ``model`` overrides read by ``llm_client_from_args``).
    """
    init_db(conn)

    if args.force:
        poll_result = poll_all_sources(conn)
    else:
        poll_result = poll_due_sources(conn)
    print(f"poll: {_format_result(poll_result)}", flush=True)

    if not args.no_classify:
        client = llm_client_from_args(args)

        def _progress(done: int, total: int, article_id: int) -> None:
            print(f" classify {done}/{total} (article {article_id})", flush=True)

        try:
            report = classify_articles(
                conn,
                client,
                limit=args.classify_limit,
                include_rejected=True,
                only_unclassified=True,
                progress=_progress,
            )
            print(
                f"classify: attempted={report.attempted} succeeded={report.succeeded} "
                f"skipped={report.skipped} (model {client.model})",
                flush=True,
            )
        except Exception as exc:  # endpoint down, timeout, etc. — keep going
            print(f"classify: skipped ({exc})", flush=True)

    if not args.no_dedup:
        try:
            stats = run_dedup(conn, llm_client_from_args(args))
            print(f"dedup: embedded={stats['embedded']} duplicates_hidden={stats['duplicates']}")
        except Exception as exc:
            print(f"dedup: skipped ({exc})")

    # Geo: tag newly-accepted, non-duplicate articles with subject geography (its own
    # LLM pass, decoupled from scoring). Bounded per cycle; idempotent (skips rows
    # already at the current GEO_VERSION). Non-fatal like every other step.
    if not args.no_geo:
        try:
            g = tag_geo(conn, llm_client_from_args(args), limit=args.geo_limit)
            print(f"geo: tagged={g['tagged']} errors={g['errors']} (of {g['candidates']} untagged)")
        except Exception as exc:
            print(f"geo: skipped ({exc})")

    # Daily Art: ensure the pool exists, then ensure today has a cached piece. No-ops
    # once the day is picked; non-fatal like every other step.
    if not args.no_art:
        try:
            a = art.run_daily(conn, client=LocalModelClient.from_env())  # client → the guide blurb
            print(f"art: pool={a['pool']} picked={a['picked_object']}")
        except Exception as exc:
            print(f"art: skipped ({exc})")

    # Small joys: On This Day (history), Quote of the Day, Word of the Day. Each is
    # bounded + non-fatal; one shared LLM client for tone/explainer/word proposals.
    if not args.no_joys:
        joy_client = LocalModelClient.from_env()
        try:
            o = onthisday.run_daily(conn, client=joy_client)
            print(f"onthisday: md={o['md']} picked={'yes' if o['picked'] else 'no'}")
        except Exception as exc:
            print(f"onthisday: skipped ({exc})")
        try:
            q = quote.run_daily(conn, client=joy_client)
            print(f"quote: pool={q['pool']} picked={q['picked']}")
        except Exception as exc:
            print(f"quote: skipped ({exc})")
        try:
            w = wotd.run_daily(conn, client=joy_client)
            print(f"word: pool={w['pool']} picked={w['picked']}")
        except Exception as exc:
            print(f"word: skipped ({exc})")

    # Bound outside the --no-brief guard: the summaries pre-warm further down
    # queries by this date too, and would otherwise hit an unbound local (and be
    # silently reported as "skipped") whenever --no-brief is passed.
    today = local_today()
    if not args.no_brief:
        try:
            brief_id = build_daily_brief(conn, brief_date=today, limit=7, replace=True)
            found = enrich_brief_images(conn, today)
            print(f"brief: rebuilt {today} (id {brief_id}); {found} hero image(s) enriched")
        except Exception as exc:
            print(f"brief: skipped ({exc})")

    # Keep the Latest feed photo-rich: fetch quality og:images for the newest
    # accepted articles that lack one (bounded per cycle).
    try:
        recent = enrich_recent_images(conn)
        if recent:
            print(f"recent images: {recent} enriched")
    except Exception as exc:
        print(f"recent images: skipped ({exc})")

    # Full-article read-times: count words for recent accepted articles so the
    # front door can show "Full story · ~N min" next to our gist (bounded per cycle).
    try:
        reads = enrich_read_times(conn)
        if reads:
            print(f"read-times: {reads} counted")
    except Exception as exc:
        print(f"read-times: skipped ({exc})")

    # Pre-warm summaries for today's brief so Today reads as a calm briefing.
    # Idempotent: cached items are skipped, so this only hits the LLM for new ones.
    try:
        client = llm_client_from_args(args)
        ids = [r[0] for r in conn.execute(
            "SELECT bi.article_id FROM daily_briefs b JOIN daily_brief_items bi "
            "ON bi.brief_id = b.id WHERE b.brief_date = ?", (today,)
        )]
        made = 0
        for aid in ids:
            # Count a summary as new only if none existed before this call.
            had = get_summary(conn, aid)
            generate_summary(conn, aid, client=client)
            if not had and get_summary(conn, aid):
                made += 1
        print(f"summaries: {made} new (of {len(ids)} brief items)")
    except Exception as exc:
        print(f"summaries: skipped ({exc})")

    if not args.no_review:
        try:
            flagged = review_sources(conn)
            print(f"review: {len(flagged)} source(s) flagged for review (advisory)")
        except Exception as exc:
            print(f"review: skipped ({exc})")

    try:
        from .queries import reindex_search
        print(f"search: indexed {reindex_search(conn)} articles")
    except Exception as exc:  # noqa: BLE001 — search index is non-critical
        print(f"search: skipped ({exc})")

    if not args.no_digest:
        try:
            sent = send_due_digests(conn)  # morning-gated + deduped internally
            if sent:
                print(f"digest: sent {sent}")
        except Exception as exc:
            print(f"digest: skipped ({exc})")

    # Pre-generate today's daily puzzles (with the LLM 'why'); idempotent.
    try:
        made = generate_daily_puzzles(conn, local_today(), client=llm_client_from_args(args))
        if made:
            print(f"puzzles: generated {made}")
    except Exception as exc:
        print(f"puzzles: skipped ({exc})")
|
|
|
|
|
|
def serve(args: argparse.Namespace) -> None:
    """Run the web/API server with uvicorn.

    Args:
        args: Parsed ``serve`` arguments; reads ``db``, ``host``, ``port``
            and ``reload``.

    Raises:
        SystemExit: if ``uvicorn`` (the optional 'web' extra) is not installed.
    """
    try:
        import uvicorn
    except ModuleNotFoundError as exc:
        raise SystemExit(
            "The web server needs the optional 'web' extra. Install it with:\n"
            " pip install -e '.[web]'"
        ) from exc
    # Make sure the API reads the same database the CLI was pointed at. Assign
    # rather than setdefault: args.db already falls back to $GOODNEWS_DB when
    # --db is omitted, so this only changes the case where an explicit --db
    # would otherwise be silently overridden by a pre-existing $GOODNEWS_DB.
    os.environ["GOODNEWS_DB"] = str(args.db)
    print(f"Serving goodNews on http://{args.host}:{args.port} (docs at /docs)")
    uvicorn.run("goodnews.api:app", host=args.host, port=args.port, reload=args.reload)
|
|
|
|
|
|
def list_category(
    conn: sqlite3.Connection,
    topic: str | None,
    flavor: str | None,
    limit: int,
    accepted_only: bool,
) -> None:
    """Print scored articles filtered by topic and/or flavor, best-ranked first.

    Args:
        conn: Connection whose rows support access by column name.
        topic: Topic to match (lower-cased before comparing), or None/empty for any.
        flavor: Flavor to match (lower-cased before comparing), or None/empty for any.
        limit: Maximum number of articles to print.
        accepted_only: When true, restrict to rows with ``accepted = 1``.
    """
    conditions: list[str] = []
    bindings: list = []
    if accepted_only:
        conditions.append("s.accepted = 1")
    # Filter values are always bound as parameters; only fixed column names are
    # interpolated into the SQL text.
    for column, wanted in (("s.topic", topic), ("s.flavor", flavor)):
        if wanted:
            conditions.append(f"{column} = ?")
            bindings.append(wanted.lower())
    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    bindings.append(limit)

    query = f"""
        SELECT
            a.id, a.title, a.canonical_url, a.published_at,
            src.name AS source_name,
            s.topic, s.flavor, s.accepted,
            s.constructive_score, s.cortisol_score, s.reason_code,
            (s.constructive_score + s.agency_score + s.human_benefit_score + src.trust_score
             - s.cortisol_score - s.ragebait_score - s.pr_risk_score) AS rank_score
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        JOIN article_scores s ON s.article_id = a.id
        {where}
        ORDER BY rank_score DESC, COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT ?
        """
    matches = conn.execute(query, bindings).fetchall()

    # The heading shows the filters as the caller typed them (not lower-cased).
    heading = " / ".join(part for part in (topic, flavor) if part) or "all categories"
    print(f"{heading} ({len(matches)} shown)")
    for match in matches:
        suffix = "" if match["accepted"] else " [not accepted]"
        print(f"[{match['id']}] {match['topic']}/{match['flavor']} | {match['source_name']}{suffix}")
        print(f" {match['title']}")
        print(f" score={match['rank_score']} reason={match['reason_code']}")
        print(f" {match['canonical_url']}")
|
|
|
|
|
|
def llm_client_from_args(args: argparse.Namespace) -> LocalModelClient:
    """Build a model client from the environment, then apply CLI overrides.

    ``args.base_url`` and ``args.model`` are optional: each one replaces the
    matching client attribute only when it is present and truthy. A trailing
    ``/`` is stripped from the base URL before it is stored.
    """
    client = LocalModelClient.from_env()

    base_url_override = getattr(args, "base_url", None)
    if base_url_override:
        client.base_url = base_url_override.rstrip("/")

    model_override = getattr(args, "model", None)
    if model_override:
        client.model = model_override

    return client
|
|
|
|
|
|
def list_sources(conn: sqlite3.Connection, active_only: bool) -> None:
    """Print every source, ordered by name, with its state and feed URL.

    Args:
        conn: Open database connection; rows are read by column name.
        active_only: When true, restrict the listing to ``active = 1``.
    """
    # The filter is a fixed literal, never user text, so interpolating it
    # into the statement is safe.
    if active_only:
        where = "WHERE active = 1"
    else:
        where = ""

    sources = conn.execute(
        f"""
        SELECT id, name, active, default_category, trust_score, pr_risk_score, feed_url
        FROM sources
        {where}
        ORDER BY name
        """
    ).fetchall()

    for source in sources:
        state = "active" if source["active"] else "inactive"
        summary = (
            f"[{source['id']}] {source['name']} ({state}, {source['default_category']}, "
            f"trust={source['trust_score']}, pr={source['pr_risk_score']})"
        )
        print(summary)
        print(f" {source['feed_url']}")
|
|
|
|
|
|
def source_report(conn: sqlite3.Connection) -> None:
    """Print a per-source summary of article volume and average scores.

    Sources are LEFT JOINed to their articles and scores, so a source with
    no articles still appears (with zero counts and ``None`` averages). Rows
    are ordered by accepted count, then article count, then name. A source
    whose ``review_flag`` is set gets an extra line with its review reason.
    """
    report_rows = conn.execute(
        """
        SELECT
            src.name,
            src.default_category,
            src.trust_score,
            src.pr_risk_score AS source_pr_risk,
            src.review_flag,
            src.review_reason,
            src.consecutive_failures,
            COUNT(a.id) AS articles,
            SUM(CASE WHEN s.accepted = 1 THEN 1 ELSE 0 END) AS accepted,
            ROUND(AVG(s.constructive_score), 1) AS avg_constructive,
            ROUND(AVG(s.cortisol_score), 1) AS avg_cortisol,
            ROUND(AVG(s.ragebait_score), 1) AS avg_ragebait,
            MAX(a.published_at) AS newest_article
        FROM sources src
        LEFT JOIN articles a ON a.source_id = src.id
        LEFT JOIN article_scores s ON s.article_id = a.id
        GROUP BY src.id
        ORDER BY accepted DESC, articles DESC, src.name
        """
    ).fetchall()

    for entry in report_rows:
        # SUM() yields NULL for a source without articles; treat it as zero.
        total = entry["articles"] or 0
        kept = entry["accepted"] or 0
        if total:
            rate = kept / total * 100
        else:
            rate = 0

        headline = (
            f"{entry['name']} | {entry['default_category']} | "
            f"articles={total} accepted={kept} ({rate:.1f}%)"
        )
        print(headline)

        averages = (
            f" trust={entry['trust_score']} pr={entry['source_pr_risk']} "
            f"avg_constructive={entry['avg_constructive']} "
            f"avg_cortisol={entry['avg_cortisol']} "
            f"avg_ragebait={entry['avg_ragebait']}"
        )
        print(averages)

        print(f" newest={entry['newest_article'] or 'none'}")
        if entry["review_flag"]:
            print(f" ⚑ review: {entry['review_reason']}")
|
|
|
|
|
|
def list_runs(conn: sqlite3.Connection, limit: int) -> None:
    """Print the most recent ingest runs, newest (highest id) first.

    Args:
        conn: Open database connection; rows are read by column name.
        limit: Maximum number of runs to show, bound to ``LIMIT ?``.

    A run with no matching source row is labelled ``unknown``; a run with a
    non-empty ``error`` gets an extra line showing it.
    """
    query = """
        SELECT r.id, r.started_at, r.finished_at, r.status, src.name AS source_name,
               r.items_seen, r.items_inserted, r.items_duplicate, r.error
        FROM ingest_runs r
        LEFT JOIN sources src ON src.id = r.source_id
        ORDER BY r.id DESC
        LIMIT ?
        """
    for run in conn.execute(query, (limit,)).fetchall():
        source_label = run["source_name"] or "unknown"
        counts = (
            f"seen={run['items_seen']} inserted={run['items_inserted']} "
            f"duplicate={run['items_duplicate']}"
        )
        print(f"[{run['id']}] {run['status']} | {source_label} | {counts}")
        if run["error"]:
            print(f" error: {run['error']}")
|
|
|
|
|
|
def rescore_articles(conn: sqlite3.Connection) -> int:
    """Re-run ``score_article`` over every stored article and upsert the result.

    Each article's title, description and its source's ``pr_risk_score``
    (coerced to ``int``) are passed to ``score_article``. The returned
    mapping is written to ``article_scores``, inserting a new row or
    overwriting the existing one for that ``article_id``. One commit is
    issued after all rows have been processed.

    Returns:
        The number of articles that were rescored.
    """
    # Keys read from the scorer's result, in the column order used by the
    # INSERT below (article_id is supplied separately as the first value).
    score_fields = (
        "constructive_score",
        "cortisol_score",
        "ragebait_score",
        "agency_score",
        "human_benefit_score",
        "novelty_score",
        "pr_risk_score",
        "accepted",
        "reason_code",
        "reason_text",
        "model_name",
    )
    upsert_sql = """
        INSERT INTO article_scores (
            article_id, constructive_score, cortisol_score, ragebait_score,
            agency_score, human_benefit_score, novelty_score, pr_risk_score,
            accepted, reason_code, reason_text, model_name, scored_at
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
        ON CONFLICT(article_id) DO UPDATE SET
            constructive_score = excluded.constructive_score,
            cortisol_score = excluded.cortisol_score,
            ragebait_score = excluded.ragebait_score,
            agency_score = excluded.agency_score,
            human_benefit_score = excluded.human_benefit_score,
            novelty_score = excluded.novelty_score,
            pr_risk_score = excluded.pr_risk_score,
            accepted = excluded.accepted,
            reason_code = excluded.reason_code,
            reason_text = excluded.reason_text,
            model_name = excluded.model_name,
            scored_at = CURRENT_TIMESTAMP
        """

    articles = conn.execute(
        """
        SELECT a.id, a.title, a.description, src.pr_risk_score
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        ORDER BY a.id
        """
    ).fetchall()

    for article in articles:
        scored = score_article(
            article["title"],
            article["description"],
            int(article["pr_risk_score"]),
        )
        values = (article["id"], *(scored[field] for field in score_fields))
        conn.execute(upsert_sql, values)

    conn.commit()
    return len(articles)
|
|
|
|
|
|
def print_brief(rows: list[sqlite3.Row]) -> None:
    """Print a daily brief: a dated heading followed by one entry per row.

    Args:
        rows: Brief items, read by key. The heading date is taken from the
            first row's ``brief_date``. An empty list prints a
            "No brief items found." notice and nothing else.
    """
    if not rows:
        print("No brief items found.")
        return

    brief_date = rows[0]["brief_date"]
    print(f"Highlights from Today - {brief_date}")
    for item in rows:
        # One print call per entry; sep="\n" keeps each field on its own line.
        print(
            f"{item['rank']}. {item['title']}",
            f" {item['source_name']} | {item['default_category']} | {item['model_name']}",
            f" reason: {item['reason_code']}",
            f" {item['canonical_url']}",
            sep="\n",
        )
|
|
|
|
|
|
def _format_result(result: dict) -> str:
|
|
if "sources" in result:
|
|
return (
|
|
f"Polled {result['sources']} sources: seen={result['seen']} "
|
|
f"inserted={result['inserted']} duplicate={result['duplicate']} failed={result['failed']}"
|
|
)
|
|
if result.get("status") == "failed":
|
|
return (
|
|
f"Poll failed: seen={result['seen']} inserted={result['inserted']} "
|
|
f"duplicate={result['duplicate']} error={result['error']}"
|
|
)
|
|
return (
|
|
f"Poll ok: seen={result['seen']} inserted={result['inserted']} "
|
|
f"duplicate={result['duplicate']}"
|
|
)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|