"""Feed ingestion: poll RSS/Atom sources, preview candidate feeds, store articles."""

from __future__ import annotations

import email.utils
import re
import sqlite3
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
from collections import Counter
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from urllib.parse import urljoin, urlsplit

from .enrich import MAX_REDIRECTS, _NoRedirect, _host_is_public
from .scoring import score_article
from .text import canonicalize_url, clean_text, sha256_text

USER_AGENT = "goodNews/0.1 (+local constructive news prototype)"
FEED_MAX_BYTES = 2_000_000  # cap on a fetched feed body (SSRF-safe preview path)


@dataclass
class FeedItem:
    """One entry parsed out of an RSS or Atom feed, before cleaning/persisting."""

    title: str
    url: str
    description: str | None = None
    author: str | None = None
    published_at: str | None = None  # UTC ISO-8601 string produced by _parse_date
    image_url: str | None = None
    language: str | None = None
    raw_guid: str | None = None


def poll_all_sources(conn: sqlite3.Connection, limit: int | None = None) -> dict:
    """Poll every active source (ordered by id), ignoring due/backoff state.

    Returns the aggregate counters produced by _poll_rows.
    """
    return _poll_rows(
        conn,
        conn.execute("SELECT * FROM sources WHERE active = 1 ORDER BY id").fetchall(),
        limit,
    )


# A failing source backs off so it isn't re-hit every scheduler cycle: the
# effective wait grows with the failure streak, capped at a day. This keeps a
# rate-limited feed (HTTP 429) resting instead of hammered, and lets it recover
# on its own once the limit lifts.
MAX_BACKOFF_MINUTES = 1440


class RateLimited(RuntimeError):
    """A feed returned HTTP 429.

    Carries the parsed retry-after timestamp (a UTC 'YYYY-MM-DD HH:MM:SS'
    string, or None) so the caller can rest politely rather than treating it
    as operational breakage.
    """

    def __init__(self, message: str, retry_after_at: str | None = None) -> None:
        super().__init__(message)
        self.retry_after_at = retry_after_at


def parse_retry_after(value: str | None, now: datetime | None = None) -> str | None:
    """Parse a Retry-After header → UTC 'YYYY-MM-DD HH:MM:SS', or None.

    Accepts delta-seconds or an HTTP-date; ignores invalid/negative/past values;
    caps the result at now + MAX_BACKOFF_MINUTES so a feed can't park itself.

    Never raises on a malformed or absurdly large header: this runs inside the
    HTTP 429 handler, where an escaping exception would turn a polite rest
    into a counted failure.
    """
    if not value:
        return None
    now = now or datetime.now(UTC)
    value = value.strip()
    max_wait = timedelta(minutes=MAX_BACKOFF_MINUTES)
    target = None
    # isascii() matters: str.isdigit() is also True for characters such as "²"
    # that int() rejects, and the header grammar only allows ASCII digits.
    if value.isascii() and value.isdigit():
        try:
            wait = timedelta(seconds=int(value))
        except (ValueError, OverflowError):
            # Too many digits for int(), or too large for a timedelta: treat it
            # as "a very long time", which the cap below bounds anyway.
            wait = max_wait
        # Clamp BEFORE adding so `now + wait` cannot overflow datetime's range.
        target = now + min(wait, max_wait)
    else:
        try:
            dt = email.utils.parsedate_to_datetime(value)
        except (TypeError, ValueError, IndexError):
            dt = None
        if dt is not None:
            target = dt if dt.tzinfo else dt.replace(tzinfo=UTC)
    if target is None:
        return None
    try:
        target = target.astimezone(UTC)
    except OverflowError:
        return None  # date at the edge of datetime's range — unusable
    if target <= now:
        return None  # negative / past → ignore
    cap = now + max_wait
    if target > cap:
        target = cap
    return target.strftime("%Y-%m-%d %H:%M:%S")


