89c0fbe1f6
The deploy pipeline runs from the working tree, so a wave of shipped features
was never committed. This commit brings git in line with what is actually running.
SEO impression recovery (live + verified):
- Duplicate /a/{id} pages now 301-redirect to their canonical twin instead of returning 404
(a hard 404 silently dropped already-indexed URLs and tanked impressions).
- Dedup representative selection reworked: accepted/serveable -> established
rep (URL stability) -> quality score, so an accepted page never retires to a
rejected rep and an indexed canonical doesn't churn when a newer twin arrives.
- HEAD /a/{id} returns the same status as GET (api_route GET+HEAD) instead of
falling through to the static mount and 404ing.
- `dedup --force-recluster`: cycle-locked, model-free re-cluster to re-apply the
policy to the existing corpus (shared cycle_lock context manager).
- CLI honors GOODNEWS_DB for its default --db (was silently ignored).
Publishing Desk (admin tool to post highlights to X via Web Intents):
- publishing.py queue/rank/handle-resolution; admin UI; full searchable emoji
picker (bundled data, no CDN) for the blurb editor.
Play games + site:
- Bloom (word-wheel), Memory Match, daily ritual set, Zen Den (dev-gated).
- English-only language gate; source prospecting; paywall + dedup hardening.
Tests: full suite green (349). Ignores tightened (node_modules, data/*.db).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
709 lines
26 KiB
Python
709 lines
26 KiB
Python
from __future__ import annotations
|
|
|
|
import email.utils
|
|
import re
|
|
import sqlite3
|
|
import urllib.error
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
from collections import Counter
|
|
from dataclasses import dataclass
|
|
from datetime import UTC, datetime, timedelta
|
|
from urllib.parse import urljoin, urlsplit
|
|
|
|
from .enrich import MAX_REDIRECTS, _NoRedirect, _host_is_public
|
|
from .scoring import score_article
|
|
from .text import canonicalize_url, clean_text, sha256_text
|
|
|
|
|
|
# Identifies this crawler to feed publishers on every outbound request.
USER_AGENT = "goodNews/0.1 (+local constructive news prototype)"

# Upper bound, in bytes, on how much of a feed body safe_fetch_feed reads
# (the SSRF-safe preview path); anything beyond it is never read.
FEED_MAX_BYTES = 2 * 1000 * 1000
|
|
|
|
|
|
@dataclass()
class FeedItem:
    """One entry extracted from an RSS or Atom feed, before any cleaning.

    Only ``title`` and ``url`` are required; every other field holds whatever
    the feed supplied, or None.
    """

    title: str
    url: str
    description: str | None = None
    author: str | None = None
    # ISO-8601 UTC string produced by _parse_date, or None.
    published_at: str | None = None
    # From media/enclosure markup or an inline <img>; insert_article does not persist it.
    image_url: str | None = None
    language: str | None = None
    raw_guid: str | None = None
|
|
|
|
|
|
def poll_all_sources(conn: sqlite3.Connection, limit: int | None = None) -> dict:
    """Poll every active source, ignoring schedules and backoff.

    Sources are visited in id order. ``limit``, when given, caps how many are
    polled. Returns the aggregate counters built by ``_poll_rows``.
    """
    active = conn.execute("SELECT * FROM sources WHERE active = 1 ORDER BY id").fetchall()
    return _poll_rows(conn, active, limit)
|
|
|
|
|
|
# Backoff ceiling for a failing or rate-limited source. due_source_rows
# stretches a source's poll interval by its failure streak but never past this
# many minutes, and parse_retry_after never honours a Retry-After further out.
# A feed answering HTTP 429 therefore rests instead of being hammered, and it
# recovers on its own once the publisher lifts the limit.
MAX_BACKOFF_MINUTES = 24 * 60  # one day
|
|
|
|
|
|
class RateLimited(RuntimeError):
    """Raised when a feed answers HTTP 429 (Too Many Requests).

    ``retry_after_at`` carries the publisher's requested resume time as a UTC
    'YYYY-MM-DD HH:MM:SS' string, or None when no usable Retry-After was sent.
    The caller can then rest the source politely instead of treating the
    response as operational breakage.
    """

    def __init__(self, message: str, retry_after_at: str | None = None) -> None:
        self.retry_after_at = retry_after_at
        super().__init__(message)
|
|
|
|
|
|
def parse_retry_after(value: str | None, now: datetime | None = None) -> str | None:
|
|
"""Parse a Retry-After header → UTC 'YYYY-MM-DD HH:MM:SS', or None.
|
|
|
|
Accepts delta-seconds or an HTTP-date; ignores invalid/negative/past values;
|
|
caps the result at now + MAX_BACKOFF_MINUTES so a feed can't park itself.
|
|
"""
|
|
if not value:
|
|
return None
|
|
now = now or datetime.now(UTC)
|
|
value = value.strip()
|
|
target = None
|
|
if value.isdigit():
|
|
target = now + timedelta(seconds=int(value))
|
|
else:
|
|
try:
|
|
dt = email.utils.parsedate_to_datetime(value)
|
|
except (TypeError, ValueError, IndexError):
|
|
dt = None
|
|
if dt is not None:
|
|
target = dt if dt.tzinfo else dt.replace(tzinfo=UTC)
|
|
if target is None:
|
|
return None
|
|
target = target.astimezone(UTC)
|
|
if target <= now:
|
|
return None # negative / past → ignore
|
|
cap = now + timedelta(minutes=MAX_BACKOFF_MINUTES)
|
|
if target > cap:
|
|
target = cap
|
|
return target.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def poll_due_sources(conn: sqlite3.Connection, limit: int | None = None) -> dict:
    """Poll the active sources that are currently due, honouring backoff.

    A source is due when it has never finished a poll, or when its most recent
    finished attempt (successful or not) is older than its effective interval.
    Measuring from the last *attempt* rather than the last success stops a
    feed that keeps failing from being retried on every scheduler cycle.

    The effective interval is poll_interval_minutes * (1 + consecutive_failures),
    never more than MAX_BACKOFF_MINUTES. Healthy feeds keep their cadence while
    broken ones step down to occasional retries. Sources still resting under a
    Retry-After window are skipped until it passes.
    """
    due = due_source_rows(conn)
    return _poll_rows(conn, due, limit)
|
|
|
|
|
|
def due_source_rows(conn: sqlite3.Connection) -> list[sqlite3.Row]:
    """Return the active sources that should be polled right now.

    This is the scheduling decision behind poll_due_sources, kept free of
    network I/O so it can be tested against a database alone. A source
    qualifies when all of the following hold:

    * it is active;
    * any Retry-After rest (retry_after_at) has expired;
    * it has no finished ingest run yet, or its newest finished run is at least
      min(MAX_BACKOFF_MINUTES, poll_interval_minutes * (1 + consecutive_failures))
      minutes old.

    Rows come back ordered by source id.
    """
    query = """
        SELECT s.*
        FROM sources s
        WHERE s.active = 1
          AND (s.retry_after_at IS NULL OR s.retry_after_at <= datetime('now'))
          AND (
            NOT EXISTS (
              SELECT 1 FROM ingest_runs r
              WHERE r.source_id = s.id AND r.finished_at IS NOT NULL
            )
            OR (
              SELECT MAX(r.finished_at) FROM ingest_runs r
              WHERE r.source_id = s.id AND r.finished_at IS NOT NULL
            ) <= datetime(
              'now',
              '-' || MIN(?, s.poll_interval_minutes * (1 + s.consecutive_failures)) || ' minutes'
            )
          )
        ORDER BY s.id
    """
    params = (MAX_BACKOFF_MINUTES,)
    cursor = conn.execute(query, params)
    return cursor.fetchall()
|
|
|
|
|
|
def _poll_rows(conn: sqlite3.Connection, rows: list[sqlite3.Row], limit: int | None) -> dict:
    """Run poll_source over ``rows`` and sum the per-source results.

    Only the first ``limit`` rows are polled when ``limit`` is given. The
    returned dict counts:

    * sources polled;
    * items seen, inserted and duplicate;
    * sources whose status came back "failed".

    A rate-limited source counts toward "sources" but not toward "failed".
    """
    batch = rows if limit is None else rows[:limit]
    totals = dict.fromkeys(("sources", "seen", "inserted", "duplicate", "failed"), 0)
    for row in batch:
        outcome = poll_source(conn, row)
        totals["sources"] += 1
        for key in ("seen", "inserted", "duplicate"):
            totals[key] += outcome[key]
        if outcome["status"] == "failed":
            totals["failed"] += 1
    return totals
|
|
|
|
|
|
def poll_source(conn: sqlite3.Connection, source: sqlite3.Row) -> dict:
    """Poll a single source once and record the outcome.

    First opens an ``ingest_runs`` row and commits it immediately. It then
    fetches and parses the feed and hands each item to insert_article. The run
    is closed with one of three statuses, and the ``sources`` row is updated to
    match:

    * ``ok``: failure streak reset, retry_after_at cleared, last_success_at set.
    * ``rate_limited``: the fetch raised RateLimited (HTTP 429). The publisher's
      retry_after_at is stored and the failure streak is left untouched.
    * ``failed``: any other exception. consecutive_failures is incremented and
      the error text recorded for advisory review flags.

    Exceptions from the fetch/parse/insert phase are recorded, not raised.

    Returns:
        A dict that always has "status", "seen", "inserted" and "duplicate" (the
        counts reached so far). It adds "retry_after_at" for rate_limited or
        "error" for failed.
    """
    source_id = source["id"]
    run_id = conn.execute(
        "INSERT INTO ingest_runs (source_id) VALUES (?)",
        (source_id,),
    ).lastrowid
    conn.commit()

    tally = {"seen": 0, "inserted": 0, "duplicate": 0}
    try:
        entries = parse_feed(fetch_feed(source["feed_url"]))
        tally["seen"] = len(entries)
        for entry in entries:
            bucket = "inserted" if insert_article(conn, source, entry) else "duplicate"
            tally[bucket] += 1

        conn.execute(
            """
            UPDATE ingest_runs
            SET finished_at = CURRENT_TIMESTAMP,
                status = 'ok',
                items_seen = ?,
                items_inserted = ?,
                items_duplicate = ?
            WHERE id = ?
            """,
            (tally["seen"], tally["inserted"], tally["duplicate"], run_id),
        )
        # A clean poll resets the failure streak, clears any rate-limit rest,
        # and records the success time.
        conn.execute(
            """
            UPDATE sources
            SET last_success_at = CURRENT_TIMESTAMP, consecutive_failures = 0, retry_after_at = NULL
            WHERE id = ?
            """,
            (source_id,),
        )
        conn.commit()
        return {"status": "ok", **tally}
    except RateLimited as exc:
        # The publisher asked us to wait; that is not operational breakage.
        # Record a polite rest window WITHOUT inflating the failure streak.
        message = str(exc)
        conn.execute(
            """
            UPDATE ingest_runs
            SET finished_at = CURRENT_TIMESTAMP, status = 'rate_limited',
                items_seen = ?, items_inserted = ?, items_duplicate = ?, error = ?
            WHERE id = ?
            """,
            (tally["seen"], tally["inserted"], tally["duplicate"], message, run_id),
        )
        conn.execute(
            "UPDATE sources SET last_error_at = CURRENT_TIMESTAMP, last_error = ?, retry_after_at = ? WHERE id = ?",
            (message, exc.retry_after_at, source_id),
        )
        conn.commit()
        return {"status": "rate_limited", **tally, "retry_after_at": exc.retry_after_at}
    except Exception as exc:
        message = str(exc)
        conn.execute(
            """
            UPDATE ingest_runs
            SET finished_at = CURRENT_TIMESTAMP,
                status = 'failed',
                items_seen = ?,
                items_inserted = ?,
                items_duplicate = ?,
                error = ?
            WHERE id = ?
            """,
            (tally["seen"], tally["inserted"], tally["duplicate"], message, run_id),
        )
        # Track the failure streak and latest error for advisory review flags.
        conn.execute(
            """
            UPDATE sources
            SET consecutive_failures = consecutive_failures + 1,
                last_error_at = CURRENT_TIMESTAMP,
                last_error = ?
            WHERE id = ?
            """,
            (message, source_id),
        )
        conn.commit()
        return {"status": "failed", **tally, "error": message}
|
|
|
|
|
|
# Bounds on the sampled article-accessibility check in a deep preview. They
# live at module level so tests can shrink them.
_ACCESS_FETCH_TIMEOUT = 6  # seconds; timeout handed to each per-article check
_ACCESS_DEADLINE_S = 12.0  # seconds; wall-clock ceiling for the whole access phase
|
|
|
|
|
|
def preview_feed(url: str, sample: int = 25, pr_risk_default: int = 3, client=None, fetcher=None) -> dict:
    """Fetch and score a sample of a feed WITHOUT persisting anything.

    Read-only: lets a user vet a candidate source before it is ever added. By
    default it uses the fast heuristic. Pass an LLM client to also get the
    topic/flavor mix and the model's acceptance view (slower). Pass
    fetcher=safe_fetch_feed for untrusted (admin-pasted) URLs.

    Args:
        url: feed URL to preview.
        sample: how many items from the top of the feed to consider. Items
            whose cleaned title is empty are dropped, so fewer may be scored.
        pr_risk_default: PR-risk prior passed to the scorer for every item.
        client: optional LLM client exposing ``classify(dict)`` and ``model``.
            Supplying one also enables the sampled article-access check.
        fetcher: callable used for the feed fetch and for the article fetches
            in the access check. Defaults to fetch_feed.

    Returns:
        A summary dict with:

        * acceptance stats and score averages;
        * freshness, and paywall evidence (``access`` / ``access_verdict``);
        * the topic/flavor mix and example titles;
        * ``classify_failed``: how many items the LLM client could not classify
          (those keep their heuristic scores).
    """
    fetch = fetcher or fetch_feed
    items = parse_feed(fetch(url))

    rows = []
    for item in items[:sample]:
        title = clean_text(item.title, max_len=500)
        if not title:
            continue
        description = clean_text(item.description, max_len=1000)
        s = score_article(title, description, pr_risk_default)
        rows.append(
            {
                "title": title,
                "description": description,
                "url": canonicalize_url(item.url),
                "published_at": item.published_at,
                "accepted": bool(s["accepted"]),
                "cortisol": s["cortisol_score"],
                "ragebait": s["ragebait_score"],
                "pr_risk": s["pr_risk_score"],
                "reason_code": s["reason_code"],
                "topic": None,
                "flavor": None,
            }
        )

    classified = False
    classify_failed = 0
    if client and rows:
        from .llm import normalize_scores

        classified = True
        for r in rows:
            try:
                raw = client.classify(
                    {
                        "source_name": "preview",
                        "default_category": None,
                        "source_trust_score": 5,
                        "source_pr_risk_score": pr_risk_default,
                        "published_at": r["published_at"],
                        "title": r["title"],
                        "description": r["description"] or "",
                        "canonical_url": r["url"],
                    }
                )
                ns = normalize_scores(raw, model_name=client.model)
                r.update(
                    accepted=bool(ns["accepted"]),
                    topic=ns["topic"],
                    flavor=ns["flavor"],
                    cortisol=ns["cortisol_score"],
                    ragebait=ns["ragebait_score"],
                    pr_risk=ns["pr_risk_score"],
                    reason_code=ns["reason_code"],
                    language=ns.get("language", ""),
                )
            except Exception:  # noqa: BLE001
                # One bad item shouldn't sink the whole preview. It keeps its
                # heuristic scores but is counted, so the caller can tell.
                classify_failed += 1

    total = len(rows)
    accepted = sum(1 for r in rows if r["accepted"])
    # Non-English items are HELD (English-only feed for now); they are not
    # calm-filter rejections. Surface their count and judge acceptance over
    # English items only, so a multilingual wire (e.g. PR Newswire) isn't
    # unfairly penalized in the preview.
    non_english = sum(1 for r in rows if r.get("reason_code") == "non_english")
    judged = total - non_english

    # Accessibility sample, deep preview only (a deep preview already means
    # "spend ~a minute to really know"). It is layered: the instant DOMAIN rule
    # plus a small sampled article fetch, so a paywall verdict rests on evidence
    # rather than the domain alone (NYT Learning proved domain rules can be
    # false positives).
    from .paywall import check_article_access, is_paywalled

    domain_paywalled = is_paywalled(url)
    access = None
    access_verdict = None
    if classified and rows:
        results = _sample_article_access(_pick_access_sample(rows), fetch, check_article_access)
        counts = Counter(a for _, a in results)
        readable, paywalled = counts.get("readable", 0), counts.get("paywalled", 0)
        access_verdict = _access_verdict(domain_paywalled, readable, paywalled)
        access = {
            "checked": len(results),
            "readable": readable,
            "paywalled": paywalled,
            "blocked": counts.get("blocked", 0),
            "unknown": counts.get("unknown", 0),
            "examples": [{"url": u, "access": a} for u, a in results[:5]],
        }

    def _avg(key: str) -> float:
        return round(sum(r[key] for r in rows) / total, 1) if total else 0.0

    # Freshness: newest item and how many landed in the last week.
    now = datetime.now(UTC)
    dates = []
    for r in rows:
        if not r["published_at"]:
            continue
        try:
            d = datetime.fromisoformat(r["published_at"])
        except ValueError:
            continue
        # _parse_date emits UTC-aware strings. Treat anything naive as UTC
        # rather than crash on aware-minus-naive subtraction below.
        dates.append(d if d.tzinfo is not None else d.replace(tzinfo=UTC))
    newest = max(dates).isoformat() if dates else None
    recent_7d = sum(1 for d in dates if (now - d).days <= 7)

    return {
        "url": url,
        "sampled": total,
        "classified": classified,
        "classify_failed": classify_failed,  # items the LLM couldn't classify (heuristic kept)
        "accepted": accepted,
        "non_english": non_english,  # held for language (English-only feed for now)
        # None (not 0%) when there are no English items to judge: "all held", not "all rejected".
        "acceptance_rate": round(accepted / judged, 2) if judged else None,
        "avg_cortisol": _avg("cortisol"),
        "avg_ragebait": _avg("ragebait"),
        "avg_pr_risk": _avg("pr_risk"),
        "newest_published": newest,
        "recent_7d": recent_7d,
        "paywall_rule": domain_paywalled,  # instant domain hint
        "access": access,  # sampled readable/paywalled/blocked/unknown (deep only)
        "access_verdict": access_verdict,  # fine | review | reject-ready
        "topic_mix": dict(Counter(r["topic"] for r in rows if r["topic"])),
        "flavor_mix": dict(Counter(r["flavor"] for r in rows if r["flavor"])),
        "examples_accepted": [r["title"] for r in rows if r["accepted"]][:5],
        "examples_rejected": [
            {"title": r["title"], "reason": r["reason_code"]} for r in rows if not r["accepted"]
        ][:5],
    }


def _pick_access_sample(rows: list[dict], limit: int = 6) -> list[str]:
    """Up to ``limit`` distinct, non-empty article URLs, model-accepted ones first."""
    # Prefer the URLs the model would actually surface, then fill from the rest.
    ordered = [r["url"] for r in rows if r["accepted"] and r["url"]] + [
        r["url"] for r in rows if not r["accepted"] and r["url"]
    ]
    return list(dict.fromkeys(ordered))[:limit]  # de-dupe, keeping first occurrence


def _sample_article_access(sample_urls: list[str], fetch, check) -> list[tuple[str, str]]:
    """Run ``check(url, fetch, _ACCESS_FETCH_TIMEOUT)`` for each URL in parallel.

    The whole phase is capped at _ACCESS_DEADLINE_S of wall-clock time, so it
    can never stall the preview. A URL is reported as 'unknown' (unverified,
    never counted as walled) in either case:

    * its check had not finished by the deadline;
    * its check raised.

    Returns (url, verdict) pairs in ``sample_urls`` order.
    """
    if not sample_urls:
        return []
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from concurrent.futures import TimeoutError as FuturesTimeout

    done: dict[str, str] = {}
    # Deliberately not `with ThreadPoolExecutor(...)`: the context manager would
    # join stragglers and blow the deadline.
    ex = ThreadPoolExecutor(max_workers=min(6, len(sample_urls)))
    try:
        futs = {ex.submit(check, u, fetch, _ACCESS_FETCH_TIMEOUT): u for u in sample_urls}
        try:
            for fut in as_completed(futs, timeout=_ACCESS_DEADLINE_S):
                try:
                    done[futs[fut]] = fut.result()
                except Exception:  # noqa: BLE001
                    # One failed check must not discard the others still
                    # arriving; it simply stays 'unknown'.
                    pass
        except FuturesTimeout:
            pass  # overall deadline hit; use what finished
    finally:
        ex.shutdown(wait=False, cancel_futures=True)
    return [(u, done.get(u, "unknown")) for u in sample_urls]


def _access_verdict(domain_paywalled: bool, readable: int, paywalled: int, enough: int = 3) -> str:
    """Turn sampled access counts into 'fine' | 'review' | 'reject-ready'.

    `blocked` results are deliberately not inputs. A bot-block isn't a reader
    paywall (the page may open fine in a browser), so it can never push a
    source to reject-ready; only readable-vs-paywalled evidence does. At least
    ``enough`` clearly assessable samples are required before judging.
    """
    assessable = readable + paywalled
    if assessable < enough:
        return "review"  # mostly blocked/unknown; can't confirm, click the examples
    inaccessible = paywalled / assessable
    if domain_paywalled:
        # If the domain rule AND the sample agree, it's walled. Otherwise the
        # rule is likely a false positive for this feed, so a human should look.
        return "reject-ready" if inaccessible >= 0.7 else "review"
    # Not on the list. A mostly-readable sample is fine. A mostly-walled one is a
    # candidate for the rule, and a mixed one is inconclusive; both need review.
    return "fine" if inaccessible <= 0.3 else "review"
|
|
|
|
|
|
def fetch_feed(url: str, timeout: int = 20) -> bytes:
    """Download ``url`` with the project User-Agent and return the body bytes.

    Meant for already-vetted, curated feeds. urllib follows redirects by
    default, and the body is read in full with no size cap.

    Raises:
        RateLimited: the server answered HTTP 429. The exception carries the
            parsed Retry-After time, when one was sent.
        RuntimeError: any other HTTP error status, or a urllib URLError.

    Exceptions other than urllib's HTTPError/URLError (e.g. a timeout while
    reading the body) propagate unchanged.
    """
    req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.read()
    except urllib.error.HTTPError as err:
        if err.code != 429:
            raise RuntimeError(f"HTTP {err.code} fetching {url}") from err
        retry_at = None
        if err.headers:
            retry_at = parse_retry_after(err.headers.get("Retry-After"))
        raise RateLimited(f"HTTP 429 rate limited fetching {url}", retry_after_at=retry_at) from err
    except urllib.error.URLError as err:
        raise RuntimeError(f"failed fetching {url}: {err.reason}") from err
|
|
|
|
|
|
def safe_fetch_feed(url: str, timeout: int = 10) -> bytes:
    """SSRF-safe feed fetch for UNTRUSTED (admin-pasted) URLs.

    Unlike fetch_feed (used for already-vetted, curated feeds), this function:

    * re-validates every redirect hop against public IPs (reusing
      enrich._host_is_public);
    * allows only http(s);
    * caps the body at FEED_MAX_BYTES;
    * follows at most MAX_REDIRECTS redirects;
    * sends no cookies or credentials.

    Use it for the preview/candidate path.

    Raises:
        RuntimeError: a non-public or non-http(s) hop, a fetch error, a
            redirect without Location, or too many redirects.
    """
    opener = urllib.request.build_opener(_NoRedirect)
    current = url
    attempts_left = MAX_REDIRECTS + 1
    while attempts_left > 0:
        attempts_left -= 1

        parts = urlsplit(current)
        scheme_ok = parts.scheme in ("http", "https")
        if not (scheme_ok and _host_is_public(parts.hostname)):
            raise RuntimeError("refusing to fetch a non-public or non-http(s) URL")

        request = urllib.request.Request(current, headers={"User-Agent": USER_AGENT})
        try:
            response = opener.open(request, timeout=timeout)
        except (urllib.error.URLError, OSError, ValueError) as exc:
            raise RuntimeError(f"failed fetching {current}: {exc}") from exc

        status = getattr(response, "status", 200) or 200
        if status not in (301, 302, 303, 307, 308):
            try:
                return response.read(FEED_MAX_BYTES)
            finally:
                response.close()

        # A redirect: close this hop and loop to re-validate the new target.
        location = response.headers.get("Location")
        response.close()
        if not location:
            raise RuntimeError("redirect without a location")
        current = urljoin(current, location)

    raise RuntimeError("too many redirects")
|
|
|
|
|
|
def parse_feed(xml: bytes) -> list[FeedItem]:
    """Parse raw RSS or Atom bytes into FeedItems.

    The root element's namespace-stripped tag picks the dialect: ``feed`` means
    Atom, and anything else is treated as RSS. Malformed XML raises
    xml.etree.ElementTree.ParseError.

    NOTE(review): this runs the stdlib xml.etree parser on bytes that can come
    from untrusted URLs (the preview path). The Python docs list XML
    vulnerabilities (e.g. entity-expansion attacks) for the stdlib parsers.
    Consider a hardened parser or input limits. Not changed here.
    """
    root = ET.fromstring(xml)
    parser = _parse_atom if _local_name(root.tag) == "feed" else _parse_rss
    return parser(root)
|
|
|
|
|
|
def insert_article(conn: sqlite3.Connection, source: sqlite3.Row, item: FeedItem) -> bool:
    """Store one feed item as an article row plus its heuristic score row.

    Returns:
        True when a new article (and its scores) was inserted and committed.
        False in any of these cases, all of which callers count as duplicates:

        * the item is unusable (no canonical URL, or no title after cleaning);
        * the articles INSERT raises sqlite3.IntegrityError (presumably a
          uniqueness constraint on the hashes; schema not visible here,
          TODO confirm).

    Both INSERTs are committed together at the end. If scoring or the score
    INSERT raises, the article INSERT is left uncommitted on ``conn`` and the
    exception propagates.
    """
    canonical_url = canonicalize_url(item.url)
    if not (canonical_url and item.title):
        return False

    title = clean_text(item.title, max_len=500)
    description = clean_text(item.description, max_len=1000)
    if not title:
        return False

    article_values = (
        source["id"],
        canonical_url,
        title,
        description,
        clean_text(item.author, max_len=250),
        item.published_at,
        # The feed's own image is deliberately NOT stored: RSS thumbnails are
        # often tiny (~90px) and upscale to mush. image_url is filled only by
        # the quality-gated og:image enrichment (brief / recent / on-open).
        None,
        item.language,
        item.raw_guid,
        sha256_text(canonical_url),
        sha256_text(title),
    )
    try:
        article_id = conn.execute(
            """
            INSERT INTO articles (
                source_id, canonical_url, title, description, author,
                published_at, image_url, language, raw_guid, url_hash, title_hash
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            article_values,
        ).lastrowid
    except sqlite3.IntegrityError:
        return False

    scores = score_article(title, description, int(source["pr_risk_score"]))
    score_values = (article_id,) + tuple(
        scores[key]
        for key in (
            "constructive_score",
            "cortisol_score",
            "ragebait_score",
            "agency_score",
            "human_benefit_score",
            "novelty_score",
            "pr_risk_score",
            "accepted",
            "reason_code",
            "reason_text",
            "model_name",
        )
    )
    conn.execute(
        """
        INSERT INTO article_scores (
            article_id, constructive_score, cortisol_score, ragebait_score,
            agency_score, human_benefit_score, novelty_score, pr_risk_score,
            accepted, reason_code, reason_text, model_name
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        score_values,
    )
    conn.commit()
    return True
|
|
|
|
|
|
def _parse_rss(root: ET.Element) -> list[FeedItem]:
    """Extract FeedItems from an RSS-style document.

    <item> elements are collected from anywhere under ``root``, not only
    inside <channel>.

    Every item gets the <language> found directly under <channel>. When there
    is no <channel> child, the language is read directly under the root
    instead.

    An item's URL is its <link>, falling back to its <guid>. Items without a
    title or a URL are skipped.
    """
    channel = _first_child(root, "channel")
    feed_language = _first_text(root if channel is None else channel, "language")

    parsed: list[FeedItem] = []
    for node in root.iter():
        if _local_name(node.tag) != "item":
            continue
        title = _first_text(node, "title")
        link = _first_text(node, "link")
        guid = _first_text(node, "guid")
        url = link or guid
        if not (title and url):
            continue
        parsed.append(
            FeedItem(
                title=title,
                url=url,
                description=_first_text(node, "description", "summary", "encoded"),
                author=_first_text(node, "author", "creator"),
                published_at=_parse_date(_first_text(node, "pubDate", "published", "updated", "date")),
                image_url=_find_image_url(node) or _html_image(node),
                language=feed_language,
                raw_guid=guid,
            )
        )
    return parsed
|
|
|
|
|
|
def _parse_atom(root: ET.Element) -> list[FeedItem]:
    """Extract FeedItems from an Atom <feed> element.

    Only direct <entry> children are considered. The feed-level xml:lang
    attribute, if present, becomes every item's language. Entries with no
    title or no usable <link> are skipped.
    """
    feed_language = root.attrib.get("{http://www.w3.org/XML/1998/namespace}lang")

    items: list[FeedItem] = []
    for entry in (child for child in root if _local_name(child.tag) == "entry"):
        title = _first_text(entry, "title")
        link = _atom_link(entry)
        if not title or not link:
            continue

        # <author><name>…</name></author>, falling back to the element's own text.
        author_el = _first_child(entry, "author")
        author = None if author_el is None else (_first_text(author_el, "name") or _text(author_el))

        items.append(
            FeedItem(
                title=title,
                url=link,
                description=_first_text(entry, "summary", "content"),
                author=author,
                published_at=_parse_date(_first_text(entry, "published", "updated")),
                image_url=_find_image_url(entry) or _html_image(entry),
                language=feed_language,
                raw_guid=_first_text(entry, "id"),
            )
        )
    return items
|
|
|
|
|
|
def _atom_link(entry: ET.Element) -> str | None:
    """Choose the URL for an Atom entry from its direct <link> children.

    The first <link> with an href whose rel is "alternate" wins; rel defaults
    to "alternate" when the attribute is absent. Otherwise the first <link>
    carrying any href is returned, or None if there is none.
    """
    first_href = None
    for link in entry:
        if _local_name(link.tag) != "link":
            continue
        href = link.attrib.get("href")
        if not href:
            continue
        if link.attrib.get("rel", "alternate") == "alternate":
            return href
        if first_href is None:
            first_href = href
    return first_href
|
|
|
|
|
|
# Captures the quoted src value of the first <img ...> tag that has one;
# tag and attribute names are matched case-insensitively.
_IMG_SRC_RE = re.compile(r"""<img\b[^>]*?\bsrc=["']([^"']+)["']""", flags=re.IGNORECASE)
|
|
|
|
|
|
def _img_from_html(html: str | None) -> str | None:
    """Return the src of the first <img> in an HTML fragment, or None.

    None/empty input yields None without running the regex.
    """
    match = _IMG_SRC_RE.search(html) if html else None
    return None if match is None else match.group(1)
|
|
|
|
|
|
def _html_image(element: ET.Element) -> str | None:
    """Opportunistic image from the feed's own content/description HTML.

    Takes the first direct child (in document order) named encoded, content,
    description or summary that has non-empty text. Returns that child's first
    <img src>, passed through canonicalize_url.

    Only ever reads what the feed already provides; it never fetches the
    article page. canonicalize_url is expected to drop non-http(s)/relative
    URLs and to accept None.
    """
    fragment = _first_text(element, "encoded", "content", "description", "summary")
    src = _img_from_html(fragment)
    return canonicalize_url(src)
|
|
|
|
|
|
def _find_image_url(element: ET.Element) -> str | None:
    """Return the first image URL advertised by media/enclosure markup.

    Walks ``element`` itself and all of its descendants in document order. It
    returns the ``url`` attribute of the first node that is either:

    * a <thumbnail> or <content> (any namespace) whose ``medium`` attribute is
      absent or "image";
    * an <enclosure> whose ``type`` attribute starts with "image/".

    Returns None when nothing qualifies.
    """
    for node in element.iter():
        tag = _local_name(node.tag)
        url = node.attrib.get("url")
        if not url:
            continue
        if tag in {"thumbnail", "content"}:
            if node.attrib.get("medium") in {None, "image"}:
                return url
        elif tag == "enclosure":
            if node.attrib.get("type", "").startswith("image/"):
                return url
    return None
|
|
|
|
|
|
def _parse_date(value: str | None) -> str | None:
|
|
if not value:
|
|
return None
|
|
value = value.strip()
|
|
try:
|
|
parsed = email.utils.parsedate_to_datetime(value)
|
|
if parsed.tzinfo is None:
|
|
parsed = parsed.replace(tzinfo=UTC)
|
|
return parsed.astimezone(UTC).isoformat()
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
try:
|
|
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
|
|
if parsed.tzinfo is None:
|
|
parsed = parsed.replace(tzinfo=UTC)
|
|
return parsed.astimezone(UTC).isoformat()
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _first_child(element: ET.Element, name: str) -> ET.Element | None:
    """Return the first direct child whose namespace-stripped tag is ``name``, else None."""
    return next((child for child in element if _local_name(child.tag) == name), None)
|
|
|
|
|
|
def _first_text(element: ET.Element, *names: str) -> str | None:
    """Return the stripped text of the first matching direct child with text.

    A child matches when its namespace-stripped tag is any of ``names``.
    Children are scanned in document order, not in ``names`` order. A
    matching child whose text is missing or blank is skipped. Returns None
    when nothing qualifies.
    """
    for child in element:
        if _local_name(child.tag) not in names:
            continue
        text = _text(child)
        if text:
            return text
    return None
|
|
|
|
|
|
def _text(element: ET.Element) -> str | None:
    """Return the element's leading text (before any child) with whitespace stripped.

    Returns None when the element has no text at all. Whitespace-only text
    comes back as "".
    """
    raw = element.text
    return raw.strip() if raw else None
|
|
|
|
|
|
def _local_name(tag: str) -> str:
    """Strip an ElementTree ``{namespace}`` prefix, e.g. '{ns}item' → 'item'."""
    if "}" not in tag:
        return tag
    return tag.rpartition("}")[2]
|
|
|