"""Source registry helpers backed by SQLite.

Covers loading source definitions from TOML, upserting them into the
``sources`` table, duplicate-feed detection, the supervised
``source_candidates`` staging workflow, and advisory review flags.

Functions that read rows index them by column name, so the connection passed
in must use a name-addressable row factory (e.g. ``sqlite3.Row``).
"""

from __future__ import annotations

import json
import sqlite3
import tomllib
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlsplit

from .paywall import is_paywalled, is_paywalled_for_source

# Statuses accepted by upsert_sources; anything else falls back to the value
# derived from the legacy `active` flag.
_VALID_STATUSES = ("active", "paused", "retired")


def load_sources(path: Path | str) -> list[dict]:
    """Read source definitions from a TOML file.

    Returns the value of the top-level ``sources`` key, or an empty list when
    the key is absent.

    Raises:
        ValueError: if ``sources`` is present but is not a list.
    """
    data = tomllib.loads(Path(path).read_text(encoding="utf-8"))
    sources = data.get("sources", [])
    if not isinstance(sources, list):
        raise ValueError("sources.toml must contain [[sources]] entries")
    return sources


def upsert_sources(conn: sqlite3.Connection, source_defs: list[dict]) -> int:
    """Insert or update one ``sources`` row per definition, keyed on ``feed_url``.

    Each definition must contain ``name`` and ``feed_url`` (a missing key raises
    ``KeyError``); every other field has a default. On a ``feed_url`` conflict
    the existing row's settings are overwritten with the new values.

    Returns the number of definitions processed. Commits once at the end.
    """
    count = 0
    for source in source_defs:
        # Keep status and the legacy `active` mirror in lockstep (Phase 1 rule):
        # derive status from an explicit value or from active, then mirror active.
        derived = "active" if source.get("active", True) else "paused"
        status = source.get("status") or derived
        if status not in _VALID_STATUSES:
            # An unrecognised status must not override an explicit `active = false`;
            # fall back to what the `active` flag says rather than forcing "active".
            status = derived
        active = 1 if status == "active" else 0
        conn.execute(
            """
            INSERT INTO sources (
                name, homepage_url, feed_url, source_type, default_category,
                trust_score, pr_risk_score, active, status, poll_interval_minutes, notes, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            ON CONFLICT(feed_url) DO UPDATE SET
                name = excluded.name,
                homepage_url = excluded.homepage_url,
                source_type = excluded.source_type,
                default_category = excluded.default_category,
                trust_score = excluded.trust_score,
                pr_risk_score = excluded.pr_risk_score,
                active = excluded.active,
                status = excluded.status,
                poll_interval_minutes = excluded.poll_interval_minutes,
                notes = excluded.notes,
                updated_at = CURRENT_TIMESTAMP
            """,
            (
                source["name"],
                source.get("homepage_url"),
                source["feed_url"],
                source.get("source_type", "rss"),
                source.get("default_category"),
                int(source.get("trust_score", 5)),
                int(source.get("pr_risk_score", 3)),
                active,
                status,
                int(source.get("poll_interval_minutes", 60)),
                source.get("notes"),
            ),
        )
        count += 1
    conn.commit()
    return count


# --- Duplicate detection (catch the same feed added twice) --------------------


class DuplicateFeedError(Exception):
    """Raised when an operation would create a second copy of an existing feed.

    Carries the existing match (a dict with at least ``kind`` and ``name``) in
    ``self.existing`` so the caller can name it in the response.
    """

    def __init__(self, existing: dict):
        self.existing = existing
        super().__init__(f"feed already exists as {existing['kind']} “{existing['name']}”")