def poll_due_sources(conn: sqlite3.Connection, limit: int | None = None) -> dict:
    """Poll only active sources whose last *attempt* (success OR failure) is older
    than their effective interval, or that have never been polled.

    Keying on the last attempt — not the last success — is what stops a
    perpetually-failing feed from being retried every cycle. The effective interval
    is poll_interval_minutes scaled up by the consecutive-failure streak (capped at
    MAX_BACKOFF_MINUTES), so healthy feeds keep their cadence while broken ones step
    down to occasional retries.
    """
    return _poll_rows(conn, due_source_rows(conn), limit)


def due_source_rows(conn: sqlite3.Connection) -> list[sqlite3.Row]:
    """Active sources currently due to poll (see poll_due_sources for the rule).

    Split out so the due/backoff decision can be tested without the network.
    """
    return conn.execute(
        """
        SELECT s.*
        FROM sources s
        WHERE s.active = 1
          AND (s.retry_after_at IS NULL OR s.retry_after_at <= datetime('now'))
          AND (
            NOT EXISTS (
              SELECT 1 FROM ingest_runs r
              WHERE r.source_id = s.id AND r.finished_at IS NOT NULL
            )
            OR (
              SELECT MAX(r.finished_at) FROM ingest_runs r
              WHERE r.source_id = s.id AND r.finished_at IS NOT NULL
            ) <= datetime(
              'now',
              '-' || MIN(?, s.poll_interval_minutes * (1 + s.consecutive_failures)) || ' minutes'
            )
          )
        ORDER BY s.id
        """,
        (MAX_BACKOFF_MINUTES,),
    ).fetchall()


def _poll_rows(conn: sqlite3.Connection, rows: list[sqlite3.Row], limit: int | None) -> dict:
    """Poll each source row (at most `limit` of them) and sum the per-source counters.

    Only results with status 'failed' count towards "failed"; a rate-limited
    source is polled and counted in "sources" but not as a failure.
    """
    if limit is not None:
        rows = rows[:limit]
    totals = {"sources": 0, "seen": 0, "inserted": 0, "duplicate": 0, "failed": 0}
    for source in rows:
        result = poll_source(conn, source)
        totals["sources"] += 1
        totals["seen"] += result["seen"]
        totals["inserted"] += result["inserted"]
        totals["duplicate"] += result["duplicate"]
        totals["failed"] += 1 if result["status"] == "failed" else 0
    return totals


def poll_source(conn: sqlite3.Connection, source: sqlite3.Row) -> dict:
    """Fetch, parse and store one source's feed, recording the attempt in ingest_runs.

    Never raises for fetch/parse/insert problems: the outcome is written to the
    ingest_runs row and the sources row, and returned as a dict whose "status"
    is 'ok', 'rate_limited' or 'failed' alongside the seen/inserted/duplicate
    counters (plus "retry_after_at" or "error" for the non-ok cases).
    """
    run_id = conn.execute(
        "INSERT INTO ingest_runs (source_id) VALUES (?)",
        (source["id"],),
    ).lastrowid
    conn.commit()
    seen = inserted = duplicate = 0
    try:
        xml = fetch_feed(source["feed_url"])
        items = parse_feed(xml)
        seen = len(items)
        for item in items:
            inserted_now = insert_article(conn, source, item)
            if inserted_now:
                inserted += 1
            else:
                duplicate += 1
        conn.execute(
            """
            UPDATE ingest_runs
            SET finished_at = CURRENT_TIMESTAMP,
                status = 'ok',
                items_seen = ?,
                items_inserted = ?,
                items_duplicate = ?
            WHERE id = ?
            """,
            (seen, inserted, duplicate, run_id),
        )
        # A clean poll resets the failure streak, clears any rate-limit rest, and
        # records the success time.
        conn.execute(
            """
            UPDATE sources
            SET last_success_at = CURRENT_TIMESTAMP,
                consecutive_failures = 0,
                retry_after_at = NULL
            WHERE id = ?
            """,
            (source["id"],),
        )
        conn.commit()
        return {"status": "ok", "seen": seen, "inserted": inserted, "duplicate": duplicate}
    except RateLimited as exc:
        # The publisher asked us to wait — not operational breakage. Record a
        # polite rest window WITHOUT inflating the failure streak.
        conn.execute(
            """
            UPDATE ingest_runs
            SET finished_at = CURRENT_TIMESTAMP,
                status = 'rate_limited',
                items_seen = ?,
                items_inserted = ?,
                items_duplicate = ?,
                error = ?
            WHERE id = ?
            """,
            (seen, inserted, duplicate, str(exc), run_id),
        )
        conn.execute(
            "UPDATE sources SET last_error_at = CURRENT_TIMESTAMP, last_error = ?, retry_after_at = ? WHERE id = ?",
            (str(exc), exc.retry_after_at, source["id"]),
        )
        conn.commit()
        return {
            "status": "rate_limited",
            "seen": seen,
            "inserted": inserted,
            "duplicate": duplicate,
            "retry_after_at": exc.retry_after_at,
        }
    except Exception as exc:
        conn.execute(
            """
            UPDATE ingest_runs
            SET finished_at = CURRENT_TIMESTAMP,
                status = 'failed',
                items_seen = ?,
                items_inserted = ?,
                items_duplicate = ?,
                error = ?
            WHERE id = ?
            """,
            (seen, inserted, duplicate, str(exc), run_id),
        )
        # Track the failure streak and latest error for advisory review flags.
        conn.execute(
            """
            UPDATE sources
            SET consecutive_failures = consecutive_failures + 1,
                last_error_at = CURRENT_TIMESTAMP,
                last_error = ?
            WHERE id = ?
            """,
            (str(exc), source["id"]),
        )
        conn.commit()
        return {
            "status": "failed",
            "seen": seen,
            "inserted": inserted,
            "duplicate": duplicate,
            "error": str(exc),
        }


