Files
upbeatBytes/goodnews/briefs.py
T
thejayman77 68a401eed6 Fresh server data overrides a pinned brief; pin holds otherwise
Per the agreed model: the brief is server-authoritative and a client Replace is
a soft override that yields when genuinely new data arrives.
- build_daily_brief is now idempotent: if the composed selection is unchanged it
  leaves the brief (and its created_at) alone, so the timer's 15-min rebuilds are
  no-ops when no new data landed.
- /api/brief exposes generated_at (the brief's created_at = a content-change
  stamp). The client pins its view against generated_at and keeps it across plain
  refreshes, but drops it and shows the fresh server brief when generated_at
  advances. Missed stories remain in the mood feeds.

Tests: idempotent rebuild (no-op vs content change) — 93 total.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 14:00:08 +00:00

230 lines
8.2 KiB
Python

from __future__ import annotations
import sqlite3
from datetime import date
from .paywall import is_paywalled
def build_daily_brief(
    conn: sqlite3.Connection,
    brief_date: str | None = None,
    limit: int = 7,
    replace: bool = False,
    window_days: int = 3,
) -> int:
    """Build (or keep) the daily brief for one date and return its row id.

    The selection is composed first and compared with what is already stored,
    so a rebuild that would pick the same articles in the same order leaves the
    existing brief row untouched.

    Args:
        conn: Open SQLite connection. Rows are read by column name, so the
            connection is expected to use ``sqlite3.Row`` (or an equivalent
            mapping-style row factory).
        brief_date: ISO date string of the brief; defaults to today's date.
        limit: Maximum number of items placed in the brief.
        replace: When true, an existing brief whose selection differs is
            deleted and rebuilt. When false, an existing brief is always
            returned as-is.
        window_days: Look-back window forwarded to ``_candidate_articles``.

    Returns:
        The id of the ``daily_briefs`` row for the target date.
    """
    target_date = brief_date or date.today().isoformat()

    # Compose the selection first so we can tell whether anything actually
    # changed. A calm daily brief shouldn't repeatedly hand the reader a locked
    # door: push paywalled candidates below readable ones. sorted() is stable,
    # so the ranking from the query is preserved within each group.
    rows = _candidate_articles(conn, target_date, window_days)
    rows = sorted(rows, key=lambda r: is_paywalled(r["canonical_url"]))
    selected = _select_diverse(rows, limit)
    selected_ids = [row["id"] for row in selected]

    existing = conn.execute(
        "SELECT id FROM daily_briefs WHERE brief_date = ?", (target_date,)
    ).fetchone()
    if existing:
        existing_ids = [
            r["article_id"]
            for r in conn.execute(
                "SELECT article_id FROM daily_brief_items WHERE brief_id = ? ORDER BY rank",
                (existing["id"],),
            )
        ]
        # Idempotent: if the selection is unchanged, leave the brief (and its
        # created_at freshness stamp) alone — a 15-minute rebuild with no new
        # data is a no-op, so a reader's pinned view holds. Without `replace`
        # an existing brief is never touched, changed selection or not.
        if existing_ids == selected_ids or not replace:
            return int(existing["id"])

    insert_item_sql = """
INSERT INTO daily_brief_items (brief_id, article_id, rank, selection_reason)
VALUES (?, ?, ?, ?)
"""

    # Swap the old brief for the new one atomically: the connection's context
    # manager commits on success and rolls back if any statement raises, so a
    # failure can never leave the date with a deleted or half-filled brief.
    with conn:
        if existing:
            # Remove the items explicitly rather than relying on a cascade:
            # SQLite only enforces ON DELETE CASCADE when PRAGMA foreign_keys
            # is ON for this connection, and leftover items could otherwise
            # attach to the new brief if its rowid is reused.
            conn.execute(
                "DELETE FROM daily_brief_items WHERE brief_id = ?", (existing["id"],)
            )
            conn.execute("DELETE FROM daily_briefs WHERE id = ?", (existing["id"],))
        brief_id = conn.execute(
            "INSERT INTO daily_briefs (brief_date, title) VALUES (?, ?)",
            (target_date, f"Highlights from Today - {target_date}"),
        ).lastrowid
        # Ranks are 1-based and follow the order of the composed selection.
        conn.executemany(
            insert_item_sql,
            (
                (brief_id, row["id"], rank, _selection_reason(row))
                for rank, row in enumerate(selected, start=1)
            ),
        )
    return int(brief_id)
