452e5a3fe4
Pre-traffic cleanup from an audit: * Scheduler: poll_due_sources now keys on the last *attempt* (success or failure), not the last success, and scales the wait by the consecutive-failure streak (capped at a day). A failing feed (e.g. Phys.org's HTTP 429s) used to be retried every cycle because it had no successful run; it now backs off and recovers on its own. Extracted due_source_rows() + tests. * FK hygiene: deleting a daily_brief is supposed to cascade to its items, but SQLite enforces foreign keys per-connection — connect() already sets the pragma, so the cascade is correct going forward; added a regression test. (Orphaned items + Phys.org settings were cleaned directly on the live DB.) * a11y: modal/drawer dialogs are now focusable (tabindex), close on Escape (window) and on backdrop click via a target check (dropping the inner stopPropagation handlers). Build is warning-free. * tests: conftest points any un-mocked LLM client at a closed port with a 1s timeout, so an accidental real call fails fast instead of hanging the suite. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
518 lines
17 KiB
Python
518 lines
17 KiB
Python
from __future__ import annotations
|
|
|
|
import email.utils
|
|
import re
|
|
import sqlite3
|
|
import urllib.error
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
from collections import Counter
|
|
from dataclasses import dataclass
|
|
from datetime import UTC, datetime
|
|
|
|
from .scoring import score_article
|
|
from .text import canonicalize_url, clean_text, sha256_text
|
|
|
|
|
|
# User-Agent header value sent with every feed request made by fetch_feed.
USER_AGENT = "goodNews/0.1 (+local constructive news prototype)"
|
|
|
|
|
|
@dataclass
|
|
class FeedItem:
|
|
title: str
|
|
url: str
|
|
description: str | None = None
|
|
author: str | None = None
|
|
published_at: str | None = None
|
|
image_url: str | None = None
|
|
language: str | None = None
|
|
raw_guid: str | None = None
|
|
|
|
|
|
def poll_all_sources(conn: sqlite3.Connection, limit: int | None = None) -> dict:
    """Poll every active source right now, ignoring schedules and backoff.

    Sources are visited in ascending id order; ``limit`` (when given) caps how
    many are polled. Returns the aggregate counters produced by _poll_rows.
    """
    active_sources = conn.execute(
        "SELECT * FROM sources WHERE active = 1 ORDER BY id"
    ).fetchall()
    return _poll_rows(conn, active_sources, limit)
|
|
|
|
|
|
# A failing source backs off so it isn't re-hit every scheduler cycle: the
|
|
# effective wait grows with the failure streak, capped at a day. This keeps a
|
|
# rate-limited feed (HTTP 429) resting instead of hammered, and lets it recover
|
|
# on its own once the limit lifts.
|
|
MAX_BACKOFF_MINUTES = 1440
|
|
|
|
|
|
def poll_due_sources(conn: sqlite3.Connection, limit: int | None = None) -> dict:
|
|
"""Poll only active sources whose last *attempt* (success OR failure) is
|
|
older than their effective interval, or that have never been polled.
|
|
|
|
Keying on the last attempt — not the last success — is what stops a
|
|
perpetually-failing feed from being retried every cycle. The effective
|
|
interval is poll_interval_minutes scaled up by the consecutive-failure
|
|
streak (capped at MAX_BACKOFF_MINUTES), so healthy feeds keep their cadence
|
|
while broken ones step down to occasional retries.
|
|
"""
|
|
return _poll_rows(conn, due_source_rows(conn), limit)
|
|
|
|
|
|
def due_source_rows(conn: sqlite3.Connection) -> list[sqlite3.Row]:
|
|
"""Active sources currently due to poll (see poll_due_sources for the rule).
|
|
|
|
Split out so the due/backoff decision can be tested without the network.
|
|
"""
|
|
return conn.execute(
|
|
"""
|
|
SELECT s.*
|
|
FROM sources s
|
|
WHERE s.active = 1
|
|
AND (
|
|
NOT EXISTS (
|
|
SELECT 1 FROM ingest_runs r
|
|
WHERE r.source_id = s.id AND r.finished_at IS NOT NULL
|
|
)
|
|
OR (
|
|
SELECT MAX(r.finished_at) FROM ingest_runs r
|
|
WHERE r.source_id = s.id AND r.finished_at IS NOT NULL
|
|
) <= datetime(
|
|
'now',
|
|
'-' || MIN(?, s.poll_interval_minutes * (1 + s.consecutive_failures)) || ' minutes'
|
|
)
|
|
)
|
|
ORDER BY s.id
|
|
""",
|
|
(MAX_BACKOFF_MINUTES,),
|
|
).fetchall()
|
|
|
|
|
|
def _poll_rows(conn: sqlite3.Connection, rows: list[sqlite3.Row], limit: int | None) -> dict:
|
|
if limit is not None:
|
|
rows = rows[:limit]
|
|
|
|
totals = {"sources": 0, "seen": 0, "inserted": 0, "duplicate": 0, "failed": 0}
|
|
for source in rows:
|
|
result = poll_source(conn, source)
|
|
totals["sources"] += 1
|
|
totals["seen"] += result["seen"]
|
|
totals["inserted"] += result["inserted"]
|
|
totals["duplicate"] += result["duplicate"]
|
|
totals["failed"] += 1 if result["status"] == "failed" else 0
|
|
return totals
|
|
|
|
|
|
def poll_source(conn: sqlite3.Connection, source: sqlite3.Row) -> dict:
    """Fetch, parse and store one source's feed, recording the attempt.

    An ``ingest_runs`` row is inserted and committed before fetching, so an
    interrupted poll leaves a row whose ``finished_at`` is NULL. On success
    the run is closed as ``'ok'`` and the source's failure streak is reset.
    On any exception the run is closed as ``'failed'``, the streak is
    incremented and the error is stored on both rows.

    Exceptions are not re-raised. The outcome is returned as a dict with
    ``status``, ``seen``, ``inserted`` and ``duplicate`` (plus ``error`` on
    failure).
    """
    run_id = conn.execute(
        "INSERT INTO ingest_runs (source_id) VALUES (?)",
        (source["id"],),
    ).lastrowid
    conn.commit()

    seen = inserted = duplicate = 0
    try:
        items = parse_feed(fetch_feed(source["feed_url"]))
        seen = len(items)
        for item in items:
            if insert_article(conn, source, item):
                inserted += 1
            else:
                duplicate += 1
        _finish_run_ok(conn, source["id"], run_id, seen, inserted, duplicate)
        return {"status": "ok", "seen": seen, "inserted": inserted, "duplicate": duplicate}
    except Exception as exc:
        # insert_article commits once per article. If it raised part-way (e.g.
        # while scoring), its article row is still pending in the open
        # transaction. Roll back so the commit in _finish_run_failed can't
        # persist an article without its scores. Articles that were already
        # committed are unaffected.
        conn.rollback()
        # Some exceptions stringify to "", so keep the stored error informative.
        error = str(exc) or type(exc).__name__
        _finish_run_failed(conn, source["id"], run_id, seen, inserted, duplicate, error)
        return {
            "status": "failed",
            "seen": seen,
            "inserted": inserted,
            "duplicate": duplicate,
            "error": error,
        }


def _finish_run_ok(
    conn: sqlite3.Connection,
    source_id: int,
    run_id: int,
    seen: int,
    inserted: int,
    duplicate: int,
) -> None:
    """Close run ``run_id`` as 'ok', reset the source's failure streak, commit."""
    conn.execute(
        """
        UPDATE ingest_runs
        SET finished_at = CURRENT_TIMESTAMP,
            status = 'ok',
            items_seen = ?,
            items_inserted = ?,
            items_duplicate = ?
        WHERE id = ?
        """,
        (seen, inserted, duplicate, run_id),
    )
    # A clean poll resets the failure streak and records the success time.
    conn.execute(
        """
        UPDATE sources
        SET last_success_at = CURRENT_TIMESTAMP, consecutive_failures = 0
        WHERE id = ?
        """,
        (source_id,),
    )
    conn.commit()


def _finish_run_failed(
    conn: sqlite3.Connection,
    source_id: int,
    run_id: int,
    seen: int,
    inserted: int,
    duplicate: int,
    error: str,
) -> None:
    """Close run ``run_id`` as 'failed', bump the source's streak, commit."""
    conn.execute(
        """
        UPDATE ingest_runs
        SET finished_at = CURRENT_TIMESTAMP,
            status = 'failed',
            items_seen = ?,
            items_inserted = ?,
            items_duplicate = ?,
            error = ?
        WHERE id = ?
        """,
        (seen, inserted, duplicate, error, run_id),
    )
    # Track the failure streak and latest error for advisory review flags.
    conn.execute(
        """
        UPDATE sources
        SET consecutive_failures = consecutive_failures + 1,
            last_error_at = CURRENT_TIMESTAMP,
            last_error = ?
        WHERE id = ?
        """,
        (error, source_id),
    )
    conn.commit()
|
|
|
|
|
|
def preview_feed(url: str, sample: int = 25, pr_risk_default: int = 3, client=None) -> dict:
    """Fetch and score a sample of a feed WITHOUT persisting anything.

    Read-only: lets a user vet a candidate source before it is ever added. By
    default it uses the fast heuristic; pass an LLM client to also get the
    topic/flavor mix and the model's acceptance view (slower).

    Args:
        url: Feed URL to fetch.
        sample: Maximum number of feed items to consider (taken from the top).
        pr_risk_default: PR-risk value passed to the heuristic scorer and to
            the LLM payload for every item.
        client: Optional LLM client exposing ``classify(payload)`` and ``model``.

    Returns:
        A summary dict: sample size, acceptance count/rate, average scores,
        freshness (newest date and items published within the last 7 days;
        future-dated items count as recent), topic/flavor mix (LLM only), and
        up to five accepted/rejected examples.

    Raises:
        Whatever fetch_feed / parse_feed raise when the feed cannot be fetched
        or parsed. Per-item LLM failures are swallowed; such items keep their
        heuristic scores.
    """
    items = parse_feed(fetch_feed(url))
    rows = []
    for item in items[:sample]:
        title = clean_text(item.title, max_len=500)
        if not title:
            continue
        description = clean_text(item.description, max_len=1000)
        s = score_article(title, description, pr_risk_default)
        rows.append(
            {
                "title": title,
                "description": description,
                "url": canonicalize_url(item.url),
                "published_at": item.published_at,
                "accepted": bool(s["accepted"]),
                "cortisol": s["cortisol_score"],
                "ragebait": s["ragebait_score"],
                "pr_risk": s["pr_risk_score"],
                "reason_code": s["reason_code"],
                "topic": None,
                "flavor": None,
            }
        )

    classified = False
    if client and rows:
        from .llm import normalize_scores

        classified = True
        for r in rows:
            try:
                raw = client.classify(
                    {
                        "source_name": "preview",
                        "default_category": None,
                        "source_trust_score": 5,
                        "source_pr_risk_score": pr_risk_default,
                        "published_at": r["published_at"],
                        "title": r["title"],
                        "description": r["description"] or "",
                        "canonical_url": r["url"],
                    }
                )
                ns = normalize_scores(raw, model_name=client.model)
                r.update(
                    accepted=bool(ns["accepted"]),
                    topic=ns["topic"],
                    flavor=ns["flavor"],
                    cortisol=ns["cortisol_score"],
                    ragebait=ns["ragebait_score"],
                    pr_risk=ns["pr_risk_score"],
                )
            except Exception:
                pass  # one bad item shouldn't sink the whole preview

    total = len(rows)
    accepted = sum(1 for r in rows if r["accepted"])

    def _avg(key: str) -> float:
        return round(sum(r[key] for r in rows) / total, 1) if total else 0.0

    # Freshness: newest item and how many were published in the last 7 days.
    now = datetime.now(UTC)
    week_seconds = 7 * 24 * 60 * 60
    dates = []
    for r in rows:
        if not r["published_at"]:
            continue
        try:
            published = datetime.fromisoformat(r["published_at"])
        except ValueError:
            continue
        # Subtracting a naive datetime from the aware `now` raises TypeError;
        # treat naive timestamps as UTC, matching _parse_date's convention.
        if published.tzinfo is None:
            published = published.replace(tzinfo=UTC)
        dates.append(published)
    newest = max(dates).isoformat() if dates else None
    # Exact 7-day window. The previous `.days <= 7` also counted items up to
    # 7 days 23:59 old, i.e. an 8-day window.
    recent_7d = sum(1 for d in dates if (now - d).total_seconds() <= week_seconds)

    return {
        "url": url,
        "sampled": total,
        "classified": classified,
        "accepted": accepted,
        "acceptance_rate": round(accepted / total, 2) if total else 0.0,
        "avg_cortisol": _avg("cortisol"),
        "avg_ragebait": _avg("ragebait"),
        "avg_pr_risk": _avg("pr_risk"),
        "newest_published": newest,
        "recent_7d": recent_7d,
        "topic_mix": dict(Counter(r["topic"] for r in rows if r["topic"])),
        "flavor_mix": dict(Counter(r["flavor"] for r in rows if r["flavor"])),
        "examples_accepted": [r["title"] for r in rows if r["accepted"]][:5],
        "examples_rejected": [
            {"title": r["title"], "reason": r["reason_code"]} for r in rows if not r["accepted"]
        ][:5],
    }
|
|
|
|
|
|
def fetch_feed(url: str, timeout: int = 20) -> bytes:
    """Download ``url`` with the module's User-Agent and return the raw body.

    ``timeout`` (seconds) is passed to ``urlopen``. HTTP error statuses, URL
    errors, and other OS-level errors (timeouts or connection resets raised
    while waiting for or reading the response) are re-raised as
    ``RuntimeError`` naming the URL, with the original exception chained.
    Callers such as poll_source thus store a readable error message.
    """
    request = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.read()
    except urllib.error.HTTPError as exc:
        raise RuntimeError(f"HTTP {exc.code} fetching {url}") from exc
    except urllib.error.URLError as exc:
        raise RuntimeError(f"failed fetching {url}: {exc.reason}") from exc
    except OSError as exc:
        # urlopen only wraps errors from sending the request in URLError; a
        # timeout or reset while awaiting/reading the response surfaces as a
        # bare OSError subclass (e.g. TimeoutError: "timed out"), without the URL.
        raise RuntimeError(f"failed fetching {url}: {exc}") from exc
|
|
|
|
|
|
def parse_feed(xml: bytes) -> list[FeedItem]:
    """Parse a feed document into FeedItems, dispatching on the root element.

    A document whose root's local name is ``feed`` is treated as Atom; any
    other root is handed to the RSS parser. Malformed XML raises
    ``xml.etree.ElementTree.ParseError``.
    """
    # NOTE(review): xml.etree is documented as not secure against maliciously
    # constructed data, and feeds are third-party input. Consider a hardened
    # parser if adding a dependency becomes acceptable.
    root = ET.fromstring(xml)
    is_atom = _local_name(root.tag) == "feed"
    parser = _parse_atom if is_atom else _parse_rss
    return parser(root)
|
|
|
|
|
|
def insert_article(conn: sqlite3.Connection, source: sqlite3.Row, item: FeedItem) -> bool:
    """Store one feed item as an article plus its heuristic scores.

    Returns True when a new ``articles`` row was written; it is then
    committed together with its ``article_scores`` row. Returns False without
    writing when the item lacks a usable URL or title, or when the articles
    INSERT raises ``sqlite3.IntegrityError`` (treated as a duplicate).
    """
    canonical_url = canonicalize_url(item.url)
    if not (canonical_url and item.title):
        return False

    title = clean_text(item.title, max_len=500)
    description = clean_text(item.description, max_len=1000)
    if not title:
        return False

    url_hash = sha256_text(canonical_url)
    title_hash = sha256_text(title)
    article_row = (
        source["id"],
        canonical_url,
        title,
        description,
        clean_text(item.author, max_len=250),
        item.published_at,
        canonicalize_url(item.image_url),
        item.language,
        item.raw_guid,
        url_hash,
        title_hash,
    )
    try:
        cursor = conn.execute(
            """
            INSERT INTO articles (
                source_id, canonical_url, title, description, author,
                published_at, image_url, language, raw_guid, url_hash, title_hash
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            article_row,
        )
    except sqlite3.IntegrityError:
        # A constraint on `articles` rejected the row. Presumably this is a
        # uniqueness rule on the URL/title hashes (TODO confirm against the
        # schema). Nothing was stored.
        return False

    score_keys = (
        "constructive_score",
        "cortisol_score",
        "ragebait_score",
        "agency_score",
        "human_benefit_score",
        "novelty_score",
        "pr_risk_score",
        "accepted",
        "reason_code",
        "reason_text",
        "model_name",
    )
    scores = score_article(title, description, int(source["pr_risk_score"]))
    conn.execute(
        """
        INSERT INTO article_scores (
            article_id, constructive_score, cortisol_score, ragebait_score,
            agency_score, human_benefit_score, novelty_score, pr_risk_score,
            accepted, reason_code, reason_text, model_name
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (cursor.lastrowid, *(scores[key] for key in score_keys)),
    )
    conn.commit()
    return True
|
|
|
|
|
|
def _parse_rss(root: ET.Element) -> list[FeedItem]:
    """Build FeedItems from an RSS-style document rooted at ``root``.

    ``item`` elements are collected from anywhere below ``root``, not only
    inside ``<channel>``. An item needs a title and a link (falling back to
    its guid), or it is skipped. The ``<language>`` of the channel is copied
    onto every item; if there is no ``<channel>`` child, the root's is used.
    """
    channel = _first_child(root, "channel")
    language_scope = root if channel is None else channel
    language = _first_text(language_scope, "language")

    parsed = []
    for node in root.iter():
        if _local_name(node.tag) != "item":
            continue
        title = _first_text(node, "title")
        link = _first_text(node, "link")
        guid = _first_text(node, "guid")
        url = link or guid
        if not (title and url):
            continue
        parsed.append(
            FeedItem(
                title=title,
                url=url,
                description=_first_text(node, "description", "summary", "encoded"),
                author=_first_text(node, "author", "creator"),
                published_at=_parse_date(
                    _first_text(node, "pubDate", "published", "updated", "date")
                ),
                image_url=_find_image_url(node) or _html_image(node),
                language=language,
                raw_guid=guid,
            )
        )
    return parsed
|
|
|
|
|
|
def _parse_atom(root: ET.Element) -> list[FeedItem]:
    """Build FeedItems from the direct ``<entry>`` children of an Atom feed.

    Entries without a title or a usable link (see _atom_link) are skipped.
    The feed root's ``xml:lang`` attribute, if any, is copied onto every item.
    """
    language = root.attrib.get("{http://www.w3.org/XML/1998/namespace}lang")
    entries = (child for child in root if _local_name(child.tag) == "entry")

    parsed = []
    for entry in entries:
        title = _first_text(entry, "title")
        url = _atom_link(entry)
        if not (title and url):
            continue
        author_node = _first_child(entry, "author")
        if author_node is None:
            author = None
        else:
            # Prefer <author><name>; otherwise use the element's own text.
            author = _first_text(author_node, "name") or _text(author_node)
        parsed.append(
            FeedItem(
                title=title,
                url=url,
                description=_first_text(entry, "summary", "content"),
                author=author,
                published_at=_parse_date(_first_text(entry, "published", "updated")),
                image_url=_find_image_url(entry) or _html_image(entry),
                language=language,
                raw_guid=_first_text(entry, "id"),
            )
        )
    return parsed
|
|
|
|
|
|
def _atom_link(entry: ET.Element) -> str | None:
    """Choose an Atom entry's article URL from its ``<link href>`` children.

    The first link whose ``rel`` is "alternate" (the value assumed when ``rel``
    is absent) wins. Otherwise the first link with any non-empty href is
    returned, or None if there is none.
    """
    first_href = None
    for link in entry:
        if _local_name(link.tag) != "link":
            continue
        href = link.attrib.get("href")
        if not href:
            continue
        if link.attrib.get("rel", "alternate") == "alternate":
            return href
        if first_href is None:
            first_href = href
    return first_href
|
|
|
|
|
|
_IMG_SRC_RE = re.compile(r"""<img\b[^>]*?\bsrc=["']([^"']+)["']""", re.IGNORECASE)
|
|
|
|
|
|
def _img_from_html(html: str | None) -> str | None:
|
|
"""First <img src> in a content/description HTML blob, if any."""
|
|
if not html:
|
|
return None
|
|
match = _IMG_SRC_RE.search(html)
|
|
return match.group(1) if match else None
|
|
|
|
|
|
def _html_image(element: ET.Element) -> str | None:
    """Opportunistic image URL taken from the feed's own content HTML.

    Looks at the first non-empty content/description-like child, takes its
    first ``<img src>``, and canonicalizes it. Only ever reads what the feed
    already provides; it never fetches the article page. A non-http(s) or
    relative URL is dropped by canonicalize_url.
    """
    html_blob = _first_text(element, "encoded", "content", "description", "summary")
    candidate = _img_from_html(html_blob)
    return canonicalize_url(candidate)
|
|
|
|
|
|
def _find_image_url(element: ET.Element) -> str | None:
    """Return the first explicitly declared image URL in ``element``'s subtree.

    Elements are visited in document order, starting with ``element`` itself.
    Two kinds match (by local tag name):
      * ``thumbnail``/``content`` with a ``url`` and a ``medium`` attribute
        that is absent or "image";
      * ``enclosure`` with a ``url`` and a ``type`` starting with "image/".
    The URL is returned unmodified, or None if nothing matches.
    """
    for node in element.iter():
        tag = _local_name(node.tag)
        url = node.attrib.get("url")
        if not url:
            continue
        if tag in {"thumbnail", "content"}:
            if node.attrib.get("medium") in {None, "image"}:
                return url
        elif tag == "enclosure" and node.attrib.get("type", "").startswith("image/"):
            return url
    return None
|
|
|
|
|
|
def _parse_date(value: str | None) -> str | None:
|
|
if not value:
|
|
return None
|
|
value = value.strip()
|
|
try:
|
|
parsed = email.utils.parsedate_to_datetime(value)
|
|
if parsed.tzinfo is None:
|
|
parsed = parsed.replace(tzinfo=UTC)
|
|
return parsed.astimezone(UTC).isoformat()
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
try:
|
|
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
|
|
if parsed.tzinfo is None:
|
|
parsed = parsed.replace(tzinfo=UTC)
|
|
return parsed.astimezone(UTC).isoformat()
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _first_child(element: ET.Element, name: str) -> ET.Element | None:
    """Return the first direct child whose namespace-stripped tag is ``name``."""
    matches = (child for child in element if _local_name(child.tag) == name)
    return next(matches, None)
|
|
|
|
|
|
def _first_text(element: ET.Element, *names: str) -> str | None:
    """Return the first non-empty stripped text among matching direct children.

    A child matches when its namespace-stripped tag is one of ``names``.
    Children are scanned in document order, so the order of ``names`` does
    NOT set priority: whichever matching child appears first in the feed
    (and has text) wins.
    """
    for child in element:
        if _local_name(child.tag) not in names:
            continue
        text = _text(child)
        if text:
            return text
    return None
|
|
|
|
|
|
def _text(element: ET.Element) -> str | None:
|
|
if element.text:
|
|
return element.text.strip()
|
|
return None
|
|
|
|
|
|
def _local_name(tag: str) -> str:
|
|
if "}" in tag:
|
|
return tag.rsplit("}", 1)[1]
|
|
return tag
|
|
|