def feed_key(url: str) -> str:
    """Return a loose comparison key for spotting the same feed added twice.

    Ignores trivial differences (scheme, leading ``www.``, trailing slash, host
    case). Compare-only — the feed_url is always STORED exactly as entered; this
    just powers dup checks. Only the host is lowercased: URL paths/queries can be
    case-significant. If the URL cannot be parsed, the stripped, lowercased
    input is returned instead.
    """
    try:
        p = urlsplit((url or "").strip())
        host = p.netloc.lower().removeprefix("www.")
        path = p.path.rstrip("/")
        return host + path + (("?" + p.query) if p.query else "")
    except Exception:  # noqa: BLE001 — never let a weird URL break add
        return (url or "").strip().lower()


def find_existing_feed(conn: sqlite3.Connection, url: str) -> dict | None:
    """Is this feed already a live source or a pending candidate?

    Matches on the loose key from ``feed_key``, so http/https + www +
    trailing-slash variants are all caught. Sources are scanned before
    candidates; candidates with status ``rejected`` or ``promoted`` are ignored.

    Returns a dict with ``kind`` (``"source"`` or ``"candidate"``), ``id``,
    ``name`` and ``status`` for the first match, or ``None`` when there is none.
    """
    key = feed_key(url)
    for r in conn.execute("SELECT id, name, feed_url, status FROM sources"):
        if feed_key(r["feed_url"]) == key:
            return {"kind": "source", "id": r["id"], "name": r["name"], "status": r["status"]}
    for r in conn.execute(
        "SELECT id, name, feed_url, status FROM source_candidates WHERE status NOT IN ('rejected','promoted')"
    ):
        if feed_key(r["feed_url"]) == key:
            return {
                "kind": "candidate",
                "id": r["id"],
                # Unnamed candidates are reported by their feed URL.
                "name": r["name"] or r["feed_url"],
                "status": r["status"],
            }
    return None


# --- Supervised source candidates (staging before the real sources table) ----


def save_candidate(
    conn: sqlite3.Connection,
    feed_url: str,
    preview: dict | None = None,
    name: str | None = None,
    homepage_url: str | None = None,
    status: str = "quarantined",
    notes: str | None = None,
) -> sqlite3.Row:
    """Stage a suggested feed (with an optional preview snapshot) for review.

    Re-previewing an existing candidate refreshes its snapshot but never changes
    a status a curator already set (e.g. a rejected feed stays rejected).

    On a ``feed_url`` conflict, ``name``, ``homepage_url`` and ``notes`` keep
    their stored values when the new value is NULL, whereas ``preview_json`` is
    always overwritten — including with NULL when ``preview`` is falsy.

    Returns the stored candidate row. Commits before returning.
    """
    conn.execute(
        """
        INSERT INTO source_candidates (
            feed_url, homepage_url, name, status, preview_json, notes,
            last_previewed_at, updated_at
        )
        VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        ON CONFLICT(feed_url) DO UPDATE SET
            preview_json = excluded.preview_json,
            name = COALESCE(excluded.name, source_candidates.name),
            homepage_url = COALESCE(excluded.homepage_url, source_candidates.homepage_url),
            notes = COALESCE(excluded.notes, source_candidates.notes),
            last_previewed_at = CURRENT_TIMESTAMP,
            updated_at = CURRENT_TIMESTAMP
        """,
        (feed_url, homepage_url, name, status, json.dumps(preview) if preview else None, notes),
    )
    conn.commit()
    return conn.execute("SELECT * FROM source_candidates WHERE feed_url = ?", (feed_url,)).fetchone()


def list_candidates(conn: sqlite3.Connection, status: str | None = None) -> list[sqlite3.Row]:
    """Return candidate rows, most recently updated first.

    When ``status`` is truthy only candidates with that status are returned;
    otherwise all candidates are returned.
    """
    if status:
        return conn.execute(
            "SELECT * FROM source_candidates WHERE status = ? ORDER BY updated_at DESC", (status,)
        ).fetchall()
    return conn.execute("SELECT * FROM source_candidates ORDER BY updated_at DESC").fetchall()


