Files
upbeatBytes/goodnews/queries.py
T
thejayman77 0d21231597 news: hard-exclude paywalled sources from the feed + brief (no unreadable news)
Per Jay: don't surface stories people can't read without paying — it's off-brand
("no paywalls") and pointless. Paywalled is source-level (domain rule, admin-
overridable): just 3 sources today (Nature, New Scientist, MIT Tech Review),
~5.4% of accepted articles.

- queries.paywalled_source_ids(conn): live source set (admin override wins).
- queries.feed gains include_paywalled=False (default) → adds `a.source_id NOT IN
  (…)`. One chokepoint covers Latest/tags/sources/moods/topics/search/since AND
  the brief top-up. Source-level + SQL → paging stays exact, no frontend change.
- brief(): filter the cached/home pool by the same rule; replacement already
  avoids paywalled and now rides the feed exclusion too.
- Dropped the now-moot "paywalled below readable" demotion sort.
- Saved/history keep showing items you saved (their own queries, not excluded).
- test_source_paywall_override updated: paywalled source → excluded from the feed
  (was: shown with a badge); 'free' override → returns, no badge. 418 tests green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-28 17:10:00 -04:00

952 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Read-only query helpers over the goodNews database.
Pure stdlib and framework-agnostic: returns plain dicts so the same functions
back both the CLI and the JSON API. All article output is metadata + a link to
the original source — never stored bodies.
"""
from __future__ import annotations
import sqlite3
from datetime import UTC, datetime, timedelta
from .feeds import MAX_BACKOFF_MINUTES
from .localtime import local_now
from .paywall import is_paywalled, is_paywalled_for_source
# Lower-case User-Agent substrings that identify automated clients. Crawlers run
# JS on a throttled budget and routinely trip the boot-failure beacon; without
# this filter they would read as real users seeing blank screens.
BOT_UA_MARKS = (
    "headlesschrome",
    "bot",
    "spider",
    "crawl",
    "python",
    "curl",
    "wget",
    "phantomjs",
)

# SQL counterpart of is_bot_ua(): one `instr(...)=0` test per marker, AND-ed.
# The markers are module constants (never user input), so inlining them is safe.
# NOTE(review): for a NULL user_agent every instr() is NULL, so this predicate is
# not satisfied for such rows, while is_bot_ua(None) returns False — confirm that
# dropping NULL-UA rows is intended wherever this fragment is used.
_NOT_BOT_SQL = " AND ".join(
    "instr(lower(user_agent), '" + mark + "')=0" for mark in BOT_UA_MARKS
)


def is_bot_ua(ua: str | None) -> bool:
    """Return True when *ua* contains any BOT_UA_MARKS substring, ignoring case.

    A missing or empty User-Agent is treated as "not a bot".
    """
    if not ua:
        return False
    lowered = ua.lower()
    for mark in BOT_UA_MARKS:
        if mark in lowered:
            return True
    return False
# Composite "best first" score, shared by every ranked query so the brief, the
# category feeds, and the API all order articles identically. Positive signals
# (constructive, agency, human benefit, source trust) are summed and the negative
# ones (cortisol, ragebait, PR risk) subtracted. Expects the enclosing query to
# alias article_scores as `s` and sources as `src`.
RANK_SCORE_SQL = (
    "("
    "s.constructive_score + s.agency_score + s.human_benefit_score + src.trust_score"
    " - s.cortisol_score - s.ragebait_score - s.pr_risk_score"
    ")"
)
# Shared SELECT list for the article-shaped queries in this module (feed, briefs,
# saved, history). It references the aliases `a` (articles), `src` (sources) and
# `s` (article_scores), so every query that interpolates it must use those names.
# `tags` is a group_concat of the article's tags (comma-separated by SQLite's
# default; NULL when the article has none) and `rank_score` is RANK_SCORE_SQL.
_ARTICLE_COLUMNS = f"""
a.id,
a.title,
a.description,
a.canonical_url,
a.published_at,
a.image_url,
a.source_id,
src.name AS source_name,
s.topic,
s.flavor,
s.accepted,
s.constructive_score,
s.cortisol_score,
s.ragebait_score,
s.agency_score,
s.human_benefit_score,
s.pr_risk_score,
s.reason_code,
s.reason_text,
s.model_name,
src.paywall_override AS paywall_override,
a.source_words,
(SELECT group_concat(t.tag) FROM article_tags t WHERE t.article_id = a.id) AS tags,
{RANK_SCORE_SQL} AS rank_score
"""
def paywalled_source_ids(conn: sqlite3.Connection) -> list[int]:
    """Return the ids of all sources whose stories are currently paywalled.

    The verdict comes from is_paywalled_for_source(), fed with the source's
    homepage URL (falling back to its feed URL) and its per-source admin
    override. It is recomputed on every call so an admin flag takes effect at
    once; the sources table is small, so the scan is cheap.

    Rows are read by column name, so *conn* must use a name-addressable row
    factory (e.g. sqlite3.Row).
    """
    cursor = conn.execute(
        "SELECT id, homepage_url, feed_url, paywall_override FROM sources"
    )
    paywalled: list[int] = []
    for source in cursor.fetchall():
        url = source["homepage_url"] or source["feed_url"]
        if is_paywalled_for_source(url, source["paywall_override"]):
            paywalled.append(source["id"])
    return paywalled
def feed(
    conn: sqlite3.Connection,
    topic: str | None = None,
    flavor: str | None = None,
    accepted_only: bool = True,
    limit: int = 30,
    offset: int = 0,
    include_topics: list[str] | None = None,
    include_flavors: list[str] | None = None,
    mute_topics: list[str] | None = None,
    mute_flavors: list[str] | None = None,
    max_cortisol: int | None = None,
    max_ragebait: int | None = None,
    tag: str | None = None,
    source_id: int | None = None,
    sort: str = "ranked",
    follow_sources: list[int] | None = None,
    follow_tags: list[str] | None = None,
    since: str | None = None,
    match: str | None = None,
    home_country: str | None = None,
    home_state: str | None = None,
    geo_scope: str | None = None,  # 'near' | 'country' | 'world' relative to the reader's home
    include_paywalled: bool = False,  # default: hide paywalled-source stories (no unreadable news in the feed)
) -> list[dict]:
    """Return articles with categorical filters applied in SQL.

    Every call excludes duplicates and sources whose content is not visible.
    All other filters are optional and AND-ed together.

    sort="ranked" (default) is best-first by the composite rank; sort="latest"
    is pure recency (newest first) for the chronological "Latest" feed. When
    *match* is given, FTS relevance (bm25) drives the order instead and *sort*
    is ignored. All orderings respect the same boundaries.

    Categorical filters (topic/flavor include & mute, cortisol/ragebait ceilings)
    must be applied here, not after ranking — otherwise low-ranked-but-matching
    items (e.g. 'discovery' for a Wonder lane) fall outside any over-fetch window.
    Word-boundary avoid-terms remain a Python pass on the caller side.

    Parameter notes:
      * topic / flavor / tag and the include/mute/follow tag lists are lower-cased
        before being bound.
      * source_id is only applied when truthy.
      * Passing follow_sources or follow_tags (even as an empty list) switches to
        "following" mode; with nothing followed the result is empty.
      * geo_scope only adds a clause when home_country is set; 'near' and the
        country-only 'world' tier additionally look at geo confidence.
      * include_paywalled=False drops every source reported by
        paywalled_source_ids().

    Returns plain dicts (one per row) carrying _ARTICLE_COLUMNS plus geo_breadth,
    geo_confidence and geo_places. Rows are converted with dict(row), so *conn*
    is assumed to use a mapping-style row factory such as sqlite3.Row.
    """
    clauses = ["a.duplicate_of IS NULL", "src.content_visible = 1"]
    # Bound values, appended in exactly the order their `?` appear in the final SQL.
    params: list = []
    # Full-text search: join the FTS index and MATCH first, so its bound param
    # leads and relevance can drive the ordering. All the boundary clauses below
    # still apply, so search mirrors exactly what the visitor feed would show.
    fts_join = ""
    if match:
        fts_join = "JOIN article_search ON article_search.article_id = a.id"
        clauses.append("article_search MATCH ?")
        params.append(match)
    # Hard-exclude paywalled sources (admin-overridable). Added after MATCH so the FTS
    # bound param keeps leading; the NOT IN list (a handful of ids) follows.
    if not include_paywalled:
        pwx = paywalled_source_ids(conn)
        if pwx:
            clauses.append("a.source_id NOT IN (%s)" % ",".join("?" * len(pwx)))
            params.extend(pwx)
    if accepted_only:
        clauses.append("s.accepted = 1")
    if topic:
        clauses.append("s.topic = ?")
        params.append(topic.lower())
    if flavor:
        clauses.append("s.flavor = ?")
        params.append(flavor.lower())

    def _in(column: str, values: list[str], negate: bool = False) -> None:
        # Append a `column [NOT] IN (...)` clause and bind its lower-cased values.
        vals = [v.lower() for v in values]
        placeholders = ",".join("?" * len(vals))
        op = "NOT IN" if negate else "IN"
        # COALESCE keeps NULL-category rows from being dropped by NOT IN.
        clauses.append(f"COALESCE({column}, '') {op} ({placeholders})")
        params.extend(vals)

    if include_topics:
        _in("s.topic", include_topics)
    if include_flavors:
        _in("s.flavor", include_flavors)
    if mute_topics:
        _in("s.topic", mute_topics, negate=True)
    if mute_flavors:
        _in("s.flavor", mute_flavors, negate=True)
    # Score ceilings: a missing score counts as 0, so unscored rows pass.
    if max_cortisol is not None:
        clauses.append("COALESCE(s.cortisol_score, 0) <= ?")
        params.append(max_cortisol)
    if max_ragebait is not None:
        clauses.append("COALESCE(s.ragebait_score, 0) <= ?")
        params.append(max_ragebait)
    if tag:
        clauses.append("EXISTS (SELECT 1 FROM article_tags at WHERE at.article_id = a.id AND at.tag = ?)")
        params.append(tag.lower())
    if source_id:
        clauses.append("a.source_id = ?")
        params.append(source_id)
    if since:
        # "New since last visit": articles discovered after the reader's last visit.
        clauses.append("a.discovered_at > ?")
        params.append(since)
    # "Following" feed: articles from a followed source OR carrying a followed tag.
    # Passing either list (even empty) switches to following mode; no follows → none.
    if follow_sources is not None or follow_tags is not None:
        ors: list[str] = []
        # Source ids are bound first because their IN (...) is the first OR branch.
        for sid in follow_sources or []:
            params.append(sid)
        if follow_sources:
            ors.append(f"a.source_id IN ({','.join('?' * len(follow_sources))})")
        ftags = [t.lower() for t in (follow_tags or [])]
        if ftags:
            ors.append(
                f"EXISTS (SELECT 1 FROM article_tags at WHERE at.article_id = a.id "
                f"AND at.tag IN ({','.join('?' * len(ftags))}))"
            )
            params.extend(ftags)
        # With nothing followed the clause is the constant 0, i.e. match no rows.
        clauses.append("(" + " OR ".join(ors) + ")" if ors else "0")
    # Home-aware scoping for "Closer to Home" (server-side). Relative to the reader's
    # chosen home; geo_scope=None leaves the feed exactly as it is today. A STATE match
    # only counts when geo confidence is high/medium (don't surface "Near you" on a
    # shaky location). Untagged articles have no places, so they land in 'world' — never
    # lost while the backfill is still running.
    if geo_scope == "near":
        # Anything elevated as "Near you" / "Close to home" requires high/medium geo
        # confidence — the feature's promise is relevance, so don't surface shaky locals.
        if home_state and home_country:
            clauses.append(
                "g.confidence IN ('high','medium') AND EXISTS (SELECT 1 FROM article_places p "
                "WHERE p.article_id = a.id AND p.country_code = ? AND p.state_code = ?)")
            params.extend([home_country, home_state])
        elif home_country:
            clauses.append(
                "g.confidence IN ('high','medium') AND EXISTS (SELECT 1 FROM article_places p "
                "WHERE p.article_id = a.id AND p.country_code = ?)")
            params.append(home_country)
    elif geo_scope == "country" and home_country:
        clauses.append("EXISTS (SELECT 1 FROM article_places p WHERE p.article_id = a.id AND p.country_code = ?)")
        params.append(home_country)
        if home_state:
            # "elsewhere in your country" excludes ONLY what actually went to "near" (a
            # high/medium-confidence home-state match). A low-confidence home-state story
            # isn't near, so it must still surface here, not vanish between tiers.
            clauses.append(
                "NOT (g.confidence IN ('high','medium') AND EXISTS (SELECT 1 FROM article_places p2 "
                "WHERE p2.article_id = a.id AND p2.country_code = ? AND p2.state_code = ?))")
            params.extend([home_country, home_state])
    elif geo_scope == "world" and home_country:
        if home_state:
            # State mode: the "country" tier catches all home-country stories (incl.
            # low-confidence ones), so world is simply everything outside your country.
            clauses.append("NOT EXISTS (SELECT 1 FROM article_places p WHERE p.article_id = a.id AND p.country_code = ?)")
            params.append(home_country)
        else:
            # Country-only mode has no "country" tier, so a LOW-confidence home-country
            # story isn't "near" and must land here rather than vanish between tiers.
            clauses.append(
                "NOT (g.confidence IN ('high','medium') AND EXISTS (SELECT 1 FROM article_places p "
                "WHERE p.article_id = a.id AND p.country_code = ?))")
            params.append(home_country)
    where = "WHERE " + " AND ".join(clauses)
    # LIMIT/OFFSET placeholders come last in the statement, so bind them last.
    params.extend([limit, offset])
    if match:
        order_by = "bm25(article_search), COALESCE(a.published_at, a.discovered_at) DESC"  # relevance, then recency
    elif sort == "latest":
        order_by = "COALESCE(a.published_at, a.discovered_at) DESC, rank_score DESC"
    else:
        order_by = "rank_score DESC, COALESCE(a.published_at, a.discovered_at) DESC"
    rows = conn.execute(
        f"""
        SELECT {_ARTICLE_COLUMNS},
        g.breadth AS geo_breadth, g.confidence AS geo_confidence,
        (SELECT group_concat(
        p.country_code || CASE WHEN p.state_code IS NOT NULL THEN '-' || p.state_code ELSE '' END, ',')
        FROM article_places p WHERE p.article_id = a.id) AS geo_places
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        JOIN article_scores s ON s.article_id = a.id
        LEFT JOIN article_geo g ON g.article_id = a.id
        {fts_join}
        {where}
        ORDER BY {order_by}
        LIMIT ? OFFSET ?
        """,
        params,
    ).fetchall()
    return [dict(row) for row in rows]
def reindex_search(conn: sqlite3.Connection) -> int:
    """Rebuild the article_search index from scratch and return its row count.

    The index is emptied and refilled from accepted, non-duplicate articles
    (title, description, source name, space-joined tags), then committed. Per
    the original design note this is a cheap full rebuild (a few thousand rows),
    run on each ingest cycle and lazily on first search. Live visibility and
    boundary filtering happen at query time, so they need no reindex.
    """
    refill_sql = """
        INSERT INTO article_search (article_id, title, body, source_name, tags)
        SELECT a.id, a.title, COALESCE(a.description, ''), src.name,
        COALESCE((SELECT group_concat(t.tag, ' ') FROM article_tags t WHERE t.article_id = a.id), '')
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        JOIN article_scores s ON s.article_id = a.id
        WHERE s.accepted = 1 AND a.duplicate_of IS NULL
        """
    # Wipe first, then refill, so rows for no-longer-eligible articles disappear.
    conn.execute("DELETE FROM article_search")
    conn.execute(refill_sql)
    conn.commit()
    count_row = conn.execute("SELECT COUNT(*) FROM article_search").fetchone()
    return count_row[0]
# Scope dial: the reader's "emotional radius". Each tier is a closest->widest lead
# preference rather than a hard filter, and 'world' is the implicit final tier.
# State and region predicates are confidence-gated (high/medium) so a shaky
# location is never promoted as local. All predicates expect the aliases `a`
# (articles) and `g` (article_geo) in the enclosing query.

# Binds: country_code, state_code.
_STATE_SQL = (
    "(g.confidence IN ('high','medium') AND EXISTS "
    "(SELECT 1 FROM article_places p "
    "WHERE p.article_id = a.id AND p.country_code = ? AND p.state_code = ?))"
)

# Binds: country_code. Not confidence-gated.
_COUNTRY_SQL = (
    "(EXISTS (SELECT 1 FROM article_places p "
    "WHERE p.article_id = a.id AND p.country_code = ?))"
)

SCOPES = (
    "nearby",
    "region",
    "country",
    "world",
)
def _region_sql(n: int) -> str:
    """Build the confidence-gated "region" predicate for *n* state codes.

    The fragment binds the country code first, then *n* state codes for the
    IN (...) list.
    """
    state_marks = ",".join(["?"] * n)
    head = "(g.confidence IN ('high','medium') AND EXISTS (SELECT 1 FROM article_places p "
    tail = "WHERE p.article_id = a.id AND p.country_code = ? AND p.state_code IN (" + state_marks + ")))"
    return head + tail
def home_tiers(home_country: str, home_state: str | None, scope: str) -> list[tuple]:
    """Return the ordered lead tiers for a reader's home and scope.

    Each entry is (section_key, predicate_sql, params), listed closest->widest.
    Callers evaluate them first-match (CASE WHEN / composed in order), so the
    predicates need not be mutually exclusive in SQL. 'world' is implicit:
    whatever no tier matches.

    'nearby' and 'region' need a home state (and 'region' a non-empty region for
    it); otherwise the result collapses to the single country tier.
    NOTE(review): scope='world' also takes that fallback and so still leads
    with the country tier — confirm that is the intended behaviour.
    """
    from .geo import region_states

    region = region_states(home_state) if home_state else []
    country_tier = ("country", _COUNTRY_SQL, [home_country])

    if scope == "nearby" and home_state:
        lead = [("state", _STATE_SQL, [home_country, home_state])]
        if region:
            lead.append(("region", _region_sql(len(region)), [home_country, *region]))
        return [*lead, country_tier]

    if scope == "region" and home_state and region:
        # The region predicate also covers the home state itself.
        region_tier = ("region", _region_sql(len(region)), [home_country, *region])
        return [region_tier, country_tier]

    # Country scope, country-only / non-US home, or any other fallback.
    return [country_tier]
def home_brief(conn: sqlite3.Connection, home_country: str, home_state: str | None = None,
               scope: str = "nearby", limit: int = 7, window_days: int = 3) -> list[dict]:
    """Scope-aware, local-first landing highlights.

    Ranks recent accepted articles by how close their tier is to the reader's
    home (see home_tiers), then blends outward so the set is always full —
    "closest first", never a handful of stale local stories. Within a tier,
    already-summarized stories come first, then the composite rank, then recency.

    Only articles discovered in the last *window_days* days are considered and at
    most *limit* rows are returned. Each row is a brief-shaped dict carrying a
    `summary` and a concrete `__section` key ('state' / 'region' / 'country' /
    'world').

    NOTE(review): unlike feed(), this query does not exclude paywalled sources
    itself; presumably the caller filters the pool — confirm.
    """
    tiers = home_tiers(home_country, home_state, scope)
    world_rank = len(tiers)  # rank given to rows that match no tier

    # One CASE arm per tier. Tier params are bound in the same order because the
    # CASE sits in the SELECT list, ahead of the WHERE/LIMIT placeholders.
    case_arms = [f"WHEN {predicate} THEN {rank}" for rank, (_, predicate, _) in enumerate(tiers)]
    tier_params = [value for _, _, values in tiers for value in values]
    if case_arms:
        section_case = "CASE " + " ".join(case_arms) + f" ELSE {world_rank} END"
    else:
        section_case = "0"
    section_keys = [key for key, _, _ in tiers]
    section_keys.append("world")

    query = f"""
        SELECT {_ARTICLE_COLUMNS},
        sm.summary AS summary,
        {section_case} AS section_rank,
        (sm.summary IS NOT NULL) AS has_summary
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        JOIN article_scores s ON s.article_id = a.id
        LEFT JOIN article_geo g ON g.article_id = a.id
        LEFT JOIN article_summaries sm ON sm.article_id = a.id
        WHERE a.duplicate_of IS NULL AND src.content_visible = 1 AND s.accepted = 1
        AND a.discovered_at >= datetime('now', ?)
        ORDER BY section_rank ASC, has_summary DESC, rank_score DESC,
        COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT ?
        """
    fetched = conn.execute(query, tier_params + [f"-{window_days} days", limit]).fetchall()

    highlights = []
    for row in fetched:
        item = dict(row)
        rank = item.pop("section_rank", world_rank)
        # Translate the numeric tier rank into its section key.
        if 0 <= rank < len(section_keys):
            item["__section"] = section_keys[rank]
        else:
            item["__section"] = "world"
        highlights.append(item)
    return highlights
def brief(conn: sqlite3.Connection, brief_date: str | None = None, limit: int = 10) -> dict:
    """Return a stored daily brief (latest if no date) with its ranked items.

    Args:
        conn: Database connection; rows are read by column name, so it must use
            a name-addressable row factory (e.g. sqlite3.Row).
        brief_date: The brief's date key. When falsy, the latest stored brief
            date is looked up instead.
        limit: Maximum number of items to return.

    Returns:
        A dict that always has the keys ``brief_date``, ``title``,
        ``created_at`` and ``items``. When no brief exists (none stored at all,
        or none for the requested date) the metadata values are None and
        ``items`` is empty. Items come back in stored rank order and only from
        sources whose content is visible.
    """
    target_date = brief_date or _latest_brief_date(conn)
    if not target_date:
        # Keep the same key set as every other return path so callers can read
        # result["created_at"] without special-casing an empty database.
        return {"brief_date": None, "title": None, "created_at": None, "items": []}
    header = conn.execute(
        "SELECT brief_date, title, created_at FROM daily_briefs WHERE brief_date = ?",
        (target_date,),
    ).fetchone()
    if not header:
        return {"brief_date": target_date, "title": None, "created_at": None, "items": []}
    # Scores are LEFT JOINed, so a brief item without a score row is still returned.
    rows = conn.execute(
        f"""
        SELECT bi.rank, bi.selection_reason, {_ARTICLE_COLUMNS},
        (SELECT summary FROM article_summaries WHERE article_id = a.id) AS summary
        FROM daily_briefs b
        JOIN daily_brief_items bi ON bi.brief_id = b.id
        JOIN articles a ON a.id = bi.article_id
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE b.brief_date = ? AND src.content_visible = 1
        ORDER BY bi.rank
        LIMIT ?
        """,
        (target_date, limit),
    ).fetchall()
    return {
        "brief_date": header["brief_date"],
        "title": header["title"],
        "created_at": header["created_at"],
        "items": [dict(row) for row in rows],
    }
def saved(conn: sqlite3.Connection, user_id: int, limit: int = 200) -> list[dict]:
    """Return up to *limit* articles saved by *user_id*, most recently saved first.

    Rows have the same shape as the feed (_ARTICLE_COLUMNS). Scores are LEFT
    JOINed and no visibility/duplicate/paywall filter is applied, so a saved
    item is returned regardless of those.
    """
    query = f"""
        SELECT {_ARTICLE_COLUMNS}
        FROM saved_articles sv
        JOIN articles a ON a.id = sv.article_id
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE sv.user_id = ?
        ORDER BY sv.saved_at DESC
        LIMIT ?
        """
    fetched = conn.execute(query, (user_id, limit)).fetchall()
    return list(map(dict, fetched))
def saved_ids(conn: sqlite3.Connection, user_id: int) -> list[int]:
    """Return the ids of every article *user_id* has saved (no particular order)."""
    lookup = "SELECT article_id FROM saved_articles WHERE user_id = ?"
    article_ids: list[int] = []
    for record in conn.execute(lookup, (user_id,)):
        article_ids.append(record[0])
    return article_ids
def history(conn: sqlite3.Connection, user_id: int, limit: int = 200) -> list[dict]:
    """Return up to *limit* articles from a user's account history, most-recent first.

    Each article appears once (grouped by article id) with `seen_at` set to the
    latest history timestamp recorded for it. Rows otherwise share the feed
    shape (_ARTICLE_COLUMNS).
    """
    query = f"""
        SELECT {_ARTICLE_COLUMNS}, MAX(h.at) AS seen_at
        FROM user_history h
        JOIN articles a ON a.id = h.article_id
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE h.user_id = ?
        GROUP BY a.id
        ORDER BY seen_at DESC
        LIMIT ?
        """
    fetched = conn.execute(query, (user_id, limit)).fetchall()
    return list(map(dict, fetched))
def content_stats(conn: sqlite3.Connection) -> dict:
    """Corpus / serving health for the dashboard: how much good news is live.

    "Live" here means accepted and not a duplicate. Returns a flat dict of
    counters plus the latest brief's date and size. The latest-brief row is read
    by column name, so *conn* must use a name-addressable row factory
    (e.g. sqlite3.Row).
    """

    def count(sql: str, args: tuple = ()) -> int:
        # First column of the first row, with NULL coerced to 0.
        return conn.execute(sql, args).fetchone()[0] or 0

    # Shared query stems; every counter below is one of these plus a suffix.
    live = (
        "SELECT COUNT(*) FROM article_scores s JOIN articles a ON a.id=s.article_id "
        "WHERE s.accepted=1 AND a.duplicate_of IS NULL"
    )
    # Coverage is scoped to LIVE articles (accepted, non-duplicate) so the
    # percentages can't drift past 100% as rejected/duplicate rows accrue summaries.
    live_summaries = (
        "SELECT COUNT(*) FROM article_summaries m JOIN articles a ON a.id=m.article_id "
        "JOIN article_scores s ON s.article_id=a.id WHERE s.accepted=1 AND a.duplicate_of IS NULL"
    )
    has_image = " AND a.image_url IS NOT NULL AND a.image_url!=''"

    served = count(live)
    accepted_7d = count(
        live + " AND COALESCE(a.published_at, a.discovered_at) >= datetime('now','-7 days')"
    )
    latest = conn.execute(
        "SELECT brief_date, (SELECT COUNT(*) FROM daily_brief_items WHERE brief_id=daily_briefs.id) n "
        "FROM daily_briefs ORDER BY brief_date DESC LIMIT 1"
    ).fetchone()
    with_image = count(live + has_image)
    summaries = count(live_summaries)
    summaries_with_image = count(live_summaries + has_image)

    if latest:
        brief_with_image = count(
            "SELECT COUNT(*) FROM daily_brief_items bi JOIN articles a ON a.id=bi.article_id "
            "JOIN daily_briefs b ON b.id=bi.brief_id "
            "WHERE b.brief_date=?" + has_image,
            (latest["brief_date"],),
        )
    else:
        brief_with_image = 0

    stats: dict = {}
    stats["served"] = served
    stats["total"] = count("SELECT COUNT(*) FROM articles")
    stats["rejected"] = count("SELECT COUNT(*) FROM article_scores WHERE accepted=0")
    stats["accepted_7d"] = accepted_7d
    stats["added_24h"] = count("SELECT COUNT(*) FROM articles WHERE discovered_at >= datetime('now','-1 day')")
    stats["added_7d"] = count("SELECT COUNT(*) FROM articles WHERE discovered_at >= datetime('now','-7 days')")
    stats["summaries"] = summaries
    stats["summaries_with_image"] = summaries_with_image
    stats["with_image"] = with_image
    # Accepted, imageless articles whose enrichment was tried recently and
    # came up empty (no usable og:image) — the image "misses" to watch.
    stats["recent_enrich_fail"] = count(
        live + " AND (a.image_url IS NULL OR a.image_url='') "
        "AND a.image_checked_at >= datetime('now','-1 day')"
    )
    stats["active_sources"] = count("SELECT COUNT(*) FROM sources WHERE active=1")
    stats["total_sources"] = count("SELECT COUNT(*) FROM sources")
    stats["latest_brief_date"] = latest["brief_date"] if latest else None
    stats["latest_brief_size"] = latest["n"] if latest else 0
    stats["brief_with_image"] = brief_with_image
    return stats
def source_health(conn: sqlite3.Connection) -> list[dict]:
    """Per source (active, paused, AND retired), the data an operator needs to manage feeds:
    failure streak, last error/success/attempt, computed next poll, and the
    backing metrics (served/accepted/total counts + acceptance & duplicate rates)
    so Pause/Flag decisions aren't vibes-based.

    next_due_at = last attempt + MIN(cap, interval * (1 + consecutive_failures)),
    mirroring feeds.due_source_rows; NULL last attempt means "due now". Paused
    sources are included (their articles stay live; only polling stops).

    Returns one dict per source, ordered active-first, then by failure streak,
    served count and name. On top of the selected columns each dict gains the
    derived keys acceptance_rate, non_english_rate, duplicate_rate,
    accepted_dup_rate, image_coverage (whole-number percentages, or None when
    the denominator is zero) and the effective `paywalled` flag.
    """
    rows = conn.execute(
        """
        SELECT
        s.id, s.name, s.feed_url, s.homepage_url, s.default_category AS category, s.active,
        s.status, s.content_visible, s.retry_after_at,
        s.consecutive_failures AS failures, s.review_flag, s.review_reason, s.paywall_override,
        s.poll_interval_minutes AS interval_minutes,
        s.last_success_at, s.last_error_at, substr(s.last_error, 1, 160) AS last_error,
        (SELECT MAX(r.finished_at) FROM ingest_runs r
        WHERE r.source_id = s.id AND r.finished_at IS NOT NULL) AS last_attempt,
        (SELECT COUNT(*) FROM articles a WHERE a.source_id = s.id) AS total_articles,
        (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
        WHERE a.source_id = s.id AND sc.accepted = 1) AS accepted_total,
        (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
        WHERE a.source_id = s.id AND sc.reason_code = 'non_english') AS non_english,
        (SELECT COUNT(*) FROM articles a WHERE a.source_id = s.id AND a.duplicate_of IS NOT NULL) AS duplicates,
        (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
        WHERE a.source_id = s.id AND sc.accepted = 1 AND a.duplicate_of IS NULL) AS served,
        (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
        WHERE a.source_id = s.id AND sc.accepted = 1 AND a.duplicate_of IS NULL
        AND a.image_url IS NOT NULL AND a.image_url != '') AS images,
        datetime(
        (SELECT MAX(r.finished_at) FROM ingest_runs r
        WHERE r.source_id = s.id AND r.finished_at IS NOT NULL),
        '+' || MIN(?, s.poll_interval_minutes * (1 + s.consecutive_failures)) || ' minutes'
        ) AS next_due_at
        FROM sources s
        ORDER BY s.active DESC, s.consecutive_failures DESC, served DESC, s.name
        """,
        # The single `?` is the backoff cap inside MIN(...).
        (MAX_BACKOFF_MINUTES,),
    ).fetchall()
    out = []
    for r in rows:
        d = dict(r)
        total = d["total_articles"] or 0
        accepted = d["accepted_total"] or 0
        non_english = d.get("non_english") or 0
        # Acceptance is judged over articles actually scored in English — non-English
        # items are HELD (awaiting translation), not calm-filter rejections, so they
        # don't drag a multilingual source's rate down.
        judged = total - non_english
        d["acceptance_rate"] = round(100 * accepted / judged) if judged else None
        d["non_english"] = non_english
        d["non_english_rate"] = round(100 * non_english / total) if total else None
        d["duplicate_rate"] = round(100 * d["duplicates"] / total) if total else None
        # Curation quality: of what this source got ACCEPTED, how much was a
        # duplicate of content already served (accepted_total - served = accepted dupes).
        d["accepted_dup_rate"] = round(100 * (accepted - d["served"]) / accepted) if accepted else None
        d["image_coverage"] = round(100 * (d["images"] or 0) / d["served"]) if d["served"] else None
        # Paywall is a domain-level hint + a per-source override; show the EFFECTIVE flag.
        d["paywalled"] = is_paywalled_for_source(d.get("homepage_url") or d.get("feed_url"), d.get("paywall_override"))
        # Match the REAL scheduler gate: due = the later of the streak-backoff time
        # and any retry_after_at rest (UTC strings sort chronologically).
        due_times = [t for t in (d["next_due_at"], d["retry_after_at"]) if t]
        d["next_due_at"] = max(due_times) if due_times else None
        out.append(d)
    return out
# Attention thresholds — conservative + volume-gated, so the strip is a calm
# operator nudge rather than a noisy scold. Items are aggregated (one line per
# condition with a count), not one line per offending source.
_STALE_DAYS = 10
_REJECT_MIN_INGESTED = 20
_REJECT_RATE = 25 # acceptance below this % (with enough volume)
_DUP_MIN_ACCEPTED = 10
_DUP_RATE = 50 # accepted-duplicate above this %
_IMG_MIN_SERVED = 10
_IMG_COVERAGE = 25 # per-source image coverage below this %
_LONG_REST_HOURS = 12
def _attention(content: dict, sources: list[dict], feedback_unread: int, now: datetime | None = None) -> list[dict]:
"""The 'Attention Needed' strip: what an operator should look at, soft-toned
(warn = act soon, info = worth a glance). Derived from the same data shown
elsewhere, so it never disagrees with the detail sections."""
items: list[dict] = []
n = lambda c: "" if c == 1 else "s" # noqa: E731 — tiny pluralizer
now = now or datetime.now(UTC)
active = [s for s in sources if (s.get("status") or ("active" if s.get("active") else "paused")) == "active"]
resting = [s for s in active if (s.get("failures") or 0) > 0]
if resting:
items.append({"level": "warn", "text": f"{len(resting)} source{n(len(resting))} backing off after failures"})
flagged = [s for s in sources if s.get("review_flag")]
if flagged:
items.append({"level": "warn", "text": f"{len(flagged)} source{n(len(flagged))} flagged for review"})
# Stale: active, visible feeds whose last success is well in the past.
stale_cutoff = (now - timedelta(days=_STALE_DAYS)).strftime("%Y-%m-%d %H:%M:%S")
stale = [s for s in active if s.get("content_visible", 1) and s.get("last_success_at") and s["last_success_at"] < stale_cutoff]
if stale:
items.append({"level": "warn", "text": f"{len(stale)} source{n(len(stale))} haven't updated in over {_STALE_DAYS} days"})
# High rejection: enough ingested volume, low acceptance.
rejecting = [s for s in active if (s.get("total_articles") or 0) >= _REJECT_MIN_INGESTED
and s.get("acceptance_rate") is not None and s["acceptance_rate"] < _REJECT_RATE]
if rejecting:
items.append({"level": "info", "text": f"{len(rejecting)} source{n(len(rejecting))} accepting under {_REJECT_RATE}% of submissions"})
# High accepted-duplicate: enough accepted volume, mostly echoing others.
duping = [s for s in active if (s.get("accepted_total") or 0) >= _DUP_MIN_ACCEPTED
and s.get("accepted_dup_rate") is not None and s["accepted_dup_rate"] > _DUP_RATE]
if duping:
items.append({"level": "info", "text": f"{len(duping)} source{n(len(duping))} mostly duplicating other feeds"})
# Low image coverage (info, not a warning).
thin = [s for s in active if (s.get("served") or 0) >= _IMG_MIN_SERVED
and s.get("image_coverage") is not None and s["image_coverage"] < _IMG_COVERAGE]
if thin:
items.append({"level": "info", "text": f"{len(thin)} source{n(len(thin))} with thin image coverage (under {_IMG_COVERAGE}%)"})
# Long rate-limit rest (info).
rest_cutoff = (now + timedelta(hours=_LONG_REST_HOURS)).strftime("%Y-%m-%d %H:%M:%S")
long_rest = [s for s in active if s.get("retry_after_at") and s["retry_after_at"] > rest_cutoff]
if long_rest:
items.append({"level": "info", "text": f"{len(long_rest)} source{n(len(long_rest))} rate-limited for {_LONG_REST_HOURS}h+"})
# Site-wide signals.
served = content.get("served") or 0
with_image = content.get("with_image") or 0
if served and (with_image / served) < 0.70:
items.append({"level": "info", "text": f"Image coverage at {round(100 * with_image / served)}% (aim for 70%+)"})
brief_size = content.get("latest_brief_size") or 0
if brief_size < 7:
items.append({"level": "info", "text": f"Today's brief has only {brief_size} item{n(brief_size)}"})
if feedback_unread:
items.append({"level": "info", "text": f"{feedback_unread} unread feedback message{n(feedback_unread)}"})
return items
# --- Source article inspector: the real articles behind the source metrics -----
# Named filter -> extra SQL appended to the inspector's WHERE clause. Fragments
# expect the aliases `a` (articles) and `s` (article_scores). Any filter name not
# listed here (e.g. "all") adds nothing.
_SRC_ART_FILTERS = dict(
    accepted="AND s.accepted = 1",
    # 'rejected' = calm-filter rejections only; non-English is HELD, its own bucket.
    rejected="AND s.accepted = 0 AND COALESCE(s.reason_code,'') != 'non_english'",
    held="AND s.reason_code = 'non_english'",
    no_image="AND (a.image_url IS NULL OR a.image_url = '')",
    duplicates="AND a.duplicate_of IS NOT NULL",
)
def source_articles(conn: sqlite3.Connection, source_id: int, filter: str = "all",
                    limit: int = 25, offset: int = 0) -> list[dict]:
    """List the actually-ingested articles for one source, newest first.

    Lets admins verify a metric (paywall / image / acceptance) against real
    evidence. *filter* picks a fragment from _SRC_ART_FILTERS; unknown names
    (including the default "all") apply no extra condition. *limit* / *offset*
    page through the result.

    Each returned dict carries id, title, url, published_at (falling back to the
    discovery time), accepted, reason, held, topic, flavor, paywalled, has_image
    and duplicate.
    """
    source_row = conn.execute(
        "SELECT paywall_override FROM sources WHERE id = ?", (source_id,)
    ).fetchone()
    override = source_row["paywall_override"] if source_row else None

    # Only ever one of the fixed fragments above (or ""), never caller text.
    extra_where = _SRC_ART_FILTERS.get(filter, "")
    query = f"""
        SELECT a.id, a.title, a.canonical_url, a.published_at, a.discovered_at,
        a.image_url, a.duplicate_of,
        s.accepted, s.reason_code, s.reason_text, s.topic, s.flavor
        FROM articles a
        LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE a.source_id = ? {extra_where}
        ORDER BY COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT ? OFFSET ?
        """

    def shape(record) -> dict:
        return {
            "id": record["id"],
            "title": record["title"],
            "url": record["canonical_url"],
            "published_at": record["published_at"] or record["discovered_at"],
            "accepted": record["accepted"],
            # The "why" behind accept/reject: free text first, else the code.
            "reason": record["reason_text"] or record["reason_code"],
            # Held for language, not rejected.
            "held": record["reason_code"] == "non_english",
            "topic": record["topic"],
            "flavor": record["flavor"],
            # Effective flag: domain rule on the article URL + the source override.
            "paywalled": is_paywalled_for_source(record["canonical_url"], override),
            "has_image": bool(record["image_url"]),
            "duplicate": record["duplicate_of"] is not None,
        }

    fetched = conn.execute(query, (source_id, limit, offset)).fetchall()
    return [shape(record) for record in fetched]
def source_articles_summary(conn: sqlite3.Connection, source_id: int) -> dict:
    """Return the counts behind a source's table metrics plus its paywall basis.

    Feeds a panel header such as
    '120 · 96 accepted · 24 rejected · 3 no image · paywall: ON'.

    Keys: total, accepted, rejected (calm-filter rejections only), non_english
    (held for language), no_image, duplicates, paywalled (effective flag),
    paywall_domain (what the domain rule alone says) and paywall_override
    (null | 'free' | 'paywalled'). An unknown *source_id* yields zero counts and
    a None URL/override passed to the paywall helpers.
    """
    counts = conn.execute(
        """
        SELECT COUNT(*) total,
        COALESCE(SUM(s.accepted = 1), 0) accepted,
        COALESCE(SUM(s.accepted = 0 AND COALESCE(s.reason_code,'') != 'non_english'), 0) rejected,
        COALESCE(SUM(s.reason_code = 'non_english'), 0) non_english,
        COALESCE(SUM(a.image_url IS NULL OR a.image_url = ''), 0) no_image,
        COALESCE(SUM(a.duplicate_of IS NOT NULL), 0) duplicates
        FROM articles a LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE a.source_id = ?
        """,
        (source_id,),
    ).fetchone()

    source_row = conn.execute(
        "SELECT homepage_url, feed_url, paywall_override FROM sources WHERE id = ?", (source_id,)
    ).fetchone()
    if source_row:
        override = source_row["paywall_override"]
        url = source_row["homepage_url"] or source_row["feed_url"]
    else:
        override = None
        url = None

    count_keys = ("total", "accepted", "rejected", "non_english", "no_image", "duplicates")
    summary = {key: counts[key] for key in count_keys}
    summary["paywalled"] = is_paywalled_for_source(url, override)  # effective
    summary["paywall_domain"] = is_paywalled(url)  # what the domain rule alone says
    summary["paywall_override"] = override  # null | 'free' | 'paywalled' — the basis
    return summary
def admin_stats(conn: sqlite3.Connection, days: int = 30) -> dict:
    """Build the aggregate, non-personal usage report for the admin dashboard.

    Args:
        conn: Open database connection; rows are read by column name.
        days: Length of the reporting window in days. The fixed "today" and
            7-day figures do not depend on it.

    Returns:
        A dict with the keys ``days``, ``content``, ``sources``, ``attention``,
        ``feedback_7d``, ``feedback_unread``, ``visitors``, ``returning``,
        ``once``, ``retention``, ``accounts``, ``funnel``, ``emotional_mix``,
        ``paywall``, ``replace``, ``games``, ``top_articles``,
        ``top_groupings``, ``top_topics``, ``shares``, ``daily`` and
        ``client_errors``.
    """
    window = f"-{days} days"

    # Timestamp-based "today" counters use the SITE-LOCAL day (GOODNEWS_TZ), not the
    # UTC day: an evening error (e.g. 22:53 local) would otherwise land on the next
    # UTC day and read as a fresh "today" the following morning — a false alarm.
    local_midnight = local_now().replace(hour=0, minute=0, second=0, microsecond=0)
    local_day_start = local_midnight.astimezone(UTC).strftime("%Y-%m-%d %H:%M:%S")

    def scalar(sql, params=()):
        """Run a single-value query; a NULL or zero result comes back as 0."""
        value = conn.execute(sql, params).fetchone()[0]
        return value or 0

    def as_dicts(sql, params=()):
        """Run a query and return every row as a plain dict."""
        return [dict(row) for row in conn.execute(sql, params)]

    # Distinct visitors: today, the last 7 days, and the whole window.
    visitor_sql = (
        "SELECT COUNT(DISTINCT visitor_hash) FROM events "
        "WHERE kind='visit' AND visitor_hash!='' AND "
    )
    visitors = {
        "today": scalar(visitor_sql + "day=date('now')"),
        "d7": scalar(visitor_sql + "day>=date('now','-7 days')"),
        "d30": scalar(visitor_sql + "day>=date('now',?)", (window,)),
    }

    # Returning (seen on ≥2 distinct days) vs one-and-done, over the window.
    loyalty = {}
    for row in conn.execute(
        "SELECT CASE WHEN d>=2 THEN 'returning' ELSE 'once' END g, COUNT(*) n FROM ("
        " SELECT visitor_hash, COUNT(DISTINCT day) d FROM events "
        " WHERE kind='visit' AND visitor_hash!='' AND day>=date('now',?) GROUP BY visitor_hash"
        ") GROUP BY g",
        (window,),
    ).fetchall():
        loyalty[row["g"]] = row["n"]

    # An "open" = opening the article via our summary page (legacy 'open' + 'summary_viewed').
    opened = "e.kind IN ('open','summary_viewed')"
    top_articles = as_dicts(
        f"SELECT e.article_id AS id, a.title, src.name AS source, COUNT(*) AS opens "
        f"FROM events e JOIN articles a ON a.id=e.article_id JOIN sources src ON src.id=a.source_id "
        f"WHERE {opened} AND e.article_id>0 AND e.day>=date('now',?) "
        f"GROUP BY e.article_id ORDER BY opens DESC LIMIT 12",
        (window,),
    )
    top_groupings = as_dicts(
        f"SELECT t.tag, COUNT(*) AS opens FROM events e JOIN article_tags t ON t.article_id=e.article_id "
        f"WHERE {opened} AND e.day>=date('now',?) GROUP BY t.tag ORDER BY opens DESC LIMIT 12",
        (window,),
    )
    top_topics = as_dicts(
        f"SELECT s.topic, COUNT(*) AS opens FROM events e JOIN article_scores s ON s.article_id=e.article_id "
        f"WHERE {opened} AND s.topic IS NOT NULL AND e.day>=date('now',?) "
        f"GROUP BY s.topic ORDER BY opens DESC",
        (window,),
    )

    # Counts per event kind over the window (each = distinct visitor-days, by dedup).
    per_kind = {
        row["kind"]: row["n"]
        for row in conn.execute(
            "SELECT kind, COUNT(*) n FROM events WHERE day>=date('now',?) GROUP BY kind",
            (window,),
        ).fetchall()
    }

    def tally(kind):
        """Window count for one event kind; kinds with no rows count as 0."""
        return per_kind.get(kind, 0)

    shares = {
        kind: tally(kind)
        for kind in ("share_ub", "copy_source", "native_share", "source_click")
    }

    summary_views = tally("summary_viewed")
    source_clicks = tally("source_click")
    if summary_views:
        # Whole-number percentage of summary views that clicked through to the source.
        source_rate = round(100 * source_clicks / summary_views)
    else:
        source_rate = 0
    funnel = {
        "summary_viewed": summary_views,
        "source_click": source_clicks,
        "full_story": tally("full_story"),
        "source_rate": source_rate,
    }

    emotional_mix = {kind: tally(kind) for kind in ("not_today", "less_like_this", "hide_topic")}
    paywall = {kind: tally(kind) for kind in ("paywall_replace", "paywalled_source_open")}
    replace = {"used": tally("replace_used"), "none": tally("replace_none")}

    # Game funnel — the growth loop being instrumented. Each count is distinct
    # visitor-days (events dedupe per kind/day), so it reads as "people", not actions.
    game_names = ("word", "wordsearch", "bloom", "match")
    game_events = ("arrival", "started", "completed", "shared")  # arrival = share-loop acquisition
    games_by = {}
    game_totals = dict.fromkeys(game_events, 0)
    for game in game_names:
        per_event = {}
        for event in game_events:
            hits = tally(f"{game}_{event}")
            per_event[event] = hits
            game_totals[event] += hits
        games_by[game] = per_event
    games = {"by_game": games_by, "totals": game_totals}

    # Accounts — aggregate counts only (no emails, no per-user listing).
    accounts = {
        "total": scalar("SELECT COUNT(*) FROM users"),
        "new_today": scalar("SELECT COUNT(*) FROM users WHERE date(created_at)=date('now')"),
        "new_7d": scalar("SELECT COUNT(*) FROM users WHERE created_at>=date('now','-7 days')"),
        "new_30d": scalar("SELECT COUNT(*) FROM users WHERE created_at>=date('now',?)", (window,)),
        "active_7d": scalar(
            "SELECT COUNT(DISTINCT user_id) FROM sessions WHERE last_seen_at>=date('now','-7 days')"
        ),
    }

    # Returning-visitor buckets by distinct active days in the window. Every
    # bucket starts at 0 so labels with no visitors still appear, in fixed order.
    retention = dict.fromkeys(("new", "2-3", "4-7", "8+"), 0)
    for row in conn.execute(
        "SELECT CASE WHEN d>=8 THEN '8+' WHEN d>=4 THEN '4-7' WHEN d>=2 THEN '2-3' ELSE 'new' END b, "
        "COUNT(*) n FROM (SELECT visitor_hash, COUNT(DISTINCT day) d FROM events "
        "WHERE kind='visit' AND visitor_hash!='' AND day>=date('now',?) GROUP BY visitor_hash) GROUP BY b",
        (window,),
    ).fetchall():
        retention[row["b"]] = row["n"]

    # Per-day opens and visits across the window, oldest day first.
    daily = as_dicts(
        "SELECT day, SUM(kind IN ('open','summary_viewed')) AS opens, SUM(kind='visit') AS visits "
        "FROM events WHERE day>=date('now',?) GROUP BY day ORDER BY day",
        (window,),
    )

    content = content_stats(conn)
    sources = source_health(conn)
    feedback_7d = scalar("SELECT COUNT(*) FROM feedback WHERE created_at >= datetime('now','-7 days')")
    feedback_unread = scalar("SELECT COUNT(*) FROM feedback WHERE read_at IS NULL")
    attention = _attention(content, sources, feedback_unread)

    # Boot-failure seatbelt signal — blank-screen risk surfacing. Bots are
    # excluded from the headline counts: throttled crawlers fail the boot
    # check routinely and would read as real users seeing blank screens.
    client_errors = {
        "today": scalar(
            f"SELECT COUNT(*) FROM client_errors WHERE created_at >= ? AND {_NOT_BOT_SQL}",
            (local_day_start,),
        ),
        "window": scalar(
            f"SELECT COUNT(*) FROM client_errors WHERE created_at>=date('now',?) AND {_NOT_BOT_SQL}",
            (window,),
        ),
    }

    return {
        "days": days,
        "content": content,
        "sources": sources,
        "attention": attention,
        "feedback_7d": feedback_7d,
        "feedback_unread": feedback_unread,
        "visitors": visitors,
        "returning": loyalty.get("returning", 0),
        "once": loyalty.get("once", 0),
        "retention": retention,
        "accounts": accounts,
        "funnel": funnel,
        "emotional_mix": emotional_mix,
        "paywall": paywall,
        "replace": replace,
        "games": games,
        "top_articles": top_articles,
        "top_groupings": top_groupings,
        "top_topics": top_topics,
        "shares": shares,
        "daily": daily,
        "client_errors": client_errors,
    }
def existing_article_ids(conn: sqlite3.Connection, ids: list[int]) -> list[int]:
    """Return the subset of *ids* that still exist in ``articles``.

    Keeps save/history/import inserts FK-safe. Each id is coerced with
    ``int()``, duplicates are dropped, and at most 1000 distinct ids are
    looked up. The result order is whatever the query yields (no ORDER BY).
    """
    distinct_ids = list(set(map(int, ids)))
    candidates = distinct_ids[:1000]  # cap the size of the IN (...) list
    if not candidates:
        return []
    marks = ",".join(["?"] * len(candidates))
    cursor = conn.execute(f"SELECT id FROM articles WHERE id IN ({marks})", candidates)
    return [row[0] for row in cursor]
def tag_counts(conn: sqlite3.Connection, accepted_only: bool = True) -> dict:
    """Count non-duplicate articles per grouping tag.

    With ``accepted_only`` (the default) only accepted articles are counted;
    otherwise every scored, non-duplicate article is. Returns ``{tag: count}``.
    """
    conditions = ["a.duplicate_of IS NULL"]
    if accepted_only:
        conditions.append("s.accepted = 1")
    where = "WHERE " + " AND ".join(conditions)

    cursor = conn.execute(
        f"""
        SELECT t.tag, COUNT(*) AS count
        FROM article_tags t
        JOIN articles a ON a.id = t.article_id
        JOIN article_scores s ON s.article_id = a.id
        {where}
        GROUP BY t.tag
        """
    )
    counts = {}
    for row in cursor.fetchall():
        counts[row["tag"]] = row["count"]
    return counts
def category_counts(conn: sqlite3.Connection, accepted_only: bool = True) -> list[dict]:
    """Return article counts per (topic, flavor) pair for building browse UIs.

    Duplicates are excluded via the join on ``articles`` so the counts line up
    with what the feed endpoint returns for each topic/flavor. With
    ``accepted_only`` only accepted articles count; otherwise any article with
    a non-NULL topic does. Rows are ordered by topic, then flavor.
    """
    score_filter = "s.accepted = 1" if accepted_only else "s.topic IS NOT NULL"
    where = " AND ".join(("a.duplicate_of IS NULL", score_filter))
    cursor = conn.execute(
        f"""
        SELECT s.topic, s.flavor, COUNT(*) AS count
        FROM article_scores s
        JOIN articles a ON a.id = s.article_id
        WHERE {where}
        GROUP BY s.topic, s.flavor
        ORDER BY s.topic, s.flavor
        """
    )
    return list(map(dict, cursor.fetchall()))
def available_dates(conn: sqlite3.Connection, limit: int = 30) -> list[str]:
    """Return up to *limit* ``brief_date`` values from ``daily_briefs``, newest first."""
    cursor = conn.execute(
        "SELECT brief_date FROM daily_briefs ORDER BY brief_date DESC LIMIT ?",
        (limit,),
    )
    dates = []
    for record in cursor.fetchall():
        dates.append(record["brief_date"])
    return dates
def _latest_brief_date(conn: sqlite3.Connection) -> str | None:
row = conn.execute(
"SELECT brief_date FROM daily_briefs ORDER BY brief_date DESC LIMIT 1"
).fetchone()
return row["brief_date"] if row else None