# Deep-preview accessibility sample bounds (module-level so tests can shrink them).
_ACCESS_FETCH_TIMEOUT = 6  # per-article socket timeout (seconds)
_ACCESS_DEADLINE_S = 12.0  # hard wall-clock cap for the whole access phase


def preview_feed(url: str, sample: int = 25, pr_risk_default: int = 3, client=None, fetcher=None) -> dict:
    """Fetch and score a sample of a feed WITHOUT persisting anything.

    Read-only: lets a user vet a candidate source before it is ever added.
    By default it uses the fast heuristic; pass an LLM client to also get the
    topic/flavor mix and the model's acceptance view (slower). Pass
    fetcher=safe_fetch_feed for untrusted (admin-pasted) URLs.

    Returns a summary dict: sample size, acceptance figures, average scores,
    freshness, the paywall domain hint, and — only when an LLM client was
    used — the sampled article-access counts and verdict.
    """
    items = parse_feed((fetcher or fetch_feed)(url))
    rows = []
    for item in items[:sample]:
        title = clean_text(item.title, max_len=500)
        if not title:
            continue
        description = clean_text(item.description, max_len=1000)
        s = score_article(title, description, pr_risk_default)
        rows.append(
            {
                "title": title,
                "description": description,
                "url": canonicalize_url(item.url),
                "published_at": item.published_at,
                "accepted": bool(s["accepted"]),
                "cortisol": s["cortisol_score"],
                "ragebait": s["ragebait_score"],
                "pr_risk": s["pr_risk_score"],
                "reason_code": s["reason_code"],
                "topic": None,
                "flavor": None,
            }
        )

    classified = False
    if client and rows:
        from .llm import normalize_scores

        classified = True
        for r in rows:
            try:
                raw = client.classify(
                    {
                        "source_name": "preview",
                        "default_category": None,
                        "source_trust_score": 5,
                        "source_pr_risk_score": pr_risk_default,
                        "published_at": r["published_at"],
                        "title": r["title"],
                        "description": r["description"] or "",
                        "canonical_url": r["url"],
                    }
                )
                ns = normalize_scores(raw, model_name=client.model)
                r.update(
                    accepted=bool(ns["accepted"]),
                    topic=ns["topic"],
                    flavor=ns["flavor"],
                    cortisol=ns["cortisol_score"],
                    ragebait=ns["ragebait_score"],
                    pr_risk=ns["pr_risk_score"],
                    reason_code=ns["reason_code"],
                    language=ns.get("language", ""),
                )
            except Exception:
                pass  # one bad item shouldn't sink the whole preview

    total = len(rows)
    accepted = sum(1 for r in rows if r["accepted"])
    # Non-English items are HELD (English-only feed for now), not calm-filter
    # rejections — surface the count and judge acceptance over English items only, so
    # a multilingual wire (e.g. PR Newswire) isn't unfairly penalized in the preview.
    non_english = sum(1 for r in rows if r.get("reason_code") == "non_english")
    judged = total - non_english

    # Accessibility sample — deep preview only (it already means "spend ~a minute to
    # really know"). Layered per Codex: the instant DOMAIN rule + a small sampled
    # article fetch, so a paywall verdict rests on evidence, not domain alone (NYT
    # Learning proved domain rules false-positive).
    from .paywall import check_article_access, is_paywalled

    domain_paywalled = is_paywalled(url)
    access = None
    access_verdict = None
    if classified and rows:
        from concurrent.futures import ThreadPoolExecutor, as_completed
        from concurrent.futures import TimeoutError as FuturesTimeout

        # prefer the URLs the model would actually surface, then fill from the rest
        ordered = [r["url"] for r in rows if r["accepted"] and r["url"]] + \
                  [r["url"] for r in rows if not r["accepted"] and r["url"]]
        seen, sample_urls = set(), []
        for u in ordered:
            if u not in seen:
                seen.add(u)
                sample_urls.append(u)
            if len(sample_urls) >= 6:
                break
        results = []
        if sample_urls:
            af = fetcher or fetch_feed
            ex = ThreadPoolExecutor(max_workers=min(6, len(sample_urls)))
            futs = {ex.submit(check_article_access, u, af, _ACCESS_FETCH_TIMEOUT): u for u in sample_urls}
            done = {}
            try:
                # Hard wall-clock cap: the access step can NEVER stall the whole
                # preview. Fetches run in parallel; whatever hasn't finished by the
                # deadline is left 'unknown' (unverified — never counts as walled).
                # shutdown(wait=False, cancel_futures=True) below means we don't block
                # on stragglers (no `with ... as ex` join), so wall-clock == the cap.
                for fut in as_completed(futs, timeout=_ACCESS_DEADLINE_S):
                    try:
                        done[futs[fut]] = fut.result()
                    except Exception:  # noqa: BLE001
                        # result() re-raises whatever the check raised. Isolate it
                        # so one failed check stays 'unknown' instead of aborting
                        # the loop and discarding every other finished result.
                        continue
            except FuturesTimeout:
                pass  # overall deadline hit; use what finished
            finally:
                ex.shutdown(wait=False, cancel_futures=True)
            results = [(u, done.get(u, "unknown")) for u in sample_urls]
        counts = Counter(a for _, a in results)
        readable, paywalled = counts.get("readable", 0), counts.get("paywalled", 0)
        assessable = readable + paywalled
        inacc = (paywalled / assessable) if assessable else None
        # `blocked` is deliberately NOT counted as inaccessible: a bot-block isn't a
        # reader paywall (it may open fine in a browser), so it can never push a
        # source to reject-ready — only readable-vs-paywalled evidence does. Need a
        # few clearly-assessable samples before judging confidently.
        ENOUGH = 3
        if assessable < ENOUGH:
            access_verdict = "review"  # mostly blocked/unknown — can't confirm; click examples
        elif domain_paywalled and inacc >= 0.7:
            access_verdict = "reject-ready"  # domain rule AND sample agree it's walled
        elif domain_paywalled:
            access_verdict = "review"  # domain says walled but the sample isn't — likely a false positive, look
        elif inacc >= 0.7:
            access_verdict = "review"  # not on the list but mostly walled — candidate for the rule
        elif inacc <= 0.3:
            access_verdict = "fine"
        else:
            access_verdict = "review"  # mixed
        access = {
            "checked": len(results),
            "readable": readable,
            "paywalled": paywalled,
            "blocked": counts.get("blocked", 0),
            "unknown": counts.get("unknown", 0),
            "examples": [{"url": u, "access": a} for u, a in results][:5],
        }

    def _avg(key: str) -> float:
        return round(sum(r[key] for r in rows) / total, 1) if total else 0.0

    # Freshness: newest item and how many landed in the last week.
    now = datetime.now(UTC)
    dates = []
    for r in rows:
        if r["published_at"]:
            try:
                dates.append(datetime.fromisoformat(r["published_at"]))
            except ValueError:
                pass
    newest = max(dates).isoformat() if dates else None
    recent_7d = sum(1 for d in dates if (now - d).days <= 7)

    return {
        "url": url,
        "sampled": total,
        "classified": classified,
        "accepted": accepted,
        "non_english": non_english,  # held for language (English-only feed for now)
        # None (not 0%) when there are no English items to judge — "all held", not "all rejected".
        "acceptance_rate": round(accepted / judged, 2) if judged else None,
        "avg_cortisol": _avg("cortisol"),
        "avg_ragebait": _avg("ragebait"),
        "avg_pr_risk": _avg("pr_risk"),
        "newest_published": newest,
        "recent_7d": recent_7d,
        "paywall_rule": domain_paywalled,  # instant domain hint
        "access": access,  # sampled readable/paywalled/blocked/unknown (deep only)
        "access_verdict": access_verdict,  # fine | review | reject-ready
        "topic_mix": dict(Counter(r["topic"] for r in rows if r["topic"])),
        "flavor_mix": dict(Counter(r["flavor"] for r in rows if r["flavor"])),
        "examples_accepted": [r["title"] for r in rows if r["accepted"]][:5],
        "examples_rejected": [
            {"title": r["title"], "reason": r["reason_code"]}
            for r in rows
            if not r["accepted"]
        ][:5],
    }


