Files
upbeatBytes/goodnews/queries.py
T
thejayman77 38889f76e5 Source feeds: click a source to see its publication feed
Click a source name on any card → a feed of just that source's articles,
newest-first, still accepted / non-duplicate / boundary-filtered (the calm
promise isn't bypassed). A natural way to follow a publication's feel.

* queries.feed + /api/feed: source_id filter; Article output gains source_id.
* Frontend: source label is a button → transient 'source:<id>' view (like
  'tag:<slug>'), rendered in the feed grid with Load more, header = source name.
* Ad-hoc, not a pinned lane. Foundation for a future source page (metadata) +
  Follow; shareable /source/<slug> route and source_view analytics come then.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 08:30:33 -04:00

461 lines
18 KiB
Python

"""Read-only query helpers over the goodNews database.
Pure stdlib and framework-agnostic: returns plain dicts so the same functions
back both the CLI and the JSON API. All article output is metadata + a link to
the original source — never stored bodies.
"""
from __future__ import annotations
import sqlite3
from .feeds import MAX_BACKOFF_MINUTES
# Single source of truth for "best first" ordering: the constructive, agency and
# human-benefit scores plus the source's trust score, minus the cortisol,
# ragebait and PR-risk scores. Kept as one SQL expression so the brief, category
# feeds, and the API all rank identically. Expects the aliases `s`
# (article_scores) and `src` (sources) to be in scope in the enclosing query.
RANK_SCORE_SQL = (
    "("
    "s.constructive_score + s.agency_score + s.human_benefit_score + src.trust_score "
    "- s.cortisol_score - s.ragebait_score - s.pr_risk_score"
    ")"
)
# Shared SELECT column list for every article-shaped result in this module
# (feed, brief, saved, history), so they all return the same keys.
# Any query that interpolates it must alias `articles` as `a`, `sources` as
# `src`, and `article_scores` as `s`. `tags` is a group_concat over the
# article's rows in article_tags; `rank_score` is RANK_SCORE_SQL.
_ARTICLE_COLUMNS = f"""
a.id,
a.title,
a.description,
a.canonical_url,
a.published_at,
a.image_url,
a.source_id,
src.name AS source_name,
s.topic,
s.flavor,
s.accepted,
s.constructive_score,
s.cortisol_score,
s.ragebait_score,
s.agency_score,
s.human_benefit_score,
s.pr_risk_score,
s.reason_code,
s.reason_text,
s.model_name,
(SELECT group_concat(t.tag) FROM article_tags t WHERE t.article_id = a.id) AS tags,
{RANK_SCORE_SQL} AS rank_score
"""
def feed(
    conn: sqlite3.Connection,
    topic: str | None = None,
    flavor: str | None = None,
    accepted_only: bool = True,
    limit: int = 30,
    offset: int = 0,
    include_topics: list[str] | None = None,
    include_flavors: list[str] | None = None,
    mute_topics: list[str] | None = None,
    mute_flavors: list[str] | None = None,
    max_cortisol: int | None = None,
    max_ragebait: int | None = None,
    tag: str | None = None,
    source_id: int | None = None,
    sort: str = "ranked",
) -> list[dict]:
    """Return non-duplicate articles with categorical filters applied in SQL.

    Categorical filters (topic/flavor include & mute, cortisol/ragebait
    ceilings, tag, source) must be applied here, not after ranking — otherwise
    low-ranked-but-matching items (e.g. 'discovery' for a Wonder lane) fall
    outside any over-fetch window. Word-boundary avoid-terms remain a Python
    pass on the caller side.

    Args:
        conn: Open connection; rows are converted with ``dict(row)``, so a
            mapping-style row factory (e.g. ``sqlite3.Row``) is expected.
        topic, flavor: Exact-match filters, compared lower-cased.
        accepted_only: When true, keep only rows with ``s.accepted = 1``.
        limit, offset: Passed straight to ``LIMIT ? OFFSET ?``.
        include_topics, include_flavors: Keep only rows whose value is listed.
        mute_topics, mute_flavors: Drop rows whose value is listed.
        max_cortisol, max_ragebait: Inclusive ceilings; a NULL score counts as 0.
        tag: Keep only articles carrying this tag (lower-cased).
        source_id: Keep only articles from this source.
        sort: ``"latest"`` orders newest-first; any other value (including the
            default ``"ranked"``) orders by the composite rank score.

    Returns:
        One dict per article with the ``_ARTICLE_COLUMNS`` keys.
    """
    clauses = ["a.duplicate_of IS NULL"]
    params: list = []

    if accepted_only:
        clauses.append("s.accepted = 1")
    if topic:
        clauses.append("s.topic = ?")
        params.append(topic.lower())
    if flavor:
        clauses.append("s.flavor = ?")
        params.append(flavor.lower())

    def _in(column: str, values: list[str], negate: bool = False) -> None:
        """Append a parameterised ``[NOT] IN`` clause for *column*."""
        vals = [v.lower() for v in values]
        placeholders = ",".join("?" * len(vals))
        op = "NOT IN" if negate else "IN"
        # COALESCE keeps NULL-category rows from being dropped by NOT IN.
        clauses.append(f"COALESCE({column}, '') {op} ({placeholders})")
        params.extend(vals)

    if include_topics:
        _in("s.topic", include_topics)
    if include_flavors:
        _in("s.flavor", include_flavors)
    if mute_topics:
        _in("s.topic", mute_topics, negate=True)
    if mute_flavors:
        _in("s.flavor", mute_flavors, negate=True)

    if max_cortisol is not None:
        clauses.append("COALESCE(s.cortisol_score, 0) <= ?")
        params.append(max_cortisol)
    if max_ragebait is not None:
        clauses.append("COALESCE(s.ragebait_score, 0) <= ?")
        params.append(max_ragebait)

    if tag:
        clauses.append(
            "EXISTS (SELECT 1 FROM article_tags at WHERE at.article_id = a.id AND at.tag = ?)"
        )
        params.append(tag.lower())
    # Truthiness check: both None and 0 mean "no source filter".
    if source_id:
        clauses.append("a.source_id = ?")
        params.append(source_id)

    where = "WHERE " + " AND ".join(clauses)
    params.extend([limit, offset])

    # a.id is the final tie-breaker: without a unique last key, rows that tie
    # on rank and timestamp have no defined order, so consecutive LIMIT/OFFSET
    # pages could repeat or skip articles.
    order_by = (
        "COALESCE(a.published_at, a.discovered_at) DESC, rank_score DESC, a.id DESC"
        if sort == "latest"
        else "rank_score DESC, COALESCE(a.published_at, a.discovered_at) DESC, a.id DESC"
    )

    rows = conn.execute(
        f"""
        SELECT {_ARTICLE_COLUMNS}
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        JOIN article_scores s ON s.article_id = a.id
        {where}
        ORDER BY {order_by}
        LIMIT ? OFFSET ?
        """,
        params,
    ).fetchall()
    return [dict(row) for row in rows]
def brief(conn: sqlite3.Connection, brief_date: str | None = None, limit: int = 10) -> dict:
"""Return a stored daily brief (latest if no date) with its ranked items."""
target_date = brief_date or _latest_brief_date(conn)
if not target_date:
return {"brief_date": None, "title": None, "items": []}
header = conn.execute(
"SELECT brief_date, title, created_at FROM daily_briefs WHERE brief_date = ?",
(target_date,),
).fetchone()
if not header:
return {"brief_date": target_date, "title": None, "created_at": None, "items": []}
rows = conn.execute(
f"""
SELECT bi.rank, bi.selection_reason, {_ARTICLE_COLUMNS},
(SELECT summary FROM article_summaries WHERE article_id = a.id) AS summary
FROM daily_briefs b
JOIN daily_brief_items bi ON bi.brief_id = b.id
JOIN articles a ON a.id = bi.article_id
JOIN sources src ON src.id = a.source_id
LEFT JOIN article_scores s ON s.article_id = a.id
WHERE b.brief_date = ?
ORDER BY bi.rank
LIMIT ?
""",
(target_date, limit),
).fetchall()
return {
"brief_date": header["brief_date"],
"title": header["title"],
"created_at": header["created_at"],
"items": [dict(row) for row in rows],
}
def saved(conn: sqlite3.Connection, user_id: int, limit: int = 200) -> list[dict]:
    """List a user's saved articles, most recently saved first.

    Rows carry the same ``_ARTICLE_COLUMNS`` keys as the feed. Scores are
    LEFT JOINed, so a saved article without a score row is still returned.
    At most *limit* rows come back.
    """
    sql = f"""
        SELECT {_ARTICLE_COLUMNS}
        FROM saved_articles sv
        JOIN articles a ON a.id = sv.article_id
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE sv.user_id = ?
        ORDER BY sv.saved_at DESC
        LIMIT ?
    """
    records = conn.execute(sql, (user_id, limit)).fetchall()
    return [dict(record) for record in records]
def saved_ids(conn: sqlite3.Connection, user_id: int) -> list[int]:
    """Return the article ids the user has saved (no ordering is requested)."""
    cursor = conn.execute(
        "SELECT article_id FROM saved_articles WHERE user_id = ?",
        (user_id,),
    )
    return [record[0] for record in cursor.fetchall()]
def history(conn: sqlite3.Connection, user_id: int, limit: int = 200) -> list[dict]:
    """List the articles in a user's history, most recently seen first.

    History rows are grouped per article, so each article appears once with
    ``seen_at`` set to its latest ``user_history.at`` value. The remaining keys
    are the shared ``_ARTICLE_COLUMNS``; at most *limit* rows come back.
    """
    sql = f"""
        SELECT {_ARTICLE_COLUMNS}, MAX(h.at) AS seen_at
        FROM user_history h
        JOIN articles a ON a.id = h.article_id
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        WHERE h.user_id = ?
        GROUP BY a.id
        ORDER BY seen_at DESC
        LIMIT ?
    """
    records = conn.execute(sql, (user_id, limit)).fetchall()
    return [dict(record) for record in records]
def content_stats(conn: sqlite3.Connection) -> dict:
    """Summarise corpus / serving health for the dashboard.

    "Served" means accepted and not a duplicate. Every count falls back to 0;
    ``latest_brief_date`` is None and ``latest_brief_size`` is 0 when no brief
    is stored. The latest brief row is read by column name, so a mapping-style
    row factory (e.g. ``sqlite3.Row``) is expected.
    """

    def count(sql, args=()):
        # First column of the first row, with NULL/0 normalised to 0.
        value = conn.execute(sql, args).fetchone()[0]
        return value or 0

    served_total = count(
        "SELECT COUNT(*) FROM article_scores s JOIN articles a ON a.id=s.article_id "
        "WHERE s.accepted=1 AND a.duplicate_of IS NULL"
    )
    served_last_week = count(
        "SELECT COUNT(*) FROM article_scores s JOIN articles a ON a.id=s.article_id "
        "WHERE s.accepted=1 AND a.duplicate_of IS NULL "
        "AND COALESCE(a.published_at, a.discovered_at) >= datetime('now','-7 days')"
    )
    latest = conn.execute(
        "SELECT brief_date, (SELECT COUNT(*) FROM daily_brief_items WHERE brief_id=daily_briefs.id) n "
        "FROM daily_briefs ORDER BY brief_date DESC LIMIT 1"
    ).fetchone()

    if latest:
        latest_date, latest_size = latest["brief_date"], latest["n"]
    else:
        latest_date, latest_size = None, 0

    return {
        "served": served_total,
        "total": count("SELECT COUNT(*) FROM articles"),
        "rejected": count("SELECT COUNT(*) FROM article_scores WHERE accepted=0"),
        "accepted_7d": served_last_week,
        "added_24h": count(
            "SELECT COUNT(*) FROM articles WHERE discovered_at >= datetime('now','-1 day')"
        ),
        "summaries": count("SELECT COUNT(*) FROM article_summaries"),
        "active_sources": count("SELECT COUNT(*) FROM sources WHERE active=1"),
        "total_sources": count("SELECT COUNT(*) FROM sources"),
        "latest_brief_date": latest_date,
        "latest_brief_size": latest_size,
    }
def source_health(conn: sqlite3.Connection) -> list[dict]:
    """Report polling health for every active source.

    Each row carries the failure streak, the last error (first 160 characters),
    the number of accepted non-duplicate articles the source contributed
    (``served``), and the computed next-poll time, so the backoff /
    'resting until' state is visible.

    next_due_at = last attempt + MIN(cap, interval * (1 + consecutive_failures)),
    mirroring feeds.due_source_rows, where cap is MAX_BACKOFF_MINUTES and the
    last attempt is the newest finished ingest run. A NULL last attempt means
    "due now". Rows are ordered worst streak first, then by ``served``
    descending, then by name.
    """
    sql = """
        SELECT
        s.id, s.name, s.default_category AS category, s.active,
        s.consecutive_failures AS failures,
        s.poll_interval_minutes AS interval_minutes,
        s.last_success_at, s.last_error_at, substr(s.last_error, 1, 160) AS last_error,
        (SELECT MAX(r.finished_at) FROM ingest_runs r
        WHERE r.source_id = s.id AND r.finished_at IS NOT NULL) AS last_attempt,
        (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
        WHERE a.source_id = s.id AND sc.accepted = 1 AND a.duplicate_of IS NULL) AS served,
        datetime(
        (SELECT MAX(r.finished_at) FROM ingest_runs r
        WHERE r.source_id = s.id AND r.finished_at IS NOT NULL),
        '+' || MIN(?, s.poll_interval_minutes * (1 + s.consecutive_failures)) || ' minutes'
        ) AS next_due_at
        FROM sources s
        WHERE s.active = 1
        ORDER BY s.consecutive_failures DESC, served DESC, s.name
    """
    backoff_cap = (MAX_BACKOFF_MINUTES,)
    records = conn.execute(sql, backoff_cap).fetchall()
    return [dict(record) for record in records]
def admin_stats(conn: sqlite3.Connection, days: int = 30) -> dict:
    """Collect aggregate, non-personal usage stats for the admin dashboard.

    *days* sets the look-back window for everything that is not explicitly
    "today" or "7 days". The result bundles content and source health with
    visitor counts, loyalty/retention buckets, account counts, the
    summary-to-source funnel, feedback signals, and top articles/tags/topics.
    """
    window = f"-{days} days"
    window_args = (window,)

    def count(sql, args=()):
        # First column of the first row, with NULL/0 normalised to 0.
        value = conn.execute(sql, args).fetchone()[0]
        return value or 0

    def as_dicts(sql, args=()):
        return [dict(record) for record in conn.execute(sql, args)]

    # Distinct visitors; only the day predicate differs between the three counts.
    distinct_visitors = (
        "SELECT COUNT(DISTINCT visitor_hash) FROM events "
        "WHERE kind='visit' AND visitor_hash!='' AND "
    )
    visitors = {
        "today": count(distinct_visitors + "day=date('now')"),
        "d7": count(distinct_visitors + "day>=date('now','-7 days')"),
        "d30": count(distinct_visitors + "day>=date('now',?)", window_args),
    }

    # Returning (seen on ≥2 distinct days) vs one-and-done, over the window.
    loyalty = {}
    for record in conn.execute(
        "SELECT CASE WHEN d>=2 THEN 'returning' ELSE 'once' END g, COUNT(*) n FROM ("
        " SELECT visitor_hash, COUNT(DISTINCT day) d FROM events "
        " WHERE kind='visit' AND visitor_hash!='' AND day>=date('now',?) GROUP BY visitor_hash"
        ") GROUP BY g",
        window_args,
    ).fetchall():
        loyalty[record["g"]] = record["n"]

    # An "open" = opening the article via our summary page (legacy 'open' + 'summary_viewed').
    opened = "e.kind IN ('open','summary_viewed')"
    top_articles = as_dicts(
        f"SELECT e.article_id AS id, a.title, src.name AS source, COUNT(*) AS opens "
        f"FROM events e JOIN articles a ON a.id=e.article_id JOIN sources src ON src.id=a.source_id "
        f"WHERE {opened} AND e.article_id>0 AND e.day>=date('now',?) "
        f"GROUP BY e.article_id ORDER BY opens DESC LIMIT 12",
        window_args,
    )
    top_groupings = as_dicts(
        f"SELECT t.tag, COUNT(*) AS opens FROM events e JOIN article_tags t ON t.article_id=e.article_id "
        f"WHERE {opened} AND e.day>=date('now',?) GROUP BY t.tag ORDER BY opens DESC LIMIT 12",
        window_args,
    )
    top_topics = as_dicts(
        f"SELECT s.topic, COUNT(*) AS opens FROM events e JOIN article_scores s ON s.article_id=e.article_id "
        f"WHERE {opened} AND s.topic IS NOT NULL AND e.day>=date('now',?) "
        f"GROUP BY s.topic ORDER BY opens DESC",
        window_args,
    )

    # Counts per event kind over the window (each = distinct visitor-days, by dedup).
    per_kind = {}
    for record in conn.execute(
        "SELECT kind, COUNT(*) n FROM events WHERE day>=date('now',?) GROUP BY kind",
        window_args,
    ).fetchall():
        per_kind[record["kind"]] = record["n"]

    def kind_total(name):
        return per_kind.get(name, 0)

    shares = {
        name: kind_total(name)
        for name in ("share_ub", "copy_source", "native_share", "source_click")
    }

    summary_views = kind_total("summary_viewed")
    source_clicks = kind_total("source_click")
    if summary_views:
        source_rate = round(100 * source_clicks / summary_views)
    else:
        source_rate = 0
    funnel = {
        "summary_viewed": summary_views,
        "source_click": source_clicks,
        "full_story": kind_total("full_story"),
        "source_rate": source_rate,
    }

    emotional_mix = {
        name: kind_total(name) for name in ("not_today", "less_like_this", "hide_topic")
    }
    paywall = {
        name: kind_total(name) for name in ("paywall_replace", "paywalled_source_open")
    }
    replace = {"used": kind_total("replace_used"), "none": kind_total("replace_none")}

    # Accounts — aggregate counts only (no emails, no per-user listing).
    accounts = {
        "total": count("SELECT COUNT(*) FROM users"),
        "new_today": count("SELECT COUNT(*) FROM users WHERE date(created_at)=date('now')"),
        "new_7d": count("SELECT COUNT(*) FROM users WHERE created_at>=date('now','-7 days')"),
        "new_30d": count("SELECT COUNT(*) FROM users WHERE created_at>=date('now',?)", window_args),
        "active_7d": count(
            "SELECT COUNT(DISTINCT user_id) FROM sessions WHERE last_seen_at>=date('now','-7 days')"
        ),
    }

    # Returning-visitor buckets by distinct active days in the window.
    buckets = {}
    for record in conn.execute(
        "SELECT CASE WHEN d>=8 THEN '8+' WHEN d>=4 THEN '4-7' WHEN d>=2 THEN '2-3' ELSE 'new' END b, "
        "COUNT(*) n FROM (SELECT visitor_hash, COUNT(DISTINCT day) d FROM events "
        "WHERE kind='visit' AND visitor_hash!='' AND day>=date('now',?) GROUP BY visitor_hash) GROUP BY b",
        window_args,
    ).fetchall():
        buckets[record["b"]] = record["n"]
    retention = {label: buckets.get(label, 0) for label in ("new", "2-3", "4-7", "8+")}

    daily = as_dicts(
        "SELECT day, SUM(kind IN ('open','summary_viewed')) AS opens, SUM(kind='visit') AS visits "
        "FROM events WHERE day>=date('now',?) GROUP BY day ORDER BY day",
        window_args,
    )

    return {
        "days": days,
        "content": content_stats(conn),
        "sources": source_health(conn),
        "visitors": visitors,
        "returning": loyalty.get("returning", 0),
        "once": loyalty.get("once", 0),
        "retention": retention,
        "accounts": accounts,
        "funnel": funnel,
        "emotional_mix": emotional_mix,
        "paywall": paywall,
        "replace": replace,
        "top_articles": top_articles,
        "top_groupings": top_groupings,
        "top_topics": top_topics,
        "shares": shares,
        "daily": daily,
    }
def existing_article_ids(conn: sqlite3.Connection, ids: list[int]) -> list[int]:
    """Filter to ids that still exist (FK-safe inserts for save/history/import).

    Ids are coerced with ``int()`` and de-duplicated in input order; only the
    first 1000 distinct ids are checked, so which ids survive the cap is
    predictable rather than dependent on set iteration order.

    Args:
        conn: Open connection to a database with an ``articles`` table.
        ids: Candidate article ids; each value must be ``int()``-convertible.

    Returns:
        The subset of the checked ids that exist in ``articles``. The order of
        the result is not guaranteed.

    Raises:
        ValueError, TypeError: If an element of *ids* cannot be converted by ``int()``.
    """
    max_ids = 1000
    # Stay well below SQLite's bound-variable limit, which defaults to 999 on
    # builds older than 3.32; a single 1000-placeholder query would fail there.
    batch_size = 500

    unique_ids = list(dict.fromkeys(int(i) for i in ids))[:max_ids]
    found: list[int] = []
    for start in range(0, len(unique_ids), batch_size):
        batch = unique_ids[start:start + batch_size]
        placeholders = ",".join("?" * len(batch))
        found.extend(
            row[0]
            for row in conn.execute(
                f"SELECT id FROM articles WHERE id IN ({placeholders})", batch
            )
        )
    return found
def tag_counts(conn: sqlite3.Connection, accepted_only: bool = True) -> dict:
    """Count non-duplicate articles per grouping tag.

    With *accepted_only* (the default) only accepted articles are counted.
    Returns a ``{tag: count}`` mapping; rows are read by column name, so a
    mapping-style row factory (e.g. ``sqlite3.Row``) is expected.
    """
    conditions = ["a.duplicate_of IS NULL"]
    if accepted_only:
        conditions.append("s.accepted = 1")
    where = "WHERE " + " AND ".join(conditions)

    sql = f"""
        SELECT t.tag, COUNT(*) AS count
        FROM article_tags t
        JOIN articles a ON a.id = t.article_id
        JOIN article_scores s ON s.article_id = a.id
        {where}
        GROUP BY t.tag
    """
    counts = {}
    for record in conn.execute(sql).fetchall():
        counts[record["tag"]] = record["count"]
    return counts
def category_counts(conn: sqlite3.Connection, accepted_only: bool = True) -> list[dict]:
    """Count articles per (topic, flavor) pair for building browse UIs.

    Duplicates are excluded via the join on articles so the counts line up
    with what the feed endpoint returns for each topic/flavor. With
    *accepted_only* only accepted rows are counted; otherwise every row that
    has a topic is counted. Rows come back ordered by topic, then flavor.
    """
    if accepted_only:
        status_filter = "s.accepted = 1"
    else:
        status_filter = "s.topic IS NOT NULL"
    conditions = " AND ".join(["a.duplicate_of IS NULL", status_filter])

    sql = f"""
        SELECT s.topic, s.flavor, COUNT(*) AS count
        FROM article_scores s
        JOIN articles a ON a.id = s.article_id
        WHERE {conditions}
        GROUP BY s.topic, s.flavor
        ORDER BY s.topic, s.flavor
    """
    records = conn.execute(sql).fetchall()
    return [dict(record) for record in records]
def available_dates(conn: sqlite3.Connection, limit: int = 30) -> list[str]:
    """Return up to *limit* stored brief dates, newest first.

    Rows are read by column name, so a mapping-style row factory
    (e.g. ``sqlite3.Row``) is expected.
    """
    cursor = conn.execute(
        "SELECT brief_date FROM daily_briefs ORDER BY brief_date DESC LIMIT ?",
        (limit,),
    )
    dates = []
    for record in cursor.fetchall():
        dates.append(record["brief_date"])
    return dates
def _latest_brief_date(conn: sqlite3.Connection) -> str | None:
row = conn.execute(
"SELECT brief_date FROM daily_briefs ORDER BY brief_date DESC LIMIT 1"
).fetchone()
return row["brief_date"] if row else None