def rename_candidate(conn: sqlite3.Connection, candidate_id: int, name: str | None) -> sqlite3.Row:
    """Fix a staged candidate's display name without re-fetching it.

    An empty name clears it (promote then derives one from the feed host).

    Returns the updated candidate row.

    Raises:
        ValueError: if no candidate has the given id.
    """
    cur = conn.execute(
        "UPDATE source_candidates SET name = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
        (name or None, candidate_id),
    )
    conn.commit()
    if cur.rowcount == 0:
        raise ValueError(f"no candidate with id {candidate_id}")
    return conn.execute("SELECT * FROM source_candidates WHERE id = ?", (candidate_id,)).fetchone()


def reject_candidate(conn: sqlite3.Connection, candidate_id: int) -> bool:
    """Mark a candidate as ``rejected``.

    Returns True when a row with that id was updated, False otherwise.
    """
    cur = conn.execute(
        "UPDATE source_candidates SET status = 'rejected', updated_at = CURRENT_TIMESTAMP WHERE id = ?",
        (candidate_id,),
    )
    conn.commit()
    return cur.rowcount > 0


def restore_candidate(conn: sqlite3.Connection, candidate_id: int) -> bool:
    """Send a REJECTED candidate back to staging ('suggested') so it re-enters
    the queue for another look. Only un-rejects — a promoted candidate is untouched.

    Returns True when a rejected row with that id was updated, False otherwise.
    """
    cur = conn.execute(
        "UPDATE source_candidates SET status = 'suggested', updated_at = CURRENT_TIMESTAMP "
        "WHERE id = ? AND status = 'rejected'",
        (candidate_id,),
    )
    conn.commit()
    return cur.rowcount > 0


def promote_candidate(
    conn: sqlite3.Connection,
    candidate_id: int,
    active: bool = False,
    default_category: str | None = None,
    trust_score: int = 5,
    pr_risk_score: int = 3,
    poll_interval_minutes: int = 180,
) -> int:
    """Copy a reviewed candidate into the real sources table.

    Inactive by default (active-on-approval): a promoted feed is wired up but
    won't be polled until explicitly activated. Never called automatically.

    The candidate is then marked ``promoted``. When the candidate has no name,
    the feed URL's host (or the URL itself) is used as the source name.

    Returns the id of the resulting ``sources`` row.

    Raises:
        ValueError: if no candidate has the given id.
        DuplicateFeedError: if an existing source already matches the feed.
    """
    cand = conn.execute("SELECT * FROM source_candidates WHERE id = ?", (candidate_id,)).fetchone()
    if cand is None:
        raise ValueError(f"no candidate with id {candidate_id}")
    # Re-check duplicates at promote time too — the add-time guard can be bypassed
    # by old/CLI/direct-DB candidates or a race, and upsert_sources would silently
    # overwrite the existing source's settings. (sources are scanned first, so a
    # real source collision wins over this candidate matching itself.)
    existing = find_existing_feed(conn, cand["feed_url"])
    if existing and existing["kind"] == "source":
        raise DuplicateFeedError(existing)
    name = cand["name"] or urlsplit(cand["feed_url"]).netloc or cand["feed_url"]
    upsert_sources(
        conn,
        [
            {
                "name": name,
                "feed_url": cand["feed_url"],
                "homepage_url": cand["homepage_url"],
                "default_category": default_category,
                "trust_score": trust_score,
                "pr_risk_score": pr_risk_score,
                "active": active,
                "poll_interval_minutes": poll_interval_minutes,
                "notes": f"promoted from candidate {candidate_id}",
            }
        ],
    )
    conn.execute(
        "UPDATE source_candidates SET status = 'promoted', updated_at = CURRENT_TIMESTAMP WHERE id = ?",
        (candidate_id,),
    )
    conn.commit()
    row = conn.execute("SELECT id FROM sources WHERE feed_url = ?", (cand["feed_url"],)).fetchone()
    return int(row["id"])