def fetch_feed(url: str, timeout: int = 20) -> bytes:
    """Fetch a feed body with the module's User-Agent.

    Raises RateLimited on HTTP 429 (carrying the parsed Retry-After, if any)
    and RuntimeError for any other HTTP or URL error.
    """
    request = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.read()
    except urllib.error.HTTPError as exc:
        if exc.code == 429:
            ra = parse_retry_after(exc.headers.get("Retry-After")) if exc.headers else None
            raise RateLimited(f"HTTP 429 rate limited fetching {url}", retry_after_at=ra) from exc
        raise RuntimeError(f"HTTP {exc.code} fetching {url}") from exc
    except urllib.error.URLError as exc:
        raise RuntimeError(f"failed fetching {url}: {exc.reason}") from exc


def safe_fetch_feed(url: str, timeout: int = 10) -> bytes:
    """SSRF-safe feed fetch for UNTRUSTED (admin-pasted) URLs.

    Unlike fetch_feed (used for already-vetted, curated feeds), this re-validates
    every redirect hop against public IPs (reusing enrich._host_is_public), allows
    only http(s), caps the body, follows a bounded number of redirects, and sends
    no cookies/credentials. Use this for the preview/candidate path.

    Raises RuntimeError for a refused URL, a fetch error, a redirect without a
    Location header, or too many redirects.
    """
    opener = urllib.request.build_opener(_NoRedirect)
    current = url
    for _ in range(MAX_REDIRECTS + 1):
        parts = urlsplit(current)
        if parts.scheme not in ("http", "https") or not _host_is_public(parts.hostname):
            raise RuntimeError("refusing to fetch a non-public or non-http(s) URL")
        request = urllib.request.Request(current, headers={"User-Agent": USER_AGENT})
        try:
            response = opener.open(request, timeout=timeout)
        except (urllib.error.URLError, OSError, ValueError) as exc:
            raise RuntimeError(f"failed fetching {current}: {exc}") from exc
        status = getattr(response, "status", 200) or 200
        if status in (301, 302, 303, 307, 308):
            location = response.headers.get("Location")
            response.close()
            if not location:
                raise RuntimeError("redirect without a location")
            # Resolve relative redirects against the hop that issued them; the
            # next iteration re-validates the new target before fetching it.
            current = urljoin(current, location)
            continue
        try:
            return response.read(FEED_MAX_BYTES)
        finally:
            response.close()
    raise RuntimeError("too many redirects")