def show_brief(conn: sqlite3.Connection, brief_date: str | None = None, limit: int = 10) -> list[sqlite3.Row]:
    """Return the ranked items of one daily brief with article and score details.

    Args:
        conn: Open SQLite connection.
        brief_date: Date of the brief to show. When omitted (or empty), the
            date returned by ``_latest_brief_date`` is used instead.
        limit: Maximum number of items to return.

    Returns:
        The brief's items ordered by rank, or an empty list when no brief date
        could be resolved. Score columns are NULL for articles that have no
        ``article_scores`` row (the scores are LEFT JOINed).
    """
    target_date = brief_date if brief_date else _latest_brief_date(conn)
    if not target_date:
        return []

    query = """
SELECT
b.brief_date,
bi.rank,
bi.selection_reason,
a.title,
a.description,
a.canonical_url,
a.published_at,
src.name AS source_name,
src.default_category,
s.constructive_score,
s.cortisol_score,
s.ragebait_score,
s.agency_score,
s.human_benefit_score,
s.reason_code,
s.reason_text,
s.model_name
FROM daily_briefs b
JOIN daily_brief_items bi ON bi.brief_id = b.id
JOIN articles a ON a.id = bi.article_id
JOIN sources src ON src.id = a.source_id
LEFT JOIN article_scores s ON s.article_id = a.id
WHERE b.brief_date = ?
ORDER BY bi.rank
LIMIT ?
"""
    cursor = conn.execute(query, (target_date, limit))
    return cursor.fetchall()
def _candidate_articles(
    conn: sqlite3.Connection, target_date: str, window_days: int = 3
) -> list[sqlite3.Row]:
    """Fetch ranked candidate articles for the brief of `target_date`.

    An article qualifies when its score row is accepted, it is not marked as a
    duplicate, and its effective date (published_at, falling back to
    discovered_at) is on or before `target_date` and strictly after
    `target_date` minus `window_days` days. Articles already used by a brief
    for a *different* date within the 7 days up to `target_date` are left out;
    items of the brief for `target_date` itself stay eligible so a rebuild can
    pick them again.

    Results are ordered with same-day articles first, then by a composite
    score (constructive + agency + human benefit + source trust, minus
    cortisol, ragebait and PR risk), then by recency, and capped at 50 rows.
    """
    query = """
SELECT
a.id,
a.title,
a.description,
a.canonical_url,
a.published_at,
a.discovered_at,
src.name AS source_name,
src.default_category,
src.trust_score,
s.constructive_score,
s.cortisol_score,
s.ragebait_score,
s.agency_score,
s.human_benefit_score,
s.novelty_score,
s.pr_risk_score,
s.reason_code,
s.reason_text,
s.model_name,
s.topic,
s.flavor,
CASE WHEN date(COALESCE(a.published_at, a.discovered_at)) = date(?)
THEN 1 ELSE 0 END AS is_today
FROM articles a
JOIN sources src ON src.id = a.source_id
JOIN article_scores s ON s.article_id = a.id
WHERE s.accepted = 1
AND a.duplicate_of IS NULL
AND date(COALESCE(a.published_at, a.discovered_at)) <= date(?)
AND date(COALESCE(a.published_at, a.discovered_at)) > date(?, '-' || ? || ' days')
AND a.id NOT IN (
SELECT bi.article_id
FROM daily_brief_items bi
JOIN daily_briefs b ON b.id = bi.brief_id
WHERE b.brief_date <> ?
AND b.brief_date <= date(?)
AND b.brief_date > date(?, '-7 days')
)
ORDER BY
is_today DESC,
(s.constructive_score + s.agency_score + s.human_benefit_score + src.trust_score
- s.cortisol_score - s.ragebait_score - s.pr_risk_score) DESC,
COALESCE(a.published_at, a.discovered_at) DESC
LIMIT 50
"""
    # Positional parameters, in the order the placeholders appear above.
    params = (
        target_date,  # is_today flag
        target_date,  # newest allowed article date
        target_date,  # anchor for the look-back window ...
        window_days,  # ... and its length in days
        target_date,  # the brief being rebuilt does not block its own items
        target_date,  # newest brief considered for the exclusion
        target_date,  # anchor for the 7-day exclusion window
    )
    return conn.execute(query, params).fetchall()
