diff --git a/README.md b/README.md index f058c9d..cb92c02 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ python3 -m goodnews suggest-source https://example.com/feed/ --name "Example" -- python3 -m goodnews list-candidates python3 -m goodnews promote-candidate 1 # copies into sources (inactive by default) python3 -m goodnews reject-candidate 1 +python3 -m goodnews review-sources # advisory health flags (never deactivates) python3 -m goodnews build-brief --date 2026-05-27 --replace python3 -m goodnews show-brief python3 -m goodnews list-recent --limit 10 @@ -161,7 +162,7 @@ often as you like — it only polls sources that are *due* (per each source's rebuilds the current day's brief: ```bash -python3 -m goodnews cycle # poll due -> classify new -> dedup -> rebuild today's brief +python3 -m goodnews cycle # poll due -> classify -> dedup -> brief -> review flags python3 -m goodnews cycle --force # poll every active source regardless of interval python3 -m goodnews cycle --no-classify # skip the LLM step (e.g. model box offline) ``` @@ -195,9 +196,11 @@ Still ahead: previews a feed and stages it in the `source_candidates` table (status suggested/quarantined/rejected/promoted); `promote-candidate` copies it into `sources` (inactive by default — active on approval); promotion is never - automatic. Still ahead: advisory auto-degrade of stale/rejecting feeds (flag - for review, never auto-block), and an authenticated POST surface so the website - can accept public suggestions once accounts exist. + automatic. Advisory health is done too: `review-sources` (also run at the end + of `cycle`) flags stale, failing, low-acceptance, duplicate-heavy, or + doom-skewed feeds for human review — it never deactivates anything. Still + ahead: an authenticated POST surface so the website can accept public + suggestions once accounts exist. 2. **Learned "Less like this" weighting** — replace the interim flavor-pause with real preference down-ranking. 3. **Corpus rebalancing** — add calm/feelgood sources (currently science-heavy). diff --git a/goodnews/cli.py b/goodnews/cli.py index 8863054..9654ed0 100644 --- a/goodnews/cli.py +++ b/goodnews/cli.py @@ -24,6 +24,7 @@ from .sources import ( load_sources, promote_candidate, reject_candidate, + review_sources, save_candidate, upsert_sources, ) @@ -95,6 +96,11 @@ def main() -> None: reject_parser = subparsers.add_parser("reject-candidate", help="Mark a candidate as rejected") reject_parser.add_argument("id", type=int) + review_parser = subparsers.add_parser( + "review-sources", help="Recompute advisory review flags (never deactivates anything)" + ) + review_parser.add_argument("--stale-days", type=int, default=14) + runs_parser = subparsers.add_parser("list-runs", help="Show recent ingest runs") runs_parser.add_argument("--limit", type=int, default=20) @@ -114,6 +120,7 @@ def main() -> None: cycle_parser.add_argument("--no-classify", action="store_true", help="Skip the LLM classify step") cycle_parser.add_argument("--no-dedup", action="store_true", help="Skip the embedding dedup step") cycle_parser.add_argument("--no-brief", action="store_true", help="Skip rebuilding today's brief") + cycle_parser.add_argument("--no-review", action="store_true", help="Skip recomputing source review flags") cycle_parser.add_argument("--force", action="store_true", help="Poll all active sources, ignoring intervals") cycle_parser.add_argument("--base-url", help="OpenAI-compatible base URL for classify") cycle_parser.add_argument("--model", help="Local model name for classify") @@ -218,6 +225,15 @@ def main() -> None: init_db(conn) ok = reject_candidate(conn, args.id) print(f"Rejected candidate #{args.id}." if ok else f"No candidate #{args.id}.") + elif args.command == "review-sources": + init_db(conn) + flagged = review_sources(conn, stale_days=args.stale_days) + if not flagged: + print("All active sources look healthy.") + else: + print(f"{len(flagged)} source(s) flagged for review (advisory — none deactivated):") + for f in flagged: + print(f" [{f['id']}] {f['name']}: {f['reason']}") elif args.command == "list-runs": list_runs(conn, limit=args.limit) elif args.command == "rescore": @@ -430,6 +446,13 @@ def _run_cycle_locked(conn: sqlite3.Connection, args: argparse.Namespace) -> Non except Exception as exc: print(f"brief: skipped ({exc})") + if not args.no_review: + try: + flagged = review_sources(conn) + print(f"review: {len(flagged)} source(s) flagged for review (advisory)") + except Exception as exc: + print(f"review: skipped ({exc})") + def serve(args: argparse.Namespace) -> None: try: @@ -530,6 +553,9 @@ def source_report(conn: sqlite3.Connection) -> None: src.default_category, src.trust_score, src.pr_risk_score AS source_pr_risk, + src.review_flag, + src.review_reason, + src.consecutive_failures, COUNT(a.id) AS articles, SUM(CASE WHEN s.accepted = 1 THEN 1 ELSE 0 END) AS accepted, ROUND(AVG(s.constructive_score), 1) AS avg_constructive, @@ -558,6 +584,8 @@ def source_report(conn: sqlite3.Connection) -> None: f"avg_ragebait={row['avg_ragebait']}" ) print(f" newest={row['newest_article'] or 'none'}") + if row["review_flag"]: + print(f" ⚑ review: {row['review_reason']}") def list_runs(conn: sqlite3.Connection, limit: int) -> None: diff --git a/goodnews/db.py b/goodnews/db.py index 7a8ae69..50973ec 100644 --- a/goodnews/db.py +++ b/goodnews/db.py @@ -19,6 +19,12 @@ CREATE TABLE IF NOT EXISTS sources ( active INTEGER NOT NULL DEFAULT 1, poll_interval_minutes INTEGER NOT NULL DEFAULT 60, notes TEXT, + last_success_at TEXT, + last_error_at TEXT, + last_error TEXT, + consecutive_failures INTEGER NOT NULL DEFAULT 0, + review_flag INTEGER NOT NULL DEFAULT 0, + review_reason TEXT, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP ); @@ -148,3 +154,16 @@ def _migrate(conn: sqlite3.Connection) -> None: ) # Created here (not in SCHEMA) so it runs after the column exists on upgrades. conn.execute("CREATE INDEX IF NOT EXISTS idx_articles_duplicate_of ON articles(duplicate_of)") + + source_cols = {row["name"] for row in conn.execute("PRAGMA table_info(sources)")} + health_columns = { + "last_success_at": "TEXT", + "last_error_at": "TEXT", + "last_error": "TEXT", + "consecutive_failures": "INTEGER NOT NULL DEFAULT 0", + "review_flag": "INTEGER NOT NULL DEFAULT 0", + "review_reason": "TEXT", + } + for column, decl in health_columns.items(): + if column not in source_cols: + conn.execute(f"ALTER TABLE sources ADD COLUMN {column} {decl}") diff --git a/goodnews/feeds.py b/goodnews/feeds.py index 62582ac..2988b03 100644 --- a/goodnews/feeds.py +++ b/goodnews/feeds.py @@ -108,6 +108,15 @@ def poll_source(conn: sqlite3.Connection, source: sqlite3.Row) -> dict: """, (seen, inserted, duplicate, run_id), ) + # A clean poll resets the failure streak and records the success time. + conn.execute( + """ + UPDATE sources + SET last_success_at = CURRENT_TIMESTAMP, consecutive_failures = 0 + WHERE id = ? + """, + (source["id"],), + ) conn.commit() return {"status": "ok", "seen": seen, "inserted": inserted, "duplicate": duplicate} except Exception as exc: @@ -124,6 +133,17 @@ def poll_source(conn: sqlite3.Connection, source: sqlite3.Row) -> dict: """, (seen, inserted, duplicate, str(exc), run_id), ) + # Track the failure streak and latest error for advisory review flags. + conn.execute( + """ + UPDATE sources + SET consecutive_failures = consecutive_failures + 1, + last_error_at = CURRENT_TIMESTAMP, + last_error = ? + WHERE id = ? + """, + (str(exc), source["id"]), + ) conn.commit() return { "status": "failed", diff --git a/goodnews/sources.py b/goodnews/sources.py index 576753c..1f80530 100644 --- a/goodnews/sources.py +++ b/goodnews/sources.py @@ -3,6 +3,7 @@ from __future__ import annotations import json import sqlite3 import tomllib +from datetime import datetime, timezone from pathlib import Path from urllib.parse import urlsplit @@ -153,3 +154,82 @@ def promote_candidate( row = conn.execute("SELECT id FROM sources WHERE feed_url = ?", (cand["feed_url"],)).fetchone() return int(row["id"]) + +# --- Advisory source health: flag for review, never auto-deactivate ----------- + + +def review_sources( + conn: sqlite3.Connection, + stale_days: int = 14, + min_recent: int = 15, + recent_window: int = 40, +) -> list[dict]: + """Recompute advisory review flags for active sources. + + Sets review_flag/review_reason but NEVER changes `active` — the human stays + in the loop. Returns the list of newly-flagged sources. + """ + now = datetime.now(timezone.utc) + flagged = [] + sources = conn.execute( + "SELECT id, name, consecutive_failures FROM sources WHERE active = 1" + ).fetchall() + + for s in sources: + reasons: list[str] = [] + if (s["consecutive_failures"] or 0) >= 3: + reasons.append(f"failing ({s['consecutive_failures']} consecutive)") + + recent = conn.execute( + """ + SELECT sc.accepted, sc.cortisol_score, sc.ragebait_score, a.duplicate_of, + COALESCE(a.published_at, a.discovered_at) AS dt + FROM articles a + JOIN article_scores sc ON sc.article_id = a.id + WHERE a.source_id = ? + ORDER BY COALESCE(a.published_at, a.discovered_at) DESC + LIMIT ? + """, + (s["id"], recent_window), + ).fetchall() + n = len(recent) + + if n == 0: + reasons.append("no articles yet") + else: + try: + newest = datetime.fromisoformat(recent[0]["dt"]) + if newest.tzinfo is None: + newest = newest.replace(tzinfo=timezone.utc) + age = (now - newest).days + if age > stale_days: + reasons.append(f"stale (newest {age}d ago)") + except (ValueError, TypeError): + pass + + if n >= min_recent: + acc = sum(r["accepted"] or 0 for r in recent) / n + if acc < 0.10: + reasons.append(f"low acceptance ({acc * 100:.0f}%)") + dup = sum(1 for r in recent if r["duplicate_of"] is not None) / n + if dup > 0.5: + reasons.append(f"duplicate-heavy ({dup * 100:.0f}%)") + avg_cort = sum(r["cortisol_score"] or 0 for r in recent) / n + if avg_cort > 5: + reasons.append(f"high cortisol (avg {avg_cort:.1f})") + avg_rage = sum(r["ragebait_score"] or 0 for r in recent) / n + if avg_rage > 3: + reasons.append(f"high ragebait (avg {avg_rage:.1f})") + + flag = 1 if reasons else 0 + reason = "; ".join(reasons) if reasons else None + conn.execute( + "UPDATE sources SET review_flag = ?, review_reason = ? WHERE id = ?", + (flag, reason, s["id"]), + ) + if flag: + flagged.append({"id": s["id"], "name": s["name"], "reason": reason}) + + conn.commit() + return flagged + diff --git a/tests/test_review.py b/tests/test_review.py new file mode 100644 index 0000000..fa3b6f7 --- /dev/null +++ b/tests/test_review.py @@ -0,0 +1,89 @@ +from datetime import datetime, timedelta, timezone + +import pytest + +from goodnews.db import connect, init_db +from goodnews.sources import review_sources + + +@pytest.fixture +def conn(): + c = connect(":memory:") + init_db(c) + yield c + c.close() + + +def _source(conn, sid, name, failures=0): + conn.execute( + "INSERT INTO sources (id, name, feed_url, trust_score, consecutive_failures) VALUES (?,?,?,5,?)", + (sid, name, f"http://s{sid}/feed", failures), + ) + + +def _article(conn, sid, aid, when, accepted=1, cortisol=1, ragebait=0, dup=None): + conn.execute( + "INSERT INTO articles (id, source_id, canonical_url, title, published_at, url_hash, duplicate_of) " + "VALUES (?,?,?,?,?,?,?)", + (aid, sid, f"http://s{sid}/{aid}", f"t{aid}", when, f"h{aid}", dup), + ) + conn.execute( + "INSERT INTO article_scores (article_id, cortisol_score, ragebait_score, accepted) VALUES (?,?,?,?)", + (aid, cortisol, ragebait, accepted), + ) + + +def test_healthy_source_not_flagged(conn): + _source(conn, 1, "Healthy") + now = datetime.now(timezone.utc) + for i in range(20): + _article(conn, 1, i, now.isoformat(), accepted=1, cortisol=1, ragebait=0) + conn.commit() + assert review_sources(conn) == [] + assert conn.execute("SELECT review_flag FROM sources WHERE id=1").fetchone()["review_flag"] == 0 + + +def test_repeated_failures_flagged(conn): + _source(conn, 1, "Flaky", failures=4) + conn.commit() + flagged = review_sources(conn) + assert len(flagged) == 1 and "failing" in flagged[0]["reason"] + + +def test_stale_source_flagged(conn): + _source(conn, 1, "Stale") + old = (datetime.now(timezone.utc) - timedelta(days=40)).isoformat() + _article(conn, 1, 1, old) + conn.commit() + flagged = review_sources(conn) + assert "stale" in flagged[0]["reason"] + + +def test_low_acceptance_and_cortisol_flagged(conn): + _source(conn, 1, "Doomy") + now = datetime.now(timezone.utc).isoformat() + for i in range(20): + _article(conn, 1, i, now, accepted=0, cortisol=8, ragebait=0) + conn.commit() + reason = review_sources(conn)[0]["reason"] + assert "low acceptance" in reason and "high cortisol" in reason + + +def test_review_never_deactivates(conn): + _source(conn, 1, "Flaky", failures=9) + conn.commit() + review_sources(conn) + assert conn.execute("SELECT active FROM sources WHERE id=1").fetchone()["active"] == 1 + + +def test_recovered_source_flag_cleared(conn): + _source(conn, 1, "Recovered", failures=5) + conn.commit() + review_sources(conn) # flagged + conn.execute("UPDATE sources SET consecutive_failures=0 WHERE id=1") + now = datetime.now(timezone.utc).isoformat() + for i in range(20): + _article(conn, 1, i, now, accepted=1, cortisol=1) + conn.commit() + review_sources(conn) # should clear + assert conn.execute("SELECT review_flag FROM sources WHERE id=1").fetchone()["review_flag"] == 0