Files
upbeatBytes/goodnews/queries.py
T
thejayman77 01de5a3ef0 source_health: next_due_at = later of streak-backoff and retry_after_at
Per Codex: the Next poll column computed only the streak-backoff time, so a
rate-limited source could show an earlier Next poll than the real gate (which
also requires retry_after_at <= now). Take the later of the two in the Python
post-process so the admin table agrees with due_source_rows.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 11:45:54 -04:00

553 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Read-only query helpers over the goodNews database.
Pure stdlib and framework-agnostic: returns plain dicts so the same functions
back both the CLI and the JSON API. All article output is metadata + a link to
the original source — never stored bodies.
"""
from __future__ import annotations
import sqlite3
from .feeds import MAX_BACKOFF_MINUTES
# Composite ranking used everywhere a "best first" order is needed. Kept as one
# expression so brief, category feeds, and the API all rank identically.
# Positive signals (constructive, agency, human benefit, source trust) minus the
# negative ones (cortisol, ragebait, PR risk). The enclosing query must alias
# article_scores as `s` and sources as `src`. Any NULL operand makes the whole
# sum NULL (e.g. an article with no score row under a LEFT JOIN).
RANK_SCORE_SQL = (
"(s.constructive_score + s.agency_score + s.human_benefit_score + src.trust_score "
"- s.cortisol_score - s.ragebait_score - s.pr_risk_score)"
)
# SELECT-list fragment shared by feed, brief, saved and history so every
# endpoint returns the same article shape. The enclosing query must provide the
# aliases `a` (articles), `src` (sources) and `s` (article_scores). `tags` is a
# single comma-separated string built by group_concat (NULL when the article has
# no tags); `rank_score` is RANK_SCORE_SQL evaluated per row.
_ARTICLE_COLUMNS = f"""
a.id,
a.title,
a.description,
a.canonical_url,
a.published_at,
a.image_url,
a.source_id,
src.name AS source_name,
s.topic,
s.flavor,
s.accepted,
s.constructive_score,
s.cortisol_score,
s.ragebait_score,
s.agency_score,
s.human_benefit_score,
s.pr_risk_score,
s.reason_code,
s.reason_text,
s.model_name,
(SELECT group_concat(t.tag) FROM article_tags t WHERE t.article_id = a.id) AS tags,
{RANK_SCORE_SQL} AS rank_score
"""
def feed(
    conn: sqlite3.Connection,
    topic: str | None = None,
    flavor: str | None = None,
    accepted_only: bool = True,
    limit: int = 30,
    offset: int = 0,
    include_topics: list[str] | None = None,
    include_flavors: list[str] | None = None,
    mute_topics: list[str] | None = None,
    mute_flavors: list[str] | None = None,
    max_cortisol: int | None = None,
    max_ragebait: int | None = None,
    tag: str | None = None,
    source_id: int | None = None,
    sort: str = "ranked",
) -> list[dict]:
    """Return articles with categorical filters applied in SQL.

    sort="ranked" (default) is best-first by the composite rank; sort="latest"
    is pure recency (newest first) for the chronological "Latest" feed. Any
    other value falls back to "ranked". Both stay accepted-only (unless
    accepted_only=False) and respect the same boundaries: duplicates and
    articles whose source does not have content_visible = 1 are always excluded.

    Categorical filters (topic/flavor include & mute, cortisol/ragebait ceilings)
    must be applied here, not after ranking — otherwise low-ranked-but-matching
    items (e.g. 'discovery' for a Wonder lane) fall outside any over-fetch window.
    Word-boundary avoid-terms remain a Python pass on the caller side.

    Both orderings end with ``a.id DESC`` as a total-order tie-breaker. Rows that
    tie on every other sort key would otherwise come back in an unspecified
    order, so consecutive LIMIT/OFFSET pages could repeat or skip articles.

    Returns one dict per row, keyed by the _ARTICLE_COLUMNS names.
    """
    clauses = ["a.duplicate_of IS NULL", "src.content_visible = 1"]
    params: list = []
    if accepted_only:
        clauses.append("s.accepted = 1")
    if topic:
        clauses.append("s.topic = ?")
        params.append(topic.lower())
    if flavor:
        clauses.append("s.flavor = ?")
        params.append(flavor.lower())

    def _in(column: str, values: list[str], negate: bool = False) -> None:
        """Append a (NOT) IN clause over lower-cased values for `column`."""
        vals = [v.lower() for v in values]
        placeholders = ",".join("?" * len(vals))
        op = "NOT IN" if negate else "IN"
        # COALESCE keeps NULL-category rows from being dropped by NOT IN.
        clauses.append(f"COALESCE({column}, '') {op} ({placeholders})")
        params.extend(vals)

    # Each list is guarded by truthiness, so an empty list never yields "IN ()".
    if include_topics:
        _in("s.topic", include_topics)
    if include_flavors:
        _in("s.flavor", include_flavors)
    if mute_topics:
        _in("s.topic", mute_topics, negate=True)
    if mute_flavors:
        _in("s.flavor", mute_flavors, negate=True)
    if max_cortisol is not None:
        clauses.append("COALESCE(s.cortisol_score, 0) <= ?")
        params.append(max_cortisol)
    if max_ragebait is not None:
        clauses.append("COALESCE(s.ragebait_score, 0) <= ?")
        params.append(max_ragebait)
    if tag:
        clauses.append("EXISTS (SELECT 1 FROM article_tags at WHERE at.article_id = a.id AND at.tag = ?)")
        params.append(tag.lower())
    # A falsy source_id (None or 0) means "no source filter".
    if source_id:
        clauses.append("a.source_id = ?")
        params.append(source_id)
    where = "WHERE " + " AND ".join(clauses)
    params.extend([limit, offset])
    order_by = (
        "COALESCE(a.published_at, a.discovered_at) DESC, rank_score DESC, a.id DESC"
        if sort == "latest"
        else "rank_score DESC, COALESCE(a.published_at, a.discovered_at) DESC, a.id DESC"
    )
    rows = conn.execute(
        f"""
        SELECT {_ARTICLE_COLUMNS}
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        JOIN article_scores s ON s.article_id = a.id
        {where}
        ORDER BY {order_by}
        LIMIT ? OFFSET ?
        """,
        params,
    ).fetchall()
    return [dict(row) for row in rows]
def brief(conn: sqlite3.Connection, brief_date: str | None = None, limit: int = 10) -> dict:
    """Return a stored daily brief (latest if no date) with its ranked items.

    The result always carries the same keys — ``brief_date``, ``title``,
    ``created_at`` and ``items`` — so callers can index them unconditionally:

    * no brief stored at all (and no date given): every field None, ``items == []``;
    * a date with no stored brief: that date echoed back, other fields None;
    * otherwise the brief header plus up to ``limit`` items ordered by
      ``daily_brief_items.rank``. Only articles whose source has
      ``content_visible = 1`` are included. Each item is an _ARTICLE_COLUMNS row
      plus ``rank``, ``selection_reason`` and ``summary`` (None when unsummarized).
    """
    target_date = brief_date or _latest_brief_date(conn)
    if not target_date:
        # Same shape as the other return paths (created_at was previously missing).
        return {"brief_date": None, "title": None, "created_at": None, "items": []}
    header = conn.execute(
        "SELECT brief_date, title, created_at FROM daily_briefs WHERE brief_date = ?",
        (target_date,),
    ).fetchone()
    if not header:
        return {"brief_date": target_date, "title": None, "created_at": None, "items": []}
    rows = conn.execute(
        f"""
        SELECT bi.rank, bi.selection_reason, {_ARTICLE_COLUMNS},
               (SELECT summary FROM article_summaries WHERE article_id = a.id) AS summary
        FROM daily_briefs b
        JOIN daily_brief_items bi ON bi.brief_id = b.id
        JOIN articles a ON a.id = bi.article_id
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE b.brief_date = ? AND src.content_visible = 1
        ORDER BY bi.rank
        LIMIT ?
        """,
        (target_date, limit),
    ).fetchall()
    return {
        "brief_date": header["brief_date"],
        "title": header["title"],
        "created_at": header["created_at"],
        "items": [dict(row) for row in rows],
    }
def saved(conn: sqlite3.Connection, user_id: int, limit: int = 200) -> list[dict]:
    """List the articles ``user_id`` has saved, most recently saved first.

    Rows use the same column set as the feed (_ARTICLE_COLUMNS) and are capped
    at ``limit``. No duplicate or source-visibility filter is applied here.
    """
    sql = f"""
        SELECT {_ARTICLE_COLUMNS}
        FROM saved_articles sv
        JOIN articles a ON a.id = sv.article_id
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE sv.user_id = ?
        ORDER BY sv.saved_at DESC
        LIMIT ?
    """
    saved_rows = conn.execute(sql, (user_id, limit)).fetchall()
    return list(map(dict, saved_rows))
def saved_ids(conn: sqlite3.Connection, user_id: int) -> list[int]:
    """Return the ids of every article ``user_id`` has saved.

    The query has no ORDER BY, so the order is whatever SQLite yields.
    """
    cursor = conn.execute(
        "SELECT article_id FROM saved_articles WHERE user_id = ?", (user_id,)
    )
    return [saved_row[0] for saved_row in cursor.fetchall()]
def history(conn: sqlite3.Connection, user_id: int, limit: int = 200) -> list[dict]:
    """List the articles in ``user_id``'s account history, most recently seen first.

    Each article appears once (grouped by article id), with ``seen_at`` holding
    the latest ``user_history.at`` for it. Results are capped at ``limit`` and are
    not filtered on duplicates or source visibility.
    """
    sql = f"""
        SELECT {_ARTICLE_COLUMNS}, MAX(h.at) AS seen_at
        FROM user_history h
        JOIN articles a ON a.id = h.article_id
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE h.user_id = ?
        GROUP BY a.id
        ORDER BY seen_at DESC
        LIMIT ?
    """
    history_rows = conn.execute(sql, (user_id, limit)).fetchall()
    return [dict(entry) for entry in history_rows]
def content_stats(conn: sqlite3.Connection) -> dict:
    """Corpus / serving health for the dashboard: how much good news is live.

    "Live" means accepted (``article_scores.accepted = 1``) and not a duplicate.
    Returns a flat dict of integer counts plus the latest brief's date (None when
    there is no brief) and size. Reading ``latest["brief_date"]`` by name requires
    a row factory with name access (presumably sqlite3.Row — confirm with the
    connection setup).
    """
    # Shared SQL fragments, defined once so related counts can't drift apart.
    live = "s.accepted=1 AND a.duplicate_of IS NULL"
    has_image = "a.image_url IS NOT NULL AND a.image_url!=''"
    no_image = "(a.image_url IS NULL OR a.image_url='')"
    scored = "FROM article_scores s JOIN articles a ON a.id=s.article_id"
    summarized = (
        "FROM article_summaries m JOIN articles a ON a.id=m.article_id "
        "JOIN article_scores s ON s.article_id=a.id"
    )

    def scalar(sql: str, params: tuple = ()) -> int:
        """First column of the first row, with NULL coerced to 0."""
        return conn.execute(sql, params).fetchone()[0] or 0

    served = scalar(f"SELECT COUNT(*) {scored} WHERE {live}")
    accepted_7d = scalar(
        f"SELECT COUNT(*) {scored} WHERE {live} "
        "AND COALESCE(a.published_at, a.discovered_at) >= datetime('now','-7 days')"
    )
    # Named `latest` rather than `brief` so it doesn't shadow the module's brief().
    latest = conn.execute(
        "SELECT brief_date, (SELECT COUNT(*) FROM daily_brief_items WHERE brief_id=daily_briefs.id) n "
        "FROM daily_briefs ORDER BY brief_date DESC LIMIT 1"
    ).fetchone()
    with_image = scalar(f"SELECT COUNT(*) {scored} WHERE {live} AND {has_image}")
    # Coverage is scoped to LIVE articles (accepted, non-duplicate) so the
    # percentages can't drift past 100% as rejected/duplicate rows accrue summaries.
    summaries = scalar(f"SELECT COUNT(*) {summarized} WHERE {live}")
    summaries_with_image = scalar(f"SELECT COUNT(*) {summarized} WHERE {live} AND {has_image}")
    brief_with_image = 0
    if latest:
        brief_with_image = scalar(
            "SELECT COUNT(*) FROM daily_brief_items bi JOIN articles a ON a.id=bi.article_id "
            "JOIN daily_briefs b ON b.id=bi.brief_id "
            f"WHERE b.brief_date=? AND {has_image}",
            (latest["brief_date"],),
        )
    return {
        "served": served,
        "total": scalar("SELECT COUNT(*) FROM articles"),
        "rejected": scalar("SELECT COUNT(*) FROM article_scores WHERE accepted=0"),
        "accepted_7d": accepted_7d,
        "added_24h": scalar("SELECT COUNT(*) FROM articles WHERE discovered_at >= datetime('now','-1 day')"),
        "added_7d": scalar("SELECT COUNT(*) FROM articles WHERE discovered_at >= datetime('now','-7 days')"),
        "summaries": summaries,
        "summaries_with_image": summaries_with_image,
        "with_image": with_image,
        # Accepted, imageless articles whose enrichment was tried recently and
        # came up empty (no usable og:image) — the image "misses" to watch.
        "recent_enrich_fail": scalar(
            f"SELECT COUNT(*) {scored} WHERE {live} AND {no_image} "
            "AND a.image_checked_at >= datetime('now','-1 day')"
        ),
        "active_sources": scalar("SELECT COUNT(*) FROM sources WHERE active=1"),
        "total_sources": scalar("SELECT COUNT(*) FROM sources"),
        "latest_brief_date": latest["brief_date"] if latest else None,
        "latest_brief_size": latest["n"] if latest else 0,
        "brief_with_image": brief_with_image,
    }
def source_health(conn: sqlite3.Connection) -> list[dict]:
    """Per source (active, paused, AND retired), the data an operator needs to manage feeds:
    failure streak, last error/success/attempt, computed next poll, and the
    backing metrics (served/accepted/total counts + acceptance & duplicate rates)
    so Pause/Flag decisions aren't vibes-based.

    next_due_at is the LATER of:
      * the streak-backoff time, last attempt + MIN(MAX_BACKOFF_MINUTES,
        interval * (1 + consecutive_failures)), mirroring feeds.due_source_rows; and
      * retry_after_at, when set (a rate-limit rest).
    With no last attempt and no retry_after_at, next_due_at is None ("due now").
    Paused sources are included (their articles stay live; only polling stops).

    Both candidate times are normalized through SQLite's datetime() before the
    comparison, so a retry_after_at stored in another ISO-8601 spelling (a 'T'
    separator, fractional seconds, a UTC offset) can't win or lose on formatting
    alone. NOTE(review): assumes retry_after_at holds a datetime()-parsable
    timestamp; an unparsable value falls back to the raw string as before.

    Rows are ordered: active first, then longest failure streak, then most served,
    then name.
    """
    rows = conn.execute(
        """
        SELECT
            s.id, s.name, s.default_category AS category, s.active,
            s.status, s.content_visible, s.retry_after_at,
            datetime(s.retry_after_at) AS retry_after_norm,
            s.consecutive_failures AS failures, s.review_flag, s.review_reason,
            s.poll_interval_minutes AS interval_minutes,
            s.last_success_at, s.last_error_at, substr(s.last_error, 1, 160) AS last_error,
            (SELECT MAX(r.finished_at) FROM ingest_runs r
             WHERE r.source_id = s.id AND r.finished_at IS NOT NULL) AS last_attempt,
            (SELECT COUNT(*) FROM articles a WHERE a.source_id = s.id) AS total_articles,
            (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
             WHERE a.source_id = s.id AND sc.accepted = 1) AS accepted_total,
            (SELECT COUNT(*) FROM articles a WHERE a.source_id = s.id AND a.duplicate_of IS NOT NULL) AS duplicates,
            (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
             WHERE a.source_id = s.id AND sc.accepted = 1 AND a.duplicate_of IS NULL) AS served,
            datetime(
                (SELECT MAX(r.finished_at) FROM ingest_runs r
                 WHERE r.source_id = s.id AND r.finished_at IS NOT NULL),
                '+' || MIN(?, s.poll_interval_minutes * (1 + s.consecutive_failures)) || ' minutes'
            ) AS next_due_at
        FROM sources s
        ORDER BY s.active DESC, s.consecutive_failures DESC, served DESC, s.name
        """,
        (MAX_BACKOFF_MINUTES,),
    ).fetchall()
    out: list[dict] = []
    for r in rows:
        d = dict(r)
        # Helper column only: pop it so callers see the same keys as before.
        retry_norm = d.pop("retry_after_norm")
        total = d["total_articles"] or 0
        accepted = d["accepted_total"] or 0
        d["acceptance_rate"] = round(100 * accepted / total) if total else None
        d["duplicate_rate"] = round(100 * d["duplicates"] / total) if total else None
        # Curation quality: of what this source got ACCEPTED, how much was a
        # duplicate of content already served (accepted_total - served = accepted dupes).
        d["accepted_dup_rate"] = round(100 * (accepted - d["served"]) / accepted) if accepted else None
        # Match the REAL scheduler gate: due = the later of the streak-backoff time
        # and any retry_after_at rest. Both sides are in datetime()'s
        # 'YYYY-MM-DD HH:MM:SS' form here, so string order is chronological.
        retry_at = retry_norm or d["retry_after_at"]
        due_times = [t for t in (d["next_due_at"], retry_at) if t]
        d["next_due_at"] = max(due_times) if due_times else None
        out.append(d)
    return out
def _attention(content: dict, sources: list[dict], feedback_unread: int) -> list[dict]:
"""The 'Attention Needed' strip: what an operator should look at, soft-toned
(warn = act soon, info = worth a glance). Derived from the same data shown
elsewhere, so it never disagrees with the detail sections."""
items: list[dict] = []
n = lambda c: "" if c == 1 else "s" # noqa: E731 — tiny pluralizer
resting = [s for s in sources if s.get("active") and (s.get("failures") or 0) > 0]
if resting:
items.append({"level": "warn", "text": f"{len(resting)} source{n(len(resting))} backing off after failures"})
flagged = [s for s in sources if s.get("review_flag")]
if flagged:
items.append({"level": "warn", "text": f"{len(flagged)} source{n(len(flagged))} flagged for review"})
served = content.get("served") or 0
with_image = content.get("with_image") or 0
if served and (with_image / served) < 0.70:
items.append({"level": "info", "text": f"Image coverage at {round(100 * with_image / served)}% (aim for 70%+)"})
brief_size = content.get("latest_brief_size") or 0
if brief_size < 7:
items.append({"level": "info", "text": f"Today's brief has only {brief_size} item{n(brief_size)}"})
if feedback_unread:
items.append({"level": "info", "text": f"{feedback_unread} unread feedback message{n(feedback_unread)}"})
return items
def admin_stats(conn: sqlite3.Connection, days: int = 30) -> dict:
    """Aggregate, non-personal usage stats for the admin dashboard.

    ``days`` sets the trailing window for the windowed figures; it is passed to
    SQLite as the modifier in ``date('now', '-<days> days')``. The "today" and
    7-day figures use fixed windows regardless of ``days`` (``visitors["d30"]``
    follows ``days`` despite its name). Only counts and aggregate rankings are
    returned — no emails or per-user listings.

    Column access by name (``r["kind"]``) requires a row factory with name
    access (presumably sqlite3.Row — confirm with the connection setup).
    """
    since = f"-{days} days"

    def scalar(sql: str, params: tuple = ()) -> int:
        """First column of the first row, with NULL coerced to 0."""
        return conn.execute(sql, params).fetchone()[0] or 0

    # A counted visit: a 'visit' event carrying a non-empty visitor hash.
    visit = "kind='visit' AND visitor_hash!=''"
    visitors = {
        "today": scalar(f"SELECT COUNT(DISTINCT visitor_hash) FROM events "
                        f"WHERE {visit} AND day=date('now')"),
        "d7": scalar(f"SELECT COUNT(DISTINCT visitor_hash) FROM events "
                     f"WHERE {visit} AND day>=date('now','-7 days')"),
        "d30": scalar(f"SELECT COUNT(DISTINCT visitor_hash) FROM events "
                      f"WHERE {visit} AND day>=date('now',?)", (since,)),
    }

    # Visitors bucketed by distinct active days in the window. This one scan
    # serves both the retention buckets and the returning/once split below.
    bucket_rows = conn.execute(
        "SELECT CASE WHEN d>=8 THEN '8+' WHEN d>=4 THEN '4-7' WHEN d>=2 THEN '2-3' ELSE 'new' END b, "
        "COUNT(*) n FROM (SELECT visitor_hash, COUNT(DISTINCT day) d FROM events "
        f"WHERE {visit} AND day>=date('now',?) GROUP BY visitor_hash) GROUP BY b",
        (since,),
    ).fetchall()
    buckets = {r["b"]: r["n"] for r in bucket_rows}
    retention = {k: buckets.get(k, 0) for k in ("new", "2-3", "4-7", "8+")}
    # Returning = seen on >= 2 distinct days; once = the 'new' bucket (d < 2).
    # Identical to a separate d>=2 / else query over the same subquery.
    returning = retention["2-3"] + retention["4-7"] + retention["8+"]
    once = retention["new"]

    # An "open" = opening the article via our summary page (legacy 'open' + 'summary_viewed').
    open_kinds = "e.kind IN ('open','summary_viewed')"
    top_articles = [dict(r) for r in conn.execute(
        "SELECT e.article_id AS id, a.title, src.name AS source, COUNT(*) AS opens "
        "FROM events e JOIN articles a ON a.id=e.article_id JOIN sources src ON src.id=a.source_id "
        f"WHERE {open_kinds} AND e.article_id>0 AND e.day>=date('now',?) "
        "GROUP BY e.article_id ORDER BY opens DESC LIMIT 12",
        (since,),
    )]
    top_groupings = [dict(r) for r in conn.execute(
        "SELECT t.tag, COUNT(*) AS opens FROM events e JOIN article_tags t ON t.article_id=e.article_id "
        f"WHERE {open_kinds} AND e.day>=date('now',?) GROUP BY t.tag ORDER BY opens DESC LIMIT 12",
        (since,),
    )]
    top_topics = [dict(r) for r in conn.execute(
        "SELECT s.topic, COUNT(*) AS opens FROM events e JOIN article_scores s ON s.article_id=e.article_id "
        f"WHERE {open_kinds} AND s.topic IS NOT NULL AND e.day>=date('now',?) "
        "GROUP BY s.topic ORDER BY opens DESC",
        (since,),
    )]

    # Counts per event kind over the window (each = distinct visitor-days, by dedup).
    kc_rows = conn.execute(
        "SELECT kind, COUNT(*) n FROM events WHERE day>=date('now',?) GROUP BY kind", (since,)
    ).fetchall()
    kc = {r["kind"]: r["n"] for r in kc_rows}
    shares = {k: kc.get(k, 0) for k in ("share_ub", "copy_source", "native_share", "source_click")}
    summary_views = kc.get("summary_viewed", 0)
    source_clicks = kc.get("source_click", 0)
    funnel = {
        "summary_viewed": summary_views,
        "source_click": source_clicks,
        "full_story": kc.get("full_story", 0),
        "source_rate": round(100 * source_clicks / summary_views) if summary_views else 0,
    }
    emotional_mix = {
        "not_today": kc.get("not_today", 0),
        "less_like_this": kc.get("less_like_this", 0),
        "hide_topic": kc.get("hide_topic", 0),
    }
    paywall = {
        "paywall_replace": kc.get("paywall_replace", 0),
        "paywalled_source_open": kc.get("paywalled_source_open", 0),
    }
    replace = {"used": kc.get("replace_used", 0), "none": kc.get("replace_none", 0)}

    # Accounts — aggregate counts only (no emails, no per-user listing).
    accounts = {
        "total": scalar("SELECT COUNT(*) FROM users"),
        "new_today": scalar("SELECT COUNT(*) FROM users WHERE date(created_at)=date('now')"),
        "new_7d": scalar("SELECT COUNT(*) FROM users WHERE created_at>=date('now','-7 days')"),
        "new_30d": scalar("SELECT COUNT(*) FROM users WHERE created_at>=date('now',?)", (since,)),
        "active_7d": scalar("SELECT COUNT(DISTINCT user_id) FROM sessions WHERE last_seen_at>=date('now','-7 days')"),
    }

    daily = [dict(r) for r in conn.execute(
        "SELECT day, SUM(kind IN ('open','summary_viewed')) AS opens, SUM(kind='visit') AS visits "
        "FROM events WHERE day>=date('now',?) GROUP BY day ORDER BY day", (since,),
    )]
    content = content_stats(conn)
    sources = source_health(conn)
    feedback_7d = scalar("SELECT COUNT(*) FROM feedback WHERE created_at >= datetime('now','-7 days')")
    feedback_unread = scalar("SELECT COUNT(*) FROM feedback WHERE read_at IS NULL")
    return {
        "days": days,
        "content": content,
        "sources": sources,
        "attention": _attention(content, sources, feedback_unread),
        "feedback_7d": feedback_7d,
        "feedback_unread": feedback_unread,
        "visitors": visitors,
        "returning": returning,
        "once": once,
        "retention": retention,
        "accounts": accounts,
        "funnel": funnel,
        "emotional_mix": emotional_mix,
        "paywall": paywall,
        "replace": replace,
        "top_articles": top_articles,
        "top_groupings": top_groupings,
        "top_topics": top_topics,
        "shares": shares,
        "daily": daily,
    }
def existing_article_ids(conn: sqlite3.Connection, ids: list[int]) -> list[int]:
    """Filter to ids that still exist (FK-safe inserts for save/history/import).

    Each id is coerced with int() (numeric strings are accepted; other values
    raise ValueError/TypeError). Duplicates are dropped keeping first-seen order,
    and only the first 1000 unique ids are checked, so truncation is
    deterministic. Returns the subset present in ``articles``. The order is
    whatever SQLite yields per batch, so callers should not rely on it.
    """
    max_ids = 1000
    # Stay below SQLite's historical default SQLITE_MAX_VARIABLE_NUMBER (999
    # before 3.32.0): a single 1000-placeholder IN list would fail there with
    # "too many SQL variables".
    batch_size = 500
    clean = list(dict.fromkeys(int(i) for i in ids))[:max_ids]
    found: list[int] = []
    for start in range(0, len(clean), batch_size):
        batch = clean[start:start + batch_size]
        placeholders = ",".join("?" * len(batch))
        found.extend(
            r[0] for r in conn.execute(f"SELECT id FROM articles WHERE id IN ({placeholders})", batch)
        )
    return found
def tag_counts(conn: sqlite3.Connection, accepted_only: bool = True) -> dict:
    """How many shown (accepted, non-duplicate) articles carry each grouping tag.

    Applies the same always-on boundaries as ``feed``: non-duplicate articles
    whose source has ``content_visible = 1``. Hidden sources' articles are never
    served, so they are not counted. With ``accepted_only=False`` the
    acceptance filter is dropped.

    Returns ``{tag: count}``. Requires a row factory with name access
    (presumably sqlite3.Row).
    """
    clauses = ["a.duplicate_of IS NULL", "src.content_visible = 1"]
    if accepted_only:
        clauses.append("s.accepted = 1")
    where = " AND ".join(clauses)
    rows = conn.execute(
        f"""
        SELECT t.tag, COUNT(*) AS count
        FROM article_tags t
        JOIN articles a ON a.id = t.article_id
        JOIN sources src ON src.id = a.source_id
        JOIN article_scores s ON s.article_id = a.id
        WHERE {where}
        GROUP BY t.tag
        """
    ).fetchall()
    return {r["tag"]: r["count"] for r in rows}
def category_counts(conn: sqlite3.Connection, accepted_only: bool = True) -> list[dict]:
    """Return per topic/flavor article counts for building browse UIs.

    Applies the same always-on boundaries as ``feed`` — non-duplicate articles
    whose source has ``content_visible = 1`` — so the counts match what the
    feed endpoint will actually return for each topic/flavor. With
    ``accepted_only=False``, every scored article that has a topic is counted.

    Returns ``[{"topic", "flavor", "count"}, ...]`` ordered by topic, then flavor.
    """
    clauses = ["a.duplicate_of IS NULL", "src.content_visible = 1"]
    clauses.append("s.accepted = 1" if accepted_only else "s.topic IS NOT NULL")
    where = " AND ".join(clauses)
    rows = conn.execute(
        f"""
        SELECT s.topic, s.flavor, COUNT(*) AS count
        FROM article_scores s
        JOIN articles a ON a.id = s.article_id
        JOIN sources src ON src.id = a.source_id
        WHERE {where}
        GROUP BY s.topic, s.flavor
        ORDER BY s.topic, s.flavor
        """
    ).fetchall()
    return [dict(row) for row in rows]
def available_dates(conn: sqlite3.Connection, limit: int = 30) -> list[str]:
    """Return up to ``limit`` stored brief dates, newest first.

    Reads the column by name, so the connection needs a name-capable row
    factory (presumably sqlite3.Row).
    """
    cursor = conn.execute(
        "SELECT brief_date FROM daily_briefs ORDER BY brief_date DESC LIMIT ?",
        (limit,),
    )
    return [entry["brief_date"] for entry in cursor.fetchall()]
def _latest_brief_date(conn: sqlite3.Connection) -> str | None:
row = conn.execute(
"SELECT brief_date FROM daily_briefs ORDER BY brief_date DESC LIMIT 1"
).fetchone()
return row["brief_date"] if row else None