ad4e88c8f2
Foundation for "Closer to Home" (server-side, Codex-approved). No behavior change
yet — geo_scope defaults None, so the default/edge-cached feed is identical.
- queries.feed now returns each article's geo (breadth, confidence, and ISO-coded
places) via a LEFT JOIN + places subquery. Article.from_row parses geo_places
into [{country, state}]. Brief query doesn't select geo, so the Brief stays bare.
- queries.feed gains home-scope filters (home_country/home_state/geo_scope =
near|country|world): STATE match only counts on high/medium geo confidence;
untagged articles fall to 'world' so nothing is lost during backfill.
Next: API composition (home param + near/country/world sectioning with soft/blended
headers + a next_offset pagination model) and the Home picker UI. 360 tests green.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
831 lines
37 KiB
Python
831 lines
37 KiB
Python
"""Read-only query helpers over the goodNews database.
|
||
|
||
Pure stdlib and framework-agnostic: returns plain dicts so the same functions
|
||
back both the CLI and the JSON API. All article output is metadata + a link to
|
||
the original source — never stored bodies.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import sqlite3
|
||
from datetime import UTC, datetime, timedelta
|
||
|
||
from .feeds import MAX_BACKOFF_MINUTES
|
||
from .localtime import local_now
|
||
from .paywall import is_paywalled, is_paywalled_for_source
|
||
|
||
# UA substrings that mark automated clients. Crawlers run JS on a throttled
|
||
# budget and trip the boot-failure beacon routinely — without this filter they
|
||
# read as real users seeing blank screens.
|
||
BOT_UA_MARKS = ("headlesschrome", "bot", "spider", "crawl", "python", "curl", "wget", "phantomjs")
|
||
_NOT_BOT_SQL = " AND ".join(f"instr(lower(user_agent), '{m}')=0" for m in BOT_UA_MARKS)
|
||
|
||
|
||
def is_bot_ua(ua: str | None) -> bool:
|
||
low = (ua or "").lower()
|
||
return any(m in low for m in BOT_UA_MARKS)
|
||
|
||
|
||
# Composite ranking used everywhere a "best first" order is needed. Kept as one
|
||
# expression so brief, category feeds, and the API all rank identically.
|
||
RANK_SCORE_SQL = (
|
||
"(s.constructive_score + s.agency_score + s.human_benefit_score + src.trust_score "
|
||
"- s.cortisol_score - s.ragebait_score - s.pr_risk_score)"
|
||
)
|
||
|
||
_ARTICLE_COLUMNS = f"""
|
||
a.id,
|
||
a.title,
|
||
a.description,
|
||
a.canonical_url,
|
||
a.published_at,
|
||
a.image_url,
|
||
a.source_id,
|
||
src.name AS source_name,
|
||
s.topic,
|
||
s.flavor,
|
||
s.accepted,
|
||
s.constructive_score,
|
||
s.cortisol_score,
|
||
s.ragebait_score,
|
||
s.agency_score,
|
||
s.human_benefit_score,
|
||
s.pr_risk_score,
|
||
s.reason_code,
|
||
s.reason_text,
|
||
s.model_name,
|
||
src.paywall_override AS paywall_override,
|
||
(SELECT group_concat(t.tag) FROM article_tags t WHERE t.article_id = a.id) AS tags,
|
||
{RANK_SCORE_SQL} AS rank_score
|
||
"""
|
||
|
||
|
||
def _geo_scope_clauses(
    home_country: str | None,
    home_state: str | None,
    geo_scope: str | None,
) -> tuple[list[str], list]:
    """Return (WHERE clauses, bound params) for feed()'s "Closer to Home" scoping.

    With a home state set, the three scopes partition the feed:
      near    - a confident (high/medium geo confidence) match on the home state;
      country - a place in the home country that is NOT a confident home-state
                match, so low-confidence state tags fall back here instead of
                vanishing from every section;
      world   - no place in the home country. Untagged articles land here, so
                nothing is lost while the geo backfill is still running.
    Without a home state, near and country both select home-country articles.
    Without a home country, or for any other geo_scope value (including None),
    no clauses are added.
    """
    if not home_country:
        return [], []

    # LEFT JOIN leaves g.confidence NULL for untagged rows; COALESCE keeps this
    # test strictly true/false so it is safe to wrap in NOT (...).
    confident = "COALESCE(g.confidence, '') IN ('high','medium')"
    in_country = (
        "EXISTS (SELECT 1 FROM article_places p "
        "WHERE p.article_id = a.id AND p.country_code = ?)"
    )
    # The state is matched together with its country. If state_code holds bare
    # ISO 3166-2 suffixes, one code can exist in several countries (US-CA / ES-CA).
    in_state = (
        "EXISTS (SELECT 1 FROM article_places p "
        "WHERE p.article_id = a.id AND p.country_code = ? AND p.state_code = ?)"
    )

    if geo_scope == "near":
        if home_state:
            return [f"{confident} AND {in_state}"], [home_country, home_state]
        return [in_country], [home_country]
    if geo_scope == "country":
        if home_state:
            # "Elsewhere in your country": your country, minus what 'near' claims.
            return (
                [in_country, f"NOT ({confident} AND {in_state})"],
                [home_country, home_country, home_state],
            )
        return [in_country], [home_country]
    if geo_scope == "world":
        return [f"NOT {in_country}"], [home_country]
    return [], []


def feed(
    conn: sqlite3.Connection,
    topic: str | None = None,
    flavor: str | None = None,
    accepted_only: bool = True,
    limit: int = 30,
    offset: int = 0,
    include_topics: list[str] | None = None,
    include_flavors: list[str] | None = None,
    mute_topics: list[str] | None = None,
    mute_flavors: list[str] | None = None,
    max_cortisol: int | None = None,
    max_ragebait: int | None = None,
    tag: str | None = None,
    source_id: int | None = None,
    sort: str = "ranked",
    follow_sources: list[int] | None = None,
    follow_tags: list[str] | None = None,
    since: str | None = None,
    match: str | None = None,
    home_country: str | None = None,
    home_state: str | None = None,
    geo_scope: str | None = None,  # 'near' | 'country' | 'world' relative to the reader's home
) -> list[dict]:
    """Return articles with categorical filters applied in SQL.

    sort="ranked" (default) is best-first by the composite rank; sort="latest"
    is pure recency (newest first) for the chronological "Latest" feed. Both
    stay accepted-only and respect the same boundaries. A full-text `match`
    orders by bm25 relevance, then recency.

    Categorical filters (topic/flavor include & mute, cortisol/ragebait ceilings)
    must be applied here, not after ranking — otherwise low-ranked-but-matching
    items (e.g. 'discovery' for a Wonder lane) fall outside any over-fetch window.
    Word-boundary avoid-terms remain a Python pass on the caller side.

    home_country / home_state / geo_scope scope the feed to the "Closer to Home"
    sections (see _geo_scope_clauses); geo_scope=None leaves the feed unscoped.

    Each row carries the shared article columns plus geo_breadth,
    geo_confidence and geo_places (comma-joined "CC" / "CC-STATE" codes).
    """
    clauses = ["a.duplicate_of IS NULL", "src.content_visible = 1"]
    params: list = []

    # Full-text search: join the FTS index and MATCH first, so its bound param
    # leads and relevance can drive the ordering. All the boundary clauses below
    # still apply, so search mirrors exactly what the visitor feed would show.
    fts_join = ""
    if match:
        fts_join = "JOIN article_search ON article_search.article_id = a.id"
        clauses.append("article_search MATCH ?")
        params.append(match)

    if accepted_only:
        clauses.append("s.accepted = 1")
    if topic:
        clauses.append("s.topic = ?")
        params.append(topic.lower())
    if flavor:
        clauses.append("s.flavor = ?")
        params.append(flavor.lower())

    def _in(column: str, values: list[str], negate: bool = False) -> None:
        vals = [v.lower() for v in values]
        placeholders = ",".join("?" * len(vals))
        op = "NOT IN" if negate else "IN"
        # COALESCE keeps NULL-category rows from being dropped by NOT IN.
        clauses.append(f"COALESCE({column}, '') {op} ({placeholders})")
        params.extend(vals)

    if include_topics:
        _in("s.topic", include_topics)
    if include_flavors:
        _in("s.flavor", include_flavors)
    if mute_topics:
        _in("s.topic", mute_topics, negate=True)
    if mute_flavors:
        _in("s.flavor", mute_flavors, negate=True)
    if max_cortisol is not None:
        clauses.append("COALESCE(s.cortisol_score, 0) <= ?")
        params.append(max_cortisol)
    if max_ragebait is not None:
        clauses.append("COALESCE(s.ragebait_score, 0) <= ?")
        params.append(max_ragebait)
    if tag:
        clauses.append("EXISTS (SELECT 1 FROM article_tags at WHERE at.article_id = a.id AND at.tag = ?)")
        params.append(tag.lower())
    if source_id:
        clauses.append("a.source_id = ?")
        params.append(source_id)

    if since:
        # "New since last visit": articles discovered after the reader's last visit.
        clauses.append("a.discovered_at > ?")
        params.append(since)

    # "Following" feed: articles from a followed source OR carrying a followed tag.
    # Passing either list (even empty) switches to following mode; no follows → none.
    if follow_sources is not None or follow_tags is not None:
        ors: list[str] = []
        if follow_sources:
            ors.append(f"a.source_id IN ({','.join('?' * len(follow_sources))})")
            params.extend(follow_sources)
        ftags = [t.lower() for t in (follow_tags or [])]
        if ftags:
            ors.append(
                f"EXISTS (SELECT 1 FROM article_tags at WHERE at.article_id = a.id "
                f"AND at.tag IN ({','.join('?' * len(ftags))}))"
            )
            params.extend(ftags)
        clauses.append("(" + " OR ".join(ors) + ")" if ors else "0")

    # Home-aware scoping for "Closer to Home" (server-side).
    geo_clauses, geo_params = _geo_scope_clauses(home_country, home_state, geo_scope)
    clauses.extend(geo_clauses)
    params.extend(geo_params)

    where = "WHERE " + " AND ".join(clauses)
    params.extend([limit, offset])

    if match:
        order_by = "bm25(article_search), COALESCE(a.published_at, a.discovered_at) DESC"  # relevance, then recency
    elif sort == "latest":
        order_by = "COALESCE(a.published_at, a.discovered_at) DESC, rank_score DESC"
    else:
        order_by = "rank_score DESC, COALESCE(a.published_at, a.discovered_at) DESC"

    rows = conn.execute(
        f"""
        SELECT {_ARTICLE_COLUMNS},
               g.breadth AS geo_breadth, g.confidence AS geo_confidence,
               (SELECT group_concat(
                    p.country_code || CASE WHEN p.state_code IS NOT NULL THEN '-' || p.state_code ELSE '' END, ',')
                FROM article_places p WHERE p.article_id = a.id) AS geo_places
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        JOIN article_scores s ON s.article_id = a.id
        LEFT JOIN article_geo g ON g.article_id = a.id
        {fts_join}
        {where}
        ORDER BY {order_by}
        LIMIT ? OFFSET ?
        """,
        params,
    ).fetchall()
    return [dict(row) for row in rows]
|
||
|
||
|
||
def reindex_search(conn: sqlite3.Connection) -> int:
    """Rebuild the article_search full-text index from scratch and return its row count.

    Indexed rows are the accepted, non-duplicate articles: title, description
    (as `body`, '' when missing), source name and space-joined tags. Visibility
    and boundary filtering happen at query time, so this rebuild does not need
    to track them. The rebuild is committed before the count is taken.
    """
    rebuild_sql = """
        INSERT INTO article_search (article_id, title, body, source_name, tags)
        SELECT a.id,
               a.title,
               COALESCE(a.description, ''),
               src.name,
               COALESCE((SELECT group_concat(t.tag, ' ') FROM article_tags t WHERE t.article_id = a.id), '')
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        JOIN article_scores s ON s.article_id = a.id
        WHERE s.accepted = 1 AND a.duplicate_of IS NULL
    """
    for statement in ("DELETE FROM article_search", rebuild_sql):
        conn.execute(statement)
    conn.commit()
    count_row = conn.execute("SELECT COUNT(*) FROM article_search").fetchone()
    return count_row[0]
|
||
|
||
|
||
def brief(conn: sqlite3.Connection, brief_date: str | None = None, limit: int = 10) -> dict:
    """Return a stored daily brief (the latest one when *brief_date* is None) with its ranked items.

    The result always carries the keys brief_date, title, created_at and items.
    Callers can therefore index it the same way whether or not a brief exists.
    Items are at most *limit* rows ordered by bi.rank, restricted to sources with
    content_visible = 1. Each item carries rank, selection_reason, the shared
    article columns and the stored summary (NULL when none).
    """
    target_date = brief_date or _latest_brief_date(conn)
    if not target_date:
        # No brief exists yet: keep the same shape as the populated result.
        return {"brief_date": None, "title": None, "created_at": None, "items": []}

    header = conn.execute(
        "SELECT brief_date, title, created_at FROM daily_briefs WHERE brief_date = ?",
        (target_date,),
    ).fetchone()
    if not header:
        return {"brief_date": target_date, "title": None, "created_at": None, "items": []}

    rows = conn.execute(
        f"""
        SELECT bi.rank, bi.selection_reason, {_ARTICLE_COLUMNS},
               (SELECT summary FROM article_summaries WHERE article_id = a.id) AS summary
        FROM daily_briefs b
        JOIN daily_brief_items bi ON bi.brief_id = b.id
        JOIN articles a ON a.id = bi.article_id
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE b.brief_date = ? AND src.content_visible = 1
        ORDER BY bi.rank
        LIMIT ?
        """,
        (target_date, limit),
    ).fetchall()
    return {
        "brief_date": header["brief_date"],
        "title": header["title"],
        "created_at": header["created_at"],
        "items": [dict(row) for row in rows],
    }
|
||
|
||
|
||
def saved(conn: sqlite3.Connection, user_id: int, limit: int = 200) -> list[dict]:
    """Return up to *limit* articles saved by *user_id*, most recently saved first.

    Each row carries the shared article columns (_ARTICLE_COLUMNS), as the
    other article listings do. Article scores are LEFT JOINed, so unscored
    articles are still returned.
    """
    query = f"""
        SELECT {_ARTICLE_COLUMNS}
        FROM saved_articles sv
        JOIN articles a ON a.id = sv.article_id
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE sv.user_id = ?
        ORDER BY sv.saved_at DESC
        LIMIT ?
    """
    cursor = conn.execute(query, (user_id, limit))
    return [dict(record) for record in cursor.fetchall()]
|
||
|
||
|
||
def saved_ids(conn: sqlite3.Connection, user_id: int) -> list[int]:
    """Return the ids of every article *user_id* has saved (no guaranteed order)."""
    cursor = conn.execute(
        "SELECT article_id FROM saved_articles WHERE user_id = ?", (user_id,)
    )
    return [article_id for (article_id,) in cursor]
|
||
|
||
|
||
def history(conn: sqlite3.Connection, user_id: int, limit: int = 200) -> list[dict]:
    """Return up to *limit* articles from *user_id*'s account history, most recent first.

    Repeat visits collapse to one row per article. `seen_at` is the latest
    history timestamp (MAX(h.at)) for that article.
    """
    query = f"""
        SELECT {_ARTICLE_COLUMNS}, MAX(h.at) AS seen_at
        FROM user_history h
        JOIN articles a ON a.id = h.article_id
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE h.user_id = ?
        GROUP BY a.id
        ORDER BY seen_at DESC
        LIMIT ?
    """
    fetched = conn.execute(query, (user_id, limit)).fetchall()
    return [dict(record) for record in fetched]
|
||
|
||
|
||
def content_stats(conn: sqlite3.Connection) -> dict:
    """Summarise corpus and serving health for the dashboard: how much good news is live.

    "Live" means accepted (s.accepted=1) and not a duplicate. Image, summary and
    enrichment figures are scoped to live articles. Brief figures describe the
    most recent daily brief. Every count collapses NULL to 0.
    """

    def count(sql: str, args: tuple = ()) -> int:
        value = conn.execute(sql, args).fetchone()[0]
        return value if value else 0

    # Shared query prefixes. Scoping coverage to LIVE articles keeps the
    # percentages from drifting past 100% as rejected/duplicate rows accrue summaries.
    live = (
        "SELECT COUNT(*) FROM article_scores s JOIN articles a ON a.id=s.article_id "
        "WHERE s.accepted=1 AND a.duplicate_of IS NULL"
    )
    live_summarised = (
        "SELECT COUNT(*) FROM article_summaries m JOIN articles a ON a.id=m.article_id "
        "JOIN article_scores s ON s.article_id=a.id WHERE s.accepted=1 AND a.duplicate_of IS NULL"
    )
    has_image = " AND a.image_url IS NOT NULL AND a.image_url!=''"

    latest = conn.execute(
        "SELECT brief_date, (SELECT COUNT(*) FROM daily_brief_items WHERE brief_id=daily_briefs.id) n "
        "FROM daily_briefs ORDER BY brief_date DESC LIMIT 1"
    ).fetchone()
    if latest:
        brief_with_image = count(
            "SELECT COUNT(*) FROM daily_brief_items bi JOIN articles a ON a.id=bi.article_id "
            "JOIN daily_briefs b ON b.id=bi.brief_id "
            "WHERE b.brief_date=?" + has_image,
            (latest["brief_date"],),
        )
    else:
        brief_with_image = 0

    return {
        "served": count(live),
        "total": count("SELECT COUNT(*) FROM articles"),
        "rejected": count("SELECT COUNT(*) FROM article_scores WHERE accepted=0"),
        "accepted_7d": count(
            live + " AND COALESCE(a.published_at, a.discovered_at) >= datetime('now','-7 days')"
        ),
        "added_24h": count("SELECT COUNT(*) FROM articles WHERE discovered_at >= datetime('now','-1 day')"),
        "added_7d": count("SELECT COUNT(*) FROM articles WHERE discovered_at >= datetime('now','-7 days')"),
        "summaries": count(live_summarised),
        "summaries_with_image": count(live_summarised + has_image),
        "with_image": count(live + has_image),
        # Live, imageless articles whose enrichment ran in the last day and
        # still found no usable og:image — the image "misses" to watch.
        "recent_enrich_fail": count(
            live + " AND (a.image_url IS NULL OR a.image_url='') "
            "AND a.image_checked_at >= datetime('now','-1 day')"
        ),
        "active_sources": count("SELECT COUNT(*) FROM sources WHERE active=1"),
        "total_sources": count("SELECT COUNT(*) FROM sources"),
        "latest_brief_date": latest["brief_date"] if latest else None,
        "latest_brief_size": latest["n"] if latest else 0,
        "brief_with_image": brief_with_image,
    }
|
||
|
||
|
||
def source_health(conn: sqlite3.Connection) -> list[dict]:
    """Return one operator row per source (active, paused AND retired).

    Each row holds the failure streak, last error/success/attempt, the computed
    next poll, and the backing metrics (served/accepted/total counts plus
    acceptance, non-English, duplicate, accepted-duplicate and image-coverage
    percentages), so Pause/Flag decisions aren't vibes-based.

    next_due_at = last attempt + MIN(cap, interval * (1 + consecutive_failures)),
    mirroring feeds.due_source_rows, then pushed to retry_after_at when that
    is later. NULL last attempt means "due now". Paused sources are included
    (their articles stay live; only polling stops).
    """

    def pct(part, whole):
        # Whole-number percentage, or None when the denominator is zero/empty.
        return round(100 * part / whole) if whole else None

    rows = conn.execute(
        """
        SELECT
            s.id, s.name, s.feed_url, s.homepage_url, s.default_category AS category, s.active,
            s.status, s.content_visible, s.retry_after_at,
            s.consecutive_failures AS failures, s.review_flag, s.review_reason, s.paywall_override,
            s.poll_interval_minutes AS interval_minutes,
            s.last_success_at, s.last_error_at, substr(s.last_error, 1, 160) AS last_error,
            (SELECT MAX(r.finished_at) FROM ingest_runs r
             WHERE r.source_id = s.id AND r.finished_at IS NOT NULL) AS last_attempt,
            (SELECT COUNT(*) FROM articles a WHERE a.source_id = s.id) AS total_articles,
            (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
             WHERE a.source_id = s.id AND sc.accepted = 1) AS accepted_total,
            (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
             WHERE a.source_id = s.id AND sc.reason_code = 'non_english') AS non_english,
            (SELECT COUNT(*) FROM articles a WHERE a.source_id = s.id AND a.duplicate_of IS NOT NULL) AS duplicates,
            (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
             WHERE a.source_id = s.id AND sc.accepted = 1 AND a.duplicate_of IS NULL) AS served,
            (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
             WHERE a.source_id = s.id AND sc.accepted = 1 AND a.duplicate_of IS NULL
               AND a.image_url IS NOT NULL AND a.image_url != '') AS images,
            datetime(
                (SELECT MAX(r.finished_at) FROM ingest_runs r
                 WHERE r.source_id = s.id AND r.finished_at IS NOT NULL),
                '+' || MIN(?, s.poll_interval_minutes * (1 + s.consecutive_failures)) || ' minutes'
            ) AS next_due_at
        FROM sources s
        ORDER BY s.active DESC, s.consecutive_failures DESC, served DESC, s.name
        """,
        (MAX_BACKOFF_MINUTES,),
    ).fetchall()

    report: list[dict] = []
    for row in rows:
        entry = dict(row)
        total = entry["total_articles"] or 0
        accepted = entry["accepted_total"] or 0
        non_english = entry.get("non_english") or 0
        served = entry["served"]
        # Acceptance is judged only over articles scored in English: non-English
        # items are HELD (awaiting translation), not calm-filter rejections, so
        # they must not drag a multilingual source's rate down.
        entry["acceptance_rate"] = pct(accepted, total - non_english)
        entry["non_english"] = non_english
        entry["non_english_rate"] = pct(non_english, total)
        entry["duplicate_rate"] = pct(entry["duplicates"], total)
        # Curation quality: accepted_total − served = accepted duplicates.
        entry["accepted_dup_rate"] = pct(accepted - served, accepted)
        entry["image_coverage"] = pct(entry["images"] or 0, served)
        # Paywall = domain-level hint + per-source override; report the EFFECTIVE flag.
        entry["paywalled"] = is_paywalled_for_source(
            entry.get("homepage_url") or entry.get("feed_url"), entry.get("paywall_override")
        )
        # Mirror the real scheduler gate: due at the later of the streak-backoff
        # time and any retry_after_at rest (UTC strings sort chronologically).
        candidates = [when for when in (entry["next_due_at"], entry["retry_after_at"]) if when]
        entry["next_due_at"] = max(candidates) if candidates else None
        report.append(entry)
    return report
|
||
|
||
|
||
# Attention thresholds — conservative + volume-gated, so the strip is a calm
|
||
# operator nudge rather than a noisy scold. Items are aggregated (one line per
|
||
# condition with a count), not one line per offending source.
|
||
_STALE_DAYS = 10
|
||
_REJECT_MIN_INGESTED = 20
|
||
_REJECT_RATE = 25 # acceptance below this % (with enough volume)
|
||
_DUP_MIN_ACCEPTED = 10
|
||
_DUP_RATE = 50 # accepted-duplicate above this %
|
||
_IMG_MIN_SERVED = 10
|
||
_IMG_COVERAGE = 25 # per-source image coverage below this %
|
||
_LONG_REST_HOURS = 12
|
||
|
||
|
||
def _attention(content: dict, sources: list[dict], feedback_unread: int, now: datetime | None = None) -> list[dict]:
|
||
"""The 'Attention Needed' strip: what an operator should look at, soft-toned
|
||
(warn = act soon, info = worth a glance). Derived from the same data shown
|
||
elsewhere, so it never disagrees with the detail sections."""
|
||
items: list[dict] = []
|
||
n = lambda c: "" if c == 1 else "s" # noqa: E731 — tiny pluralizer
|
||
now = now or datetime.now(UTC)
|
||
active = [s for s in sources if (s.get("status") or ("active" if s.get("active") else "paused")) == "active"]
|
||
|
||
resting = [s for s in active if (s.get("failures") or 0) > 0]
|
||
if resting:
|
||
items.append({"level": "warn", "text": f"{len(resting)} source{n(len(resting))} backing off after failures"})
|
||
flagged = [s for s in sources if s.get("review_flag")]
|
||
if flagged:
|
||
items.append({"level": "warn", "text": f"{len(flagged)} source{n(len(flagged))} flagged for review"})
|
||
|
||
# Stale: active, visible feeds whose last success is well in the past.
|
||
stale_cutoff = (now - timedelta(days=_STALE_DAYS)).strftime("%Y-%m-%d %H:%M:%S")
|
||
stale = [s for s in active if s.get("content_visible", 1) and s.get("last_success_at") and s["last_success_at"] < stale_cutoff]
|
||
if stale:
|
||
items.append({"level": "warn", "text": f"{len(stale)} source{n(len(stale))} haven't updated in over {_STALE_DAYS} days"})
|
||
|
||
# High rejection: enough ingested volume, low acceptance.
|
||
rejecting = [s for s in active if (s.get("total_articles") or 0) >= _REJECT_MIN_INGESTED
|
||
and s.get("acceptance_rate") is not None and s["acceptance_rate"] < _REJECT_RATE]
|
||
if rejecting:
|
||
items.append({"level": "info", "text": f"{len(rejecting)} source{n(len(rejecting))} accepting under {_REJECT_RATE}% of submissions"})
|
||
|
||
# High accepted-duplicate: enough accepted volume, mostly echoing others.
|
||
duping = [s for s in active if (s.get("accepted_total") or 0) >= _DUP_MIN_ACCEPTED
|
||
and s.get("accepted_dup_rate") is not None and s["accepted_dup_rate"] > _DUP_RATE]
|
||
if duping:
|
||
items.append({"level": "info", "text": f"{len(duping)} source{n(len(duping))} mostly duplicating other feeds"})
|
||
|
||
# Low image coverage (info, not a warning).
|
||
thin = [s for s in active if (s.get("served") or 0) >= _IMG_MIN_SERVED
|
||
and s.get("image_coverage") is not None and s["image_coverage"] < _IMG_COVERAGE]
|
||
if thin:
|
||
items.append({"level": "info", "text": f"{len(thin)} source{n(len(thin))} with thin image coverage (under {_IMG_COVERAGE}%)"})
|
||
|
||
# Long rate-limit rest (info).
|
||
rest_cutoff = (now + timedelta(hours=_LONG_REST_HOURS)).strftime("%Y-%m-%d %H:%M:%S")
|
||
long_rest = [s for s in active if s.get("retry_after_at") and s["retry_after_at"] > rest_cutoff]
|
||
if long_rest:
|
||
items.append({"level": "info", "text": f"{len(long_rest)} source{n(len(long_rest))} rate-limited for {_LONG_REST_HOURS}h+"})
|
||
|
||
# Site-wide signals.
|
||
served = content.get("served") or 0
|
||
with_image = content.get("with_image") or 0
|
||
if served and (with_image / served) < 0.70:
|
||
items.append({"level": "info", "text": f"Image coverage at {round(100 * with_image / served)}% (aim for 70%+)"})
|
||
|
||
brief_size = content.get("latest_brief_size") or 0
|
||
if brief_size < 7:
|
||
items.append({"level": "info", "text": f"Today's brief has only {brief_size} item{n(brief_size)}"})
|
||
|
||
if feedback_unread:
|
||
items.append({"level": "info", "text": f"{feedback_unread} unread feedback message{n(feedback_unread)}"})
|
||
return items
|
||
|
||
|
||
# --- Source article inspector: the real articles behind the source metrics -----
|
||
|
||
_SRC_ART_FILTERS = {
|
||
"accepted": "AND s.accepted = 1",
|
||
# 'rejected' = calm-filter rejections only; non-English is HELD, its own bucket.
|
||
"rejected": "AND s.accepted = 0 AND COALESCE(s.reason_code,'') != 'non_english'",
|
||
"held": "AND s.reason_code = 'non_english'",
|
||
"no_image": "AND (a.image_url IS NULL OR a.image_url = '')",
|
||
"duplicates": "AND a.duplicate_of IS NOT NULL",
|
||
}
|
||
|
||
|
||
def source_articles(conn: sqlite3.Connection, source_id: int, filter: str = "all",
                    limit: int = 25, offset: int = 0) -> list[dict]:
    """List one page of a source's ingested articles, newest first.

    Lets admins check the per-source metrics (paywall / image / acceptance)
    against real evidence. *filter* picks a bucket from _SRC_ART_FILTERS. Any
    other value, including the default "all", adds no extra condition.
    """
    source_row = conn.execute("SELECT paywall_override FROM sources WHERE id = ?", (source_id,)).fetchone()
    override = None if not source_row else source_row["paywall_override"]
    bucket_sql = _SRC_ART_FILTERS.get(filter, "")

    rows = conn.execute(
        f"""
        SELECT a.id, a.title, a.canonical_url, a.published_at, a.discovered_at,
               a.image_url, a.duplicate_of,
               s.accepted, s.reason_code, s.reason_text, s.topic, s.flavor
        FROM articles a
        LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE a.source_id = ? {bucket_sql}
        ORDER BY COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT ? OFFSET ?
        """,
        (source_id, limit, offset),
    ).fetchall()

    def to_item(r) -> dict:
        code = r["reason_code"]
        link = r["canonical_url"]
        return {
            "id": r["id"],
            "title": r["title"],
            "url": link,
            "published_at": r["published_at"] or r["discovered_at"],
            "accepted": r["accepted"],
            # The "why" behind accept/reject: prose first, then the bare code.
            "reason": r["reason_text"] or code,
            # Held for language, not rejected.
            "held": code == "non_english",
            "topic": r["topic"],
            "flavor": r["flavor"],
            # Effective flag: domain rule combined with the source's override.
            "paywalled": is_paywalled_for_source(link, override),
            "has_image": bool(r["image_url"]),
            "duplicate": r["duplicate_of"] is not None,
        }

    return [to_item(r) for r in rows]
|
||
|
||
|
||
def source_articles_summary(conn: sqlite3.Connection, source_id: int) -> dict:
    """Return the counts behind the inspector table plus the source-level paywall rule.

    Lets the panel header read e.g. '120 · 96 accepted · 24 rejected · 3 no image · paywall: ON'.
    Non-English items are counted separately (held for language), not as rejections.
    """
    counts = conn.execute(
        """
        SELECT COUNT(*) total,
               COALESCE(SUM(s.accepted = 1), 0) accepted,
               COALESCE(SUM(s.accepted = 0 AND COALESCE(s.reason_code,'') != 'non_english'), 0) rejected,
               COALESCE(SUM(s.reason_code = 'non_english'), 0) non_english,
               COALESCE(SUM(a.image_url IS NULL OR a.image_url = ''), 0) no_image,
               COALESCE(SUM(a.duplicate_of IS NOT NULL), 0) duplicates
        FROM articles a LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE a.source_id = ?
        """,
        (source_id,),
    ).fetchone()
    source = conn.execute(
        "SELECT homepage_url, feed_url, paywall_override FROM sources WHERE id = ?", (source_id,)
    ).fetchone()
    if source:
        override = source["paywall_override"]
        url = source["homepage_url"] or source["feed_url"]
    else:
        override = url = None

    summary = {key: counts[key] for key in ("total", "accepted", "rejected", "non_english", "no_image", "duplicates")}
    summary["paywalled"] = is_paywalled_for_source(url, override)  # effective flag
    summary["paywall_domain"] = is_paywalled(url)  # what the domain rule alone says
    summary["paywall_override"] = override  # null | 'free' | 'paywalled' — the basis
    return summary
|
||
|
||
|
||
def admin_stats(conn: sqlite3.Connection, days: int = 30) -> dict:
    """Aggregate, non-personal usage stats for the admin dashboard.

    Args:
        conn: Open database connection. Several results are read by column
            name (``r["g"]``, ``r["kind"]`` …) and converted with ``dict(r)``,
            so it needs a name-indexable row factory such as ``sqlite3.Row``.
        days: Length of the rolling window, in days, used by the windowed
            counters (visitors ``d30``, returning/once, retention, top lists,
            per-kind counts, ``daily``, accounts ``new_30d``, client-error
            ``window``). The ``d30``/``new_30d`` keys keep those names whatever
            ``days`` is.

    Returns:
        A dict of aggregate counters and top lists, plus the ``content_stats``
        and ``source_health`` sections and an ``attention`` summary built from
        them (and the unread-feedback count) by ``_attention``.
    """
    since = f"-{days} days"  # SQLite date() modifier for the start of the window
    # "Today" for timestamp-based counters is the SITE-LOCAL day (GOODNEWS_TZ), not
    # UTC: otherwise an evening error (e.g. 22:53 local) lands on the next UTC day and
    # reads as a fresh "today" the following morning — the exact false-alarm we hit.
    local_day_start = (local_now().replace(hour=0, minute=0, second=0, microsecond=0)
                       .astimezone(UTC).strftime("%Y-%m-%d %H:%M:%S"))

    def scalar(sql, params=()):
        # First column of the first row; NULL (e.g. SUM over no rows) reads as 0.
        return conn.execute(sql, params).fetchone()[0] or 0

    # NOTE(review): these compare events.day with the UTC date('now'); presumably
    # `day` is stored as a UTC date — confirm against the event writer.
    visitors = {
        "today": scalar("SELECT COUNT(DISTINCT visitor_hash) FROM events "
                        "WHERE kind='visit' AND visitor_hash!='' AND day=date('now')"),
        "d7": scalar("SELECT COUNT(DISTINCT visitor_hash) FROM events "
                     "WHERE kind='visit' AND visitor_hash!='' AND day>=date('now','-7 days')"),
        "d30": scalar("SELECT COUNT(DISTINCT visitor_hash) FROM events "
                      "WHERE kind='visit' AND visitor_hash!='' AND day>=date('now',?)", (since,)),
    }

    # Returning (seen on ≥2 distinct days) vs one-and-done, over the window.
    rows = conn.execute(
        "SELECT CASE WHEN d>=2 THEN 'returning' ELSE 'once' END g, COUNT(*) n FROM ("
        " SELECT visitor_hash, COUNT(DISTINCT day) d FROM events "
        " WHERE kind='visit' AND visitor_hash!='' AND day>=date('now',?) GROUP BY visitor_hash"
        ") GROUP BY g", (since,),
    ).fetchall()
    loyalty = {r["g"]: r["n"] for r in rows}

    # An "open" = opening the article via our summary page (legacy 'open' + 'summary_viewed').
    OPEN = "e.kind IN ('open','summary_viewed')"

    top_articles = [dict(r) for r in conn.execute(
        f"SELECT e.article_id AS id, a.title, src.name AS source, COUNT(*) AS opens "
        f"FROM events e JOIN articles a ON a.id=e.article_id JOIN sources src ON src.id=a.source_id "
        f"WHERE {OPEN} AND e.article_id>0 AND e.day>=date('now',?) "
        f"GROUP BY e.article_id ORDER BY opens DESC LIMIT 12", (since,),
    )]

    top_groupings = [dict(r) for r in conn.execute(
        f"SELECT t.tag, COUNT(*) AS opens FROM events e JOIN article_tags t ON t.article_id=e.article_id "
        f"WHERE {OPEN} AND e.day>=date('now',?) GROUP BY t.tag ORDER BY opens DESC LIMIT 12", (since,),
    )]

    top_topics = [dict(r) for r in conn.execute(
        f"SELECT s.topic, COUNT(*) AS opens FROM events e JOIN article_scores s ON s.article_id=e.article_id "
        f"WHERE {OPEN} AND s.topic IS NOT NULL AND e.day>=date('now',?) "
        f"GROUP BY s.topic ORDER BY opens DESC", (since,),
    )]

    # Counts per event kind over the window (each = distinct visitor-days, by dedup).
    kc_rows = conn.execute(
        "SELECT kind, COUNT(*) n FROM events WHERE day>=date('now',?) GROUP BY kind", (since,)
    ).fetchall()
    kc = {r["kind"]: r["n"] for r in kc_rows}

    shares = {k: kc.get(k, 0) for k in ("share_ub", "copy_source", "native_share", "source_click")}

    summary_views = kc.get("summary_viewed", 0)
    source_clicks = kc.get("source_click", 0)
    funnel = {
        "summary_viewed": summary_views,
        "source_click": source_clicks,
        "full_story": kc.get("full_story", 0),
        # Percent of summary views that went on to the source; 0 when nothing was viewed.
        "source_rate": round(100 * source_clicks / summary_views) if summary_views else 0,
    }
    emotional_mix = {
        "not_today": kc.get("not_today", 0),
        "less_like_this": kc.get("less_like_this", 0),
        "hide_topic": kc.get("hide_topic", 0),
    }
    paywall = {
        "paywall_replace": kc.get("paywall_replace", 0),
        "paywalled_source_open": kc.get("paywalled_source_open", 0),
    }
    replace = {"used": kc.get("replace_used", 0), "none": kc.get("replace_none", 0)}

    # Game funnel — the growth loop we're instrumenting. Each count is distinct
    # visitor-days (events dedupe per kind/day), so it reads as "people", not actions.
    _GAME_NAMES = ("word", "wordsearch", "bloom", "match")
    _GAME_EVENTS = ("arrival", "started", "completed", "shared")  # arrival = share-loop acquisition
    games_by = {
        g: {e: kc.get(f"{g}_{e}", 0) for e in _GAME_EVENTS}
        for g in _GAME_NAMES
    }
    games = {
        "by_game": games_by,
        "totals": {e: sum(games_by[g][e] for g in _GAME_NAMES) for e in _GAME_EVENTS},
    }

    # Accounts — aggregate counts only (no emails, no per-user listing).
    # NOTE(review): "new_today" uses the UTC day (date('now')), unlike the
    # site-local day used for client errors below; switching it needs the
    # users.created_at storage format confirmed first.
    accounts = {
        "total": scalar("SELECT COUNT(*) FROM users"),
        "new_today": scalar("SELECT COUNT(*) FROM users WHERE date(created_at)=date('now')"),
        "new_7d": scalar("SELECT COUNT(*) FROM users WHERE created_at>=date('now','-7 days')"),
        "new_30d": scalar("SELECT COUNT(*) FROM users WHERE created_at>=date('now',?)", (since,)),
        "active_7d": scalar("SELECT COUNT(DISTINCT user_id) FROM sessions WHERE last_seen_at>=date('now','-7 days')"),
    }

    # Returning-visitor buckets by distinct active days in the window.
    bucket_rows = conn.execute(
        "SELECT CASE WHEN d>=8 THEN '8+' WHEN d>=4 THEN '4-7' WHEN d>=2 THEN '2-3' ELSE 'new' END b, "
        "COUNT(*) n FROM (SELECT visitor_hash, COUNT(DISTINCT day) d FROM events "
        "WHERE kind='visit' AND visitor_hash!='' AND day>=date('now',?) GROUP BY visitor_hash) GROUP BY b",
        (since,),
    ).fetchall()
    buckets = {r["b"]: r["n"] for r in bucket_rows}
    retention = {k: buckets.get(k, 0) for k in ("new", "2-3", "4-7", "8+")}

    daily = [dict(r) for r in conn.execute(
        "SELECT day, SUM(kind IN ('open','summary_viewed')) AS opens, SUM(kind='visit') AS visits "
        "FROM events WHERE day>=date('now',?) GROUP BY day ORDER BY day", (since,),
    )]

    content = content_stats(conn)
    sources = source_health(conn)
    feedback_7d = scalar("SELECT COUNT(*) FROM feedback WHERE created_at >= datetime('now','-7 days')")
    feedback_unread = scalar("SELECT COUNT(*) FROM feedback WHERE read_at IS NULL")

    # Boot-failure seatbelt signal — blank-screen risk surfacing. Bots are
    # excluded from the headline counts: throttled crawlers fail the boot
    # check routinely and would read as real users seeing blank screens.
    # `_NOT_BOT_SQL` is a chain of `instr(lower(user_agent), …)=0` tests, which
    # evaluates to NULL (not true) when user_agent IS NULL and so silently
    # dropped UA-less reports. is_bot_ua(None) treats a missing UA as NOT a bot,
    # so keep NULL rows here to make the SQL and Python classifiers agree.
    not_bot = f"(user_agent IS NULL OR ({_NOT_BOT_SQL}))"
    client_errors = {
        "today": scalar(
            f"SELECT COUNT(*) FROM client_errors WHERE created_at >= ? AND {not_bot}",
            (local_day_start,),
        ),
        "window": scalar(
            f"SELECT COUNT(*) FROM client_errors WHERE created_at>=date('now',?) AND {not_bot}",
            (since,),
        ),
    }

    return {
        "days": days,
        "content": content,
        "sources": sources,
        "attention": _attention(content, sources, feedback_unread),
        "feedback_7d": feedback_7d,
        "feedback_unread": feedback_unread,
        "visitors": visitors,
        "returning": loyalty.get("returning", 0),
        "once": loyalty.get("once", 0),
        "retention": retention,
        "accounts": accounts,
        "funnel": funnel,
        "emotional_mix": emotional_mix,
        "paywall": paywall,
        "replace": replace,
        "games": games,
        "top_articles": top_articles,
        "top_groupings": top_groupings,
        "top_topics": top_topics,
        "shares": shares,
        "daily": daily,
        "client_errors": client_errors,
    }
|
||
|
||
|
||
def existing_article_ids(conn: sqlite3.Connection, ids: list[int], limit: int = 1000) -> list[int]:
    """Filter to ids that still exist (FK-safe inserts for save/history/import).

    Args:
        conn: Open database connection.
        ids: Candidate article ids; each element is passed through ``int()``.
            Duplicates are collapsed.
        limit: Cap on how many distinct ids are checked. When ``ids`` holds more
            distinct values than this, the FIRST ``limit`` of them (in input
            order) are kept and the rest are ignored.

    Returns:
        The ids from the capped candidate list that exist in ``articles``. The
        order is whatever SQLite returns per chunk, not necessarily input order.

    Raises:
        ValueError / TypeError: if an element of ``ids`` can't be converted by ``int()``.
    """
    # dict.fromkeys dedupes while preserving input order, so the cap keeps the
    # caller's first `limit` ids. Truncating a set kept an arbitrary subset.
    clean = list(dict.fromkeys(int(i) for i in ids))[:limit]
    if not clean:
        return []
    # Query in chunks: SQLite before 3.32.0 defaults SQLITE_MAX_VARIABLE_NUMBER
    # to 999, so one IN (...) with up to 1000 placeholders could fail there.
    chunk_size = 500
    found: list[int] = []
    for start in range(0, len(clean), chunk_size):
        chunk = clean[start:start + chunk_size]
        placeholders = ",".join("?" * len(chunk))  # only "?" markers; values are bound
        found.extend(r[0] for r in conn.execute(
            f"SELECT id FROM articles WHERE id IN ({placeholders})", chunk
        ))
    return found
|
||
|
||
|
||
def tag_counts(conn: sqlite3.Connection, accepted_only: bool = True) -> dict:
    """Count non-duplicate articles per grouping tag.

    With ``accepted_only`` (the default) only accepted articles are counted,
    i.e. what is actually shown. Returns ``{tag: count}``; tags with no
    qualifying articles are simply absent.
    """
    where = "WHERE a.duplicate_of IS NULL"
    if accepted_only:
        where += " AND s.accepted = 1"
    cursor = conn.execute(
        f"""
        SELECT t.tag, COUNT(*) AS count
        FROM article_tags t
        JOIN articles a ON a.id = t.article_id
        JOIN article_scores s ON s.article_id = a.id
        {where}
        GROUP BY t.tag
        """
    )
    return {tag: count for tag, count in cursor.fetchall()}
|
||
|
||
|
||
def category_counts(conn: sqlite3.Connection, accepted_only: bool = True) -> list[dict]:
    """Return per topic/flavor article counts for building browse UIs.

    Duplicates (``duplicate_of`` set) are excluded so the numbers line up with
    what the feed endpoint returns for each topic/flavor. With ``accepted_only``
    only accepted articles count; otherwise every scored article with a topic
    does. Each item is ``{"topic", "flavor", "count"}``, sorted by topic then
    flavor. Rows go through ``dict(row)``, so ``conn`` needs a row factory such
    as ``sqlite3.Row``.
    """
    extra_filter = "s.accepted = 1" if accepted_only else "s.topic IS NOT NULL"
    cursor = conn.execute(
        f"""
        SELECT s.topic, s.flavor, COUNT(*) AS count
        FROM article_scores s
        JOIN articles a ON a.id = s.article_id
        WHERE a.duplicate_of IS NULL AND {extra_filter}
        GROUP BY s.topic, s.flavor
        ORDER BY s.topic, s.flavor
        """
    )
    return list(map(dict, cursor.fetchall()))
|
||
|
||
|
||
def available_dates(conn: sqlite3.Connection, limit: int = 30) -> list[str]:
    """Return up to ``limit`` daily-brief dates, newest first."""
    cursor = conn.execute(
        "SELECT brief_date FROM daily_briefs ORDER BY brief_date DESC LIMIT ?",
        (limit,),
    )
    # Each row carries exactly one column, so unpack it directly.
    return [brief_date for (brief_date,) in cursor.fetchall()]
|
||
|
||
|
||
def _latest_brief_date(conn: sqlite3.Connection) -> str | None:
|
||
row = conn.execute(
|
||
"SELECT brief_date FROM daily_briefs ORDER BY brief_date DESC LIMIT 1"
|
||
).fetchone()
|
||
return row["brief_date"] if row else None
|