def parse_feed(xml: bytes) -> list[FeedItem]:
    """Parse a feed document; a root element named 'feed' is Atom, anything else RSS."""
    root = ET.fromstring(xml)
    root_name = _local_name(root.tag)
    if root_name == "feed":
        return _parse_atom(root)
    return _parse_rss(root)


def insert_article(conn: sqlite3.Connection, source: sqlite3.Row, item: FeedItem) -> bool:
    """Insert one feed item plus its heuristic score row, committing on success.

    Returns True when a new article was stored. Returns False when the item has
    no usable URL or title, or when the insert raises sqlite3.IntegrityError
    (callers count that as a duplicate).
    """
    canonical_url = canonicalize_url(item.url)
    if not canonical_url or not item.title:
        return False

    title = clean_text(item.title, max_len=500)
    description = clean_text(item.description, max_len=1000)
    if not title:
        return False

    url_hash = sha256_text(canonical_url)
    title_hash = sha256_text(title)
    try:
        cursor = conn.execute(
            """
            INSERT INTO articles (
                source_id, canonical_url, title, description, author,
                published_at, image_url, language, raw_guid, url_hash, title_hash
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                source["id"],
                canonical_url,
                title,
                description,
                clean_text(item.author, max_len=250),
                item.published_at,
                # Don't store the feed's image: RSS thumbnails are often tiny
                # (~90px) and upscale to mush. image_url is filled only by the
                # quality-gated og:image enrichment (brief / recent / on-open).
                None,
                item.language,
                item.raw_guid,
                url_hash,
                title_hash,
            ),
        )
    except sqlite3.IntegrityError:
        return False

    scores = score_article(title, description, int(source["pr_risk_score"]))
    conn.execute(
        """
        INSERT INTO article_scores (
            article_id, constructive_score, cortisol_score, ragebait_score,
            agency_score, human_benefit_score, novelty_score, pr_risk_score,
            accepted, reason_code, reason_text, model_name
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            cursor.lastrowid,
            scores["constructive_score"],
            scores["cortisol_score"],
            scores["ragebait_score"],
            scores["agency_score"],
            scores["human_benefit_score"],
            scores["novelty_score"],
            scores["pr_risk_score"],
            scores["accepted"],
            scores["reason_code"],
            scores["reason_text"],
            scores["model_name"],
        ),
    )
    conn.commit()
    return True