# --- Advisory source health: flag for review, never auto-deactivate -----------


def review_sources(
    conn: sqlite3.Connection,
    stale_days: int = 14,
    min_recent: int = 15,
    recent_window: int = 40,
) -> list[dict]:
    """Recompute advisory review flags for active sources.

    Sets review_flag/review_reason but NEVER changes `active` — the human stays
    in the loop. Only rows with ``active = 1`` are examined, using each source's
    ``recent_window`` most recent scored articles.

    Reasons checked: repeated fetch failures, no articles, staleness beyond
    ``stale_days`` and — once at least ``min_recent`` articles are available —
    low acceptance, duplicate-heavy, high cortisol, high ragebait and
    paywall-heavy ratios.

    Returns one ``{"id", "name", "reason"}`` dict for every source flagged in
    this pass (whether or not it was already flagged before).
    """
    now = datetime.now(timezone.utc)
    flagged = []
    sources = conn.execute(
        "SELECT id, name, consecutive_failures, paywall_override FROM sources WHERE active = 1"
    ).fetchall()
    for s in sources:
        reasons: list[str] = []
        if (s["consecutive_failures"] or 0) >= 3:
            reasons.append(f"failing ({s['consecutive_failures']} consecutive)")
        recent = conn.execute(
            """
            SELECT sc.accepted, sc.cortisol_score, sc.ragebait_score,
                   a.duplicate_of, a.canonical_url,
                   COALESCE(a.published_at, a.discovered_at) AS dt
            FROM articles a
            JOIN article_scores sc ON sc.article_id = a.id
            WHERE a.source_id = ?
            ORDER BY COALESCE(a.published_at, a.discovered_at) DESC
            LIMIT ?
            """,
            (s["id"], recent_window),
        ).fetchall()
        n = len(recent)
        if n == 0:
            reasons.append("no articles yet")
        else:
            try:
                newest = datetime.fromisoformat(recent[0]["dt"])
                if newest.tzinfo is None:
                    # Naive timestamps are treated as UTC so they can be compared with `now`.
                    newest = newest.replace(tzinfo=timezone.utc)
                age = (now - newest).days
                if age > stale_days:
                    reasons.append(f"stale (newest {age}d ago)")
            except (ValueError, TypeError):
                # Best effort: a missing or unparseable timestamp just skips the staleness check.
                pass
            if n >= min_recent:
                # Ratio-based checks only run once the sample is large enough.
                acc = sum(r["accepted"] or 0 for r in recent) / n
                if acc < 0.10:
                    reasons.append(f"low acceptance ({acc * 100:.0f}%)")
                dup = sum(1 for r in recent if r["duplicate_of"] is not None) / n
                if dup > 0.5:
                    reasons.append(f"duplicate-heavy ({dup * 100:.0f}%)")
                avg_cort = sum(r["cortisol_score"] or 0 for r in recent) / n
                if avg_cort > 5:
                    reasons.append(f"high cortisol (avg {avg_cort:.1f})")
                avg_rage = sum(r["ragebait_score"] or 0 for r in recent) / n
                if avg_rage > 3:
                    reasons.append(f"high ragebait (avg {avg_rage:.1f})")
                paywalled = (
                    sum(
                        1
                        for r in recent
                        if is_paywalled_for_source(r["canonical_url"], s["paywall_override"])
                    )
                    / n
                )
                if paywalled > 0.5:
                    reasons.append(f"paywall-heavy ({paywalled * 100:.0f}%)")
        flag = 1 if reasons else 0
        reason = "; ".join(reasons) if reasons else None
        # Always written, so a source that no longer has any reason gets its flag cleared.
        conn.execute(
            "UPDATE sources SET review_flag = ?, review_reason = ? WHERE id = ?",
            (flag, reason, s["id"]),
        )
        if flag:
            flagged.append({"id": s["id"], "name": s["name"], "reason": reason})
    conn.commit()
    return flagged