def _select_diverse(rows: list[sqlite3.Row], limit: int) -> list[sqlite3.Row]:
    """Choose up to `limit` brief items from `rows`, which arrive best-first.

    The brief should read as varied good news rather than a research digest,
    so the first pass applies these guards:
    - no more than 1 health item,
    - no more than 2 items across science and health together,
    - no more than 2 items for any one topic,
    - each source used at most once.

    If that pass leaves the brief short, a second pass drops the topic/mix
    caps (sources still distinct), and a final pass also allows repeated
    sources. Rows keep their incoming order within each pass, and an article
    is never picked twice.
    """
    picked: list[sqlite3.Row] = []
    picked_ids: set = set()
    used_sources: set = set()
    per_topic: dict = {}

    def breaks_mix(topic) -> bool:
        # True when adding one more item of `topic` would exceed a mix cap.
        health = per_topic.get("health", 0)
        science = per_topic.get("science", 0)
        return (
            (topic == "health" and health >= 1)
            or (topic in ("science", "health") and (science + health) >= 2)
            or per_topic.get(topic, 0) >= 2
        )

    # (enforce_mix, enforce_source) per pass, strictest first.
    passes = (
        (True, True),    # balanced mix, distinct sources
        (False, True),   # relax the mix caps to fill
        (False, False),  # relax source too, last resort
    )
    for enforce_mix, enforce_source in passes:
        for candidate in rows:
            if len(picked) >= limit:
                return picked
            if candidate["id"] in picked_ids:
                continue
            if enforce_source and candidate["source_name"] in used_sources:
                continue
            if enforce_mix and breaks_mix(candidate["topic"]):
                continue
            picked.append(candidate)
            picked_ids.add(candidate["id"])
            used_sources.add(candidate["source_name"])
            topic = candidate["topic"]
            per_topic[topic] = per_topic.get(topic, 0) + 1
    return picked
def _selection_reason(row: sqlite3.Row) -> str:
    """Format a one-line explanation of why `row` was selected.

    The result is the row's reason code, a semicolon, and then
    ``label=value`` pairs for the constructive, agency, human-benefit and
    cortisol scores plus the source name.
    """
    reason_code = row["reason_code"]
    # (label in the output, column read from the row), in output order.
    labelled_columns = (
        ("constructive", "constructive_score"),
        ("agency", "agency_score"),
        ("human_benefit", "human_benefit_score"),
        ("cortisol", "cortisol_score"),
        ("source", "source_name"),
    )
    details = ", ".join(f"{label}={row[column]}" for label, column in labelled_columns)
    return f"{reason_code}; {details}"
def _latest_brief_date(conn: sqlite3.Connection) -> str | None:
    """Return the greatest `brief_date` stored in `daily_briefs`, or None if there are no briefs."""
    latest = conn.execute("SELECT brief_date FROM daily_briefs ORDER BY brief_date DESC LIMIT 1").fetchone()
    if not latest:
        return None
    return latest["brief_date"]