def _parse_rss(root: ET.Element) -> list[FeedItem]:
    """Collect every <item> under the root; items lacking a title or URL are skipped."""
    channel = _first_child(root, "channel")
    if channel is None:
        channel = root
    language = _first_text(channel, "language")
    items = [element for element in root.iter() if _local_name(element.tag) == "item"]
    parsed = []
    for item in items:
        title = _first_text(item, "title")
        link = _first_text(item, "link")
        guid = _first_text(item, "guid")
        url = link or guid  # fall back to the guid when there is no link text
        if not title or not url:
            continue
        parsed.append(
            FeedItem(
                title=title,
                url=url,
                description=_first_text(item, "description", "summary", "encoded"),
                author=_first_text(item, "author", "creator"),
                published_at=_parse_date(_first_text(item, "pubDate", "published", "updated", "date")),
                image_url=_find_image_url(item) or _html_image(item),
                language=language,
                raw_guid=guid,
            )
        )
    return parsed


def _parse_atom(root: ET.Element) -> list[FeedItem]:
    """Collect the root's direct <entry> children; entries lacking a title or link are skipped."""
    language = root.attrib.get("{http://www.w3.org/XML/1998/namespace}lang")
    entries = [element for element in root if _local_name(element.tag) == "entry"]
    parsed = []
    for entry in entries:
        title = _first_text(entry, "title")
        url = _atom_link(entry)
        if not title or not url:
            continue
        author = None
        author_el = _first_child(entry, "author")
        if author_el is not None:
            author = _first_text(author_el, "name") or _text(author_el)
        parsed.append(
            FeedItem(
                title=title,
                url=url,
                description=_first_text(entry, "summary", "content"),
                author=author,
                published_at=_parse_date(_first_text(entry, "published", "updated")),
                image_url=_find_image_url(entry) or _html_image(entry),
                language=language,
                raw_guid=_first_text(entry, "id"),
            )
        )
    return parsed


