Files
upbeatBytes/goodnews/feeds.py
T
thejayman77 1e190c5e88 Advisory source health: review flags, never auto-deactivate
- Add source health columns (last_success_at, last_error_at, last_error,
  consecutive_failures, review_flag, review_reason) via SCHEMA + migration.
- poll_source maintains them: success resets the failure streak and records the
  success time; failure increments it and stores the latest error.
- review_sources() flags active sources that are stale, repeatedly failing,
  low-acceptance, duplicate-heavy, or doom-skewed (high cortisol/ragebait) over
  a recent window. It is purely advisory: it sets review_flag/review_reason and
  never changes the active column (human stays in the loop), clearing the flag
  when a source recovers.
- CLI review-sources; cycle runs it as a final step (--no-review to skip);
  source-report shows a review line for flagged feeds.
- Tests: healthy/failing/stale/low-acceptance/recovery and never-deactivates.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-30 20:28:35 +00:00

475 lines
15 KiB
Python

from __future__ import annotations
import email.utils
import sqlite3
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
from collections import Counter
from dataclasses import dataclass
from datetime import UTC, datetime
from .scoring import score_article
from .text import canonicalize_url, clean_text, sha256_text
USER_AGENT = "goodNews/0.1 (+local constructive news prototype)"
@dataclass
class FeedItem:
title: str
url: str
description: str | None = None
author: str | None = None
published_at: str | None = None
image_url: str | None = None
language: str | None = None
raw_guid: str | None = None
def poll_all_sources(conn: sqlite3.Connection, limit: int | None = None) -> dict:
return _poll_rows(conn, conn.execute(
"SELECT * FROM sources WHERE active = 1 ORDER BY id"
).fetchall(), limit)
def poll_due_sources(conn: sqlite3.Connection, limit: int | None = None) -> dict:
"""Poll only active sources whose last successful poll is older than their
poll_interval_minutes (or that have never been polled successfully).
This is what makes poll_interval_minutes meaningful and lets a scheduler run
frequently without re-hitting feeds that are not yet due.
"""
rows = conn.execute(
"""
SELECT s.*
FROM sources s
WHERE s.active = 1
AND (
NOT EXISTS (
SELECT 1 FROM ingest_runs r
WHERE r.source_id = s.id AND r.status = 'ok'
)
OR (
SELECT MAX(r.finished_at) FROM ingest_runs r
WHERE r.source_id = s.id AND r.status = 'ok'
) <= datetime('now', '-' || s.poll_interval_minutes || ' minutes')
)
ORDER BY s.id
"""
).fetchall()
return _poll_rows(conn, rows, limit)
def _poll_rows(conn: sqlite3.Connection, rows: list[sqlite3.Row], limit: int | None) -> dict:
if limit is not None:
rows = rows[:limit]
totals = {"sources": 0, "seen": 0, "inserted": 0, "duplicate": 0, "failed": 0}
for source in rows:
result = poll_source(conn, source)
totals["sources"] += 1
totals["seen"] += result["seen"]
totals["inserted"] += result["inserted"]
totals["duplicate"] += result["duplicate"]
totals["failed"] += 1 if result["status"] == "failed" else 0
return totals
def poll_source(conn: sqlite3.Connection, source: sqlite3.Row) -> dict:
run_id = conn.execute(
"INSERT INTO ingest_runs (source_id) VALUES (?)",
(source["id"],),
).lastrowid
conn.commit()
seen = inserted = duplicate = 0
try:
xml = fetch_feed(source["feed_url"])
items = parse_feed(xml)
seen = len(items)
for item in items:
inserted_now = insert_article(conn, source, item)
if inserted_now:
inserted += 1
else:
duplicate += 1
conn.execute(
"""
UPDATE ingest_runs
SET finished_at = CURRENT_TIMESTAMP,
status = 'ok',
items_seen = ?,
items_inserted = ?,
items_duplicate = ?
WHERE id = ?
""",
(seen, inserted, duplicate, run_id),
)
# A clean poll resets the failure streak and records the success time.
conn.execute(
"""
UPDATE sources
SET last_success_at = CURRENT_TIMESTAMP, consecutive_failures = 0
WHERE id = ?
""",
(source["id"],),
)
conn.commit()
return {"status": "ok", "seen": seen, "inserted": inserted, "duplicate": duplicate}
except Exception as exc:
conn.execute(
"""
UPDATE ingest_runs
SET finished_at = CURRENT_TIMESTAMP,
status = 'failed',
items_seen = ?,
items_inserted = ?,
items_duplicate = ?,
error = ?
WHERE id = ?
""",
(seen, inserted, duplicate, str(exc), run_id),
)
# Track the failure streak and latest error for advisory review flags.
conn.execute(
"""
UPDATE sources
SET consecutive_failures = consecutive_failures + 1,
last_error_at = CURRENT_TIMESTAMP,
last_error = ?
WHERE id = ?
""",
(str(exc), source["id"]),
)
conn.commit()
return {
"status": "failed",
"seen": seen,
"inserted": inserted,
"duplicate": duplicate,
"error": str(exc),
}
def preview_feed(url: str, sample: int = 25, pr_risk_default: int = 3, client=None) -> dict:
"""Fetch and score a sample of a feed WITHOUT persisting anything.
Read-only: lets a user vet a candidate source before it is ever added. By
default it uses the fast heuristic; pass an LLM client to also get the
topic/flavor mix and the model's acceptance view (slower).
"""
items = parse_feed(fetch_feed(url))
rows = []
for item in items[:sample]:
title = clean_text(item.title, max_len=500)
if not title:
continue
description = clean_text(item.description, max_len=1000)
s = score_article(title, description, pr_risk_default)
rows.append(
{
"title": title,
"description": description,
"url": canonicalize_url(item.url),
"published_at": item.published_at,
"accepted": bool(s["accepted"]),
"cortisol": s["cortisol_score"],
"ragebait": s["ragebait_score"],
"pr_risk": s["pr_risk_score"],
"reason_code": s["reason_code"],
"topic": None,
"flavor": None,
}
)
classified = False
if client and rows:
from .llm import normalize_scores
classified = True
for r in rows:
try:
raw = client.classify(
{
"source_name": "preview",
"default_category": None,
"source_trust_score": 5,
"source_pr_risk_score": pr_risk_default,
"published_at": r["published_at"],
"title": r["title"],
"description": r["description"] or "",
"canonical_url": r["url"],
}
)
ns = normalize_scores(raw, model_name=client.model)
r.update(
accepted=bool(ns["accepted"]),
topic=ns["topic"],
flavor=ns["flavor"],
cortisol=ns["cortisol_score"],
ragebait=ns["ragebait_score"],
pr_risk=ns["pr_risk_score"],
)
except Exception:
pass # one bad item shouldn't sink the whole preview
total = len(rows)
accepted = sum(1 for r in rows if r["accepted"])
def _avg(key: str) -> float:
return round(sum(r[key] for r in rows) / total, 1) if total else 0.0
# Freshness: newest item and how many landed in the last week.
now = datetime.now(UTC)
dates = []
for r in rows:
if r["published_at"]:
try:
dates.append(datetime.fromisoformat(r["published_at"]))
except ValueError:
pass
newest = max(dates).isoformat() if dates else None
recent_7d = sum(1 for d in dates if (now - d).days <= 7)
return {
"url": url,
"sampled": total,
"classified": classified,
"accepted": accepted,
"acceptance_rate": round(accepted / total, 2) if total else 0.0,
"avg_cortisol": _avg("cortisol"),
"avg_ragebait": _avg("ragebait"),
"avg_pr_risk": _avg("pr_risk"),
"newest_published": newest,
"recent_7d": recent_7d,
"topic_mix": dict(Counter(r["topic"] for r in rows if r["topic"])),
"flavor_mix": dict(Counter(r["flavor"] for r in rows if r["flavor"])),
"examples_accepted": [r["title"] for r in rows if r["accepted"]][:5],
"examples_rejected": [
{"title": r["title"], "reason": r["reason_code"]} for r in rows if not r["accepted"]
][:5],
}
def fetch_feed(url: str, timeout: int = 20) -> bytes:
request = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
try:
with urllib.request.urlopen(request, timeout=timeout) as response:
return response.read()
except urllib.error.HTTPError as exc:
raise RuntimeError(f"HTTP {exc.code} fetching {url}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"failed fetching {url}: {exc.reason}") from exc
def parse_feed(xml: bytes) -> list[FeedItem]:
root = ET.fromstring(xml)
root_name = _local_name(root.tag)
if root_name == "feed":
return _parse_atom(root)
return _parse_rss(root)
def insert_article(conn: sqlite3.Connection, source: sqlite3.Row, item: FeedItem) -> bool:
canonical_url = canonicalize_url(item.url)
if not canonical_url or not item.title:
return False
title = clean_text(item.title, max_len=500)
description = clean_text(item.description, max_len=1000)
if not title:
return False
url_hash = sha256_text(canonical_url)
title_hash = sha256_text(title)
try:
cursor = conn.execute(
"""
INSERT INTO articles (
source_id, canonical_url, title, description, author,
published_at, image_url, language, raw_guid, url_hash, title_hash
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
source["id"],
canonical_url,
title,
description,
clean_text(item.author, max_len=250),
item.published_at,
canonicalize_url(item.image_url),
item.language,
item.raw_guid,
url_hash,
title_hash,
),
)
except sqlite3.IntegrityError:
return False
scores = score_article(title, description, int(source["pr_risk_score"]))
conn.execute(
"""
INSERT INTO article_scores (
article_id, constructive_score, cortisol_score, ragebait_score,
agency_score, human_benefit_score, novelty_score, pr_risk_score,
accepted, reason_code, reason_text, model_name
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
cursor.lastrowid,
scores["constructive_score"],
scores["cortisol_score"],
scores["ragebait_score"],
scores["agency_score"],
scores["human_benefit_score"],
scores["novelty_score"],
scores["pr_risk_score"],
scores["accepted"],
scores["reason_code"],
scores["reason_text"],
scores["model_name"],
),
)
conn.commit()
return True
def _parse_rss(root: ET.Element) -> list[FeedItem]:
channel = _first_child(root, "channel")
if channel is None:
channel = root
language = _first_text(channel, "language")
items = [element for element in root.iter() if _local_name(element.tag) == "item"]
parsed = []
for item in items:
title = _first_text(item, "title")
link = _first_text(item, "link")
guid = _first_text(item, "guid")
url = link or guid
if not title or not url:
continue
parsed.append(
FeedItem(
title=title,
url=url,
description=_first_text(item, "description", "summary", "encoded"),
author=_first_text(item, "author", "creator"),
published_at=_parse_date(_first_text(item, "pubDate", "published", "updated", "date")),
image_url=_find_image_url(item),
language=language,
raw_guid=guid,
)
)
return parsed
def _parse_atom(root: ET.Element) -> list[FeedItem]:
language = root.attrib.get("{http://www.w3.org/XML/1998/namespace}lang")
entries = [element for element in root if _local_name(element.tag) == "entry"]
parsed = []
for entry in entries:
title = _first_text(entry, "title")
url = _atom_link(entry)
if not title or not url:
continue
author = None
author_el = _first_child(entry, "author")
if author_el is not None:
author = _first_text(author_el, "name") or _text(author_el)
parsed.append(
FeedItem(
title=title,
url=url,
description=_first_text(entry, "summary", "content"),
author=author,
published_at=_parse_date(_first_text(entry, "published", "updated")),
image_url=_find_image_url(entry),
language=language,
raw_guid=_first_text(entry, "id"),
)
)
return parsed
def _atom_link(entry: ET.Element) -> str | None:
fallback = None
for child in entry:
if _local_name(child.tag) != "link":
continue
href = child.attrib.get("href")
if not href:
continue
if child.attrib.get("rel", "alternate") == "alternate":
return href
fallback = fallback or href
return fallback
def _find_image_url(element: ET.Element) -> str | None:
for child in element.iter():
name = _local_name(child.tag)
if name in {"thumbnail", "content"} and child.attrib.get("url"):
if child.attrib.get("medium") in {None, "image"}:
return child.attrib["url"]
if name == "enclosure" and child.attrib.get("url"):
mime = child.attrib.get("type", "")
if mime.startswith("image/"):
return child.attrib["url"]
return None
def _parse_date(value: str | None) -> str | None:
if not value:
return None
value = value.strip()
try:
parsed = email.utils.parsedate_to_datetime(value)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC).isoformat()
except (TypeError, ValueError):
pass
try:
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC).isoformat()
except ValueError:
return None
def _first_child(element: ET.Element, name: str) -> ET.Element | None:
for child in element:
if _local_name(child.tag) == name:
return child
return None
def _first_text(element: ET.Element, *names: str) -> str | None:
for child in element:
if _local_name(child.tag) in names:
value = _text(child)
if value:
return value
return None
def _text(element: ET.Element) -> str | None:
if element.text:
return element.text.strip()
return None
def _local_name(tag: str) -> str:
if "}" in tag:
return tag.rsplit("}", 1)[1]
return tag