def _atom_link(entry: ET.Element) -> str | None:
    """Return the entry's rel="alternate" href (rel defaults to alternate), else the first other href."""
    fallback = None
    for child in entry:
        if _local_name(child.tag) != "link":
            continue
        href = child.attrib.get("href")
        if not href:
            continue
        if child.attrib.get("rel", "alternate") == "alternate":
            return href
        fallback = fallback or href
    return fallback


# Anchored to an img tag so the src of a script/iframe/other element is never
# mistaken for an article image.
_IMG_SRC_RE = re.compile(r"""<img\b[^>]*?\bsrc=["']([^"']+)["']""", re.IGNORECASE)


def _img_from_html(html: str | None) -> str | None:
    """Return the src of the first img tag in a content/description HTML blob, if any."""
    if not html:
        return None
    match = _IMG_SRC_RE.search(html)
    return match.group(1) if match else None


def _html_image(element: ET.Element) -> str | None:
    """Opportunistic image from the feed's content/description HTML.

    Only ever reads what the feed already provides — never fetches the article
    page. A non-http(s)/relative URL is dropped by canonicalize_url.
    """
    html = _first_text(element, "encoded", "content", "description", "summary")
    return canonicalize_url(_img_from_html(html))


def _find_image_url(element: ET.Element) -> str | None:
    """Return the first image URL declared by a thumbnail/content/enclosure descendant.

    thumbnail/content elements qualify when they carry a url and their medium
    attribute is absent or "image"; enclosures qualify when their type starts
    with "image/".
    """
    for child in element.iter():
        name = _local_name(child.tag)
        if name in {"thumbnail", "content"} and child.attrib.get("url"):
            if child.attrib.get("medium") in {None, "image"}:
                return child.attrib["url"]
        if name == "enclosure" and child.attrib.get("url"):
            mime = child.attrib.get("type", "")
            if mime.startswith("image/"):
                return child.attrib["url"]
    return None


def _parse_date(value: str | None) -> str | None:
    """Normalise an RFC 2822 or ISO-8601 date string to a UTC ISO-8601 string.

    Naive values are taken as UTC. Returns None when the value is empty,
    unparseable, or cannot be represented after conversion to UTC.
    """
    if not value:
        return None
    value = value.strip()
    try:
        parsed = email.utils.parsedate_to_datetime(value)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=UTC)
        return parsed.astimezone(UTC).isoformat()
    except (TypeError, ValueError, OverflowError):
        # Not an RFC 2822 date (or out of range as one) — try ISO-8601 below.
        pass
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=UTC)
        return parsed.astimezone(UTC).isoformat()
    except (ValueError, OverflowError):
        # OverflowError: a boundary date whose UTC conversion leaves datetime's
        # range; one bad date must not abort parsing of the whole feed.
        return None


def _first_child(element: ET.Element, name: str) -> ET.Element | None:
    """Return the first direct child whose namespace-stripped tag equals `name`."""
    for child in element:
        if _local_name(child.tag) == name:
            return child
    return None


def _first_text(element: ET.Element, *names: str) -> str | None:
    """Return the first non-empty text among direct children whose local tag is in `names`.

    Children are scanned in document order, so the order of `names` does not
    express a preference.
    """
    for child in element:
        if _local_name(child.tag) in names:
            value = _text(child)
            if value:
                return value
    return None


def _text(element: ET.Element) -> str | None:
    """Return the element's own text, stripped; None when it has no text."""
    if element.text:
        return element.text.strip()
    return None


def _local_name(tag: str) -> str:
    """Strip an ElementTree '{namespace}' prefix from a tag name."""
    if "}" in tag:
        return tag.rsplit("}", 1)[1]
    return tag