"""Read-only query helpers over the goodNews database. Pure stdlib and framework-agnostic: returns plain dicts so the same functions back both the CLI and the JSON API. All article output is metadata + a link to the original source — never stored bodies. """ from __future__ import annotations import sqlite3 from datetime import UTC, datetime, timedelta from .feeds import MAX_BACKOFF_MINUTES from .localtime import local_now from .paywall import is_paywalled, is_paywalled_for_source # UA substrings that mark automated clients. Crawlers run JS on a throttled # budget and trip the boot-failure beacon routinely — without this filter they # read as real users seeing blank screens. BOT_UA_MARKS = ("headlesschrome", "bot", "spider", "crawl", "python", "curl", "wget", "phantomjs") _NOT_BOT_SQL = " AND ".join(f"instr(lower(user_agent), '{m}')=0" for m in BOT_UA_MARKS) # "Engaged reader" = a distinct visitor-day with DELIBERATE activity, as opposed to a raw # visit (which a JS-capable bot can trip). Counts the gesture-gated 'engaged' beacon OR a # genuine deliberate action. Deliberately EXCLUDES auto-fired/passive kinds (visit, # summary_viewed, open), replace_none, and game *_arrival (a share-loop landing, not engagement). _ENGAGED_GAMES = ("word", "wordsearch", "bloom", "match") ENGAGED_EVENT_KINDS = ( "engaged", "full_story", "source_click", "share_ub", "copy_source", "native_share", "replace_used", "paywall_replace", "paywalled_source_open", "not_today", "less_like_this", "hide_topic", *(f"{g}_{e}" for g in _ENGAGED_GAMES for e in ("started", "completed", "shared")), ) def is_bot_ua(ua: str | None) -> bool: low = (ua or "").lower() return any(m in low for m in BOT_UA_MARKS) # Composite ranking used everywhere a "best first" order is needed. Kept as one # expression so brief, category feeds, and the API all rank identically. RANK_SCORE_SQL = ( "(s.constructive_score + s.agency_score + s.human_benefit_score + src.trust_score " "- s.cortisol_score - s.ragebait_score - s.pr_risk_score)" ) _ARTICLE_COLUMNS = f""" a.id, a.title, a.description, a.canonical_url, a.published_at, a.image_url, a.source_id, src.name AS source_name, s.topic, s.flavor, s.accepted, s.constructive_score, s.cortisol_score, s.ragebait_score, s.agency_score, s.human_benefit_score, s.pr_risk_score, s.reason_code, s.reason_text, s.model_name, src.paywall_override AS paywall_override, src.image_policy AS image_policy, a.source_words, (SELECT group_concat(t.tag) FROM article_tags t WHERE t.article_id = a.id) AS tags, {RANK_SCORE_SQL} AS rank_score """ def paywalled_source_ids(conn: sqlite3.Connection) -> list[int]: """Source ids whose stories are paywalled — the domain rule (PAYWALL_DOMAINS), overridable per source in admin. Computed live so an admin flag takes effect at once. Small set (a handful of sources), so the lookup is cheap.""" rows = conn.execute( "SELECT id, homepage_url, feed_url, paywall_override FROM sources" ).fetchall() return [ r["id"] for r in rows if is_paywalled_for_source((r["homepage_url"] or r["feed_url"]), r["paywall_override"]) ] def feed( conn: sqlite3.Connection, topic: str | None = None, flavor: str | None = None, accepted_only: bool = True, limit: int = 30, offset: int = 0, include_topics: list[str] | None = None, include_flavors: list[str] | None = None, mute_topics: list[str] | None = None, mute_flavors: list[str] | None = None, max_cortisol: int | None = None, max_ragebait: int | None = None, tag: str | None = None, source_id: int | None = None, sort: str = "ranked", follow_sources: list[int] | None = None, follow_tags: list[str] | None = None, since: str | None = None, match: str | None = None, home_country: str | None = None, home_state: str | None = None, geo_scope: str | None = None, # 'near' | 'country' | 'world' relative to the reader's home include_paywalled: bool = False, # default: hide paywalled-source stories (no unreadable news in the feed) ) -> list[dict]: """Return articles with categorical filters applied in SQL. sort="ranked" (default) is best-first by the composite rank; sort="latest" is pure recency (newest first) for the chronological "Latest" feed. Both stay accepted-only and respect the same boundaries. Categorical filters (topic/flavor include & mute, cortisol/ragebait ceilings) must be applied here, not after ranking — otherwise low-ranked-but-matching items (e.g. 'discovery' for a Wonder lane) fall outside any over-fetch window. Word-boundary avoid-terms remain a Python pass on the caller side. """ clauses = ["a.duplicate_of IS NULL", "src.content_visible = 1"] params: list = [] # Full-text search: join the FTS index and MATCH first, so its bound param # leads and relevance can drive the ordering. All the boundary clauses below # still apply, so search mirrors exactly what the visitor feed would show. fts_join = "" if match: fts_join = "JOIN article_search ON article_search.article_id = a.id" clauses.append("article_search MATCH ?") params.append(match) # Hard-exclude paywalled sources (admin-overridable). Added after MATCH so the FTS # bound param keeps leading; the NOT IN list (a handful of ids) follows. if not include_paywalled: pwx = paywalled_source_ids(conn) if pwx: clauses.append("a.source_id NOT IN (%s)" % ",".join("?" * len(pwx))) params.extend(pwx) if accepted_only: clauses.append("s.accepted = 1") if topic: clauses.append("s.topic = ?") params.append(topic.lower()) if flavor: clauses.append("s.flavor = ?") params.append(flavor.lower()) def _in(column: str, values: list[str], negate: bool = False) -> None: vals = [v.lower() for v in values] placeholders = ",".join("?" * len(vals)) op = "NOT IN" if negate else "IN" # COALESCE keeps NULL-category rows from being dropped by NOT IN. clauses.append(f"COALESCE({column}, '') {op} ({placeholders})") params.extend(vals) if include_topics: _in("s.topic", include_topics) if include_flavors: _in("s.flavor", include_flavors) if mute_topics: _in("s.topic", mute_topics, negate=True) if mute_flavors: _in("s.flavor", mute_flavors, negate=True) if max_cortisol is not None: clauses.append("COALESCE(s.cortisol_score, 0) <= ?") params.append(max_cortisol) if max_ragebait is not None: clauses.append("COALESCE(s.ragebait_score, 0) <= ?") params.append(max_ragebait) if tag: clauses.append("EXISTS (SELECT 1 FROM article_tags at WHERE at.article_id = a.id AND at.tag = ?)") params.append(tag.lower()) if source_id: clauses.append("a.source_id = ?") params.append(source_id) if since: # "New since last visit": articles discovered after the reader's last visit. clauses.append("a.discovered_at > ?") params.append(since) # "Following" feed: articles from a followed source OR carrying a followed tag. # Passing either list (even empty) switches to following mode; no follows → none. if follow_sources is not None or follow_tags is not None: ors: list[str] = [] for sid in follow_sources or []: params.append(sid) if follow_sources: ors.append(f"a.source_id IN ({','.join('?' * len(follow_sources))})") ftags = [t.lower() for t in (follow_tags or [])] if ftags: ors.append( f"EXISTS (SELECT 1 FROM article_tags at WHERE at.article_id = a.id " f"AND at.tag IN ({','.join('?' * len(ftags))}))" ) params.extend(ftags) clauses.append("(" + " OR ".join(ors) + ")" if ors else "0") # Home-aware scoping for "Closer to Home" (server-side). Relative to the reader's # chosen home; geo_scope=None leaves the feed exactly as it is today. A STATE match # only counts when geo confidence is high/medium (don't surface "Near you" on a # shaky location). Untagged articles have no places, so they land in 'world' — never # lost while the backfill is still running. if geo_scope == "near": # Anything elevated as "Near you" / "Close to home" requires high/medium geo # confidence — the feature's promise is relevance, so don't surface shaky locals. if home_state and home_country: clauses.append( "g.confidence IN ('high','medium') AND EXISTS (SELECT 1 FROM article_places p " "WHERE p.article_id = a.id AND p.country_code = ? AND p.state_code = ?)") params.extend([home_country, home_state]) elif home_country: clauses.append( "g.confidence IN ('high','medium') AND EXISTS (SELECT 1 FROM article_places p " "WHERE p.article_id = a.id AND p.country_code = ?)") params.append(home_country) elif geo_scope == "country" and home_country: clauses.append("EXISTS (SELECT 1 FROM article_places p WHERE p.article_id = a.id AND p.country_code = ?)") params.append(home_country) if home_state: # "elsewhere in your country" excludes ONLY what actually went to "near" (a # high/medium-confidence home-state match). A low-confidence home-state story # isn't near, so it must still surface here, not vanish between tiers. clauses.append( "NOT (g.confidence IN ('high','medium') AND EXISTS (SELECT 1 FROM article_places p2 " "WHERE p2.article_id = a.id AND p2.country_code = ? AND p2.state_code = ?))") params.extend([home_country, home_state]) elif geo_scope == "world" and home_country: if home_state: # State mode: the "country" tier catches all home-country stories (incl. # low-confidence ones), so world is simply everything outside your country. clauses.append("NOT EXISTS (SELECT 1 FROM article_places p WHERE p.article_id = a.id AND p.country_code = ?)") params.append(home_country) else: # Country-only mode has no "country" tier, so a LOW-confidence home-country # story isn't "near" and must land here rather than vanish between tiers. clauses.append( "NOT (g.confidence IN ('high','medium') AND EXISTS (SELECT 1 FROM article_places p " "WHERE p.article_id = a.id AND p.country_code = ?))") params.append(home_country) where = "WHERE " + " AND ".join(clauses) params.extend([limit, offset]) if match: order_by = "bm25(article_search), COALESCE(a.published_at, a.discovered_at) DESC" # relevance, then recency elif sort == "latest": order_by = "COALESCE(a.published_at, a.discovered_at) DESC, rank_score DESC" else: order_by = "rank_score DESC, COALESCE(a.published_at, a.discovered_at) DESC" rows = conn.execute( f""" SELECT {_ARTICLE_COLUMNS}, g.breadth AS geo_breadth, g.confidence AS geo_confidence, (SELECT group_concat( p.country_code || CASE WHEN p.state_code IS NOT NULL THEN '-' || p.state_code ELSE '' END, ',') FROM article_places p WHERE p.article_id = a.id) AS geo_places FROM articles a JOIN sources src ON src.id = a.source_id JOIN article_scores s ON s.article_id = a.id LEFT JOIN article_geo g ON g.article_id = a.id {fts_join} {where} ORDER BY {order_by} LIMIT ? OFFSET ? """, params, ).fetchall() return [dict(row) for row in rows] def reindex_search(conn: sqlite3.Connection) -> int: """Rebuild the article_search FTS index from the accepted, non-duplicate corpus (title/description/source name/tags). A cheap full rebuild (a few thousand rows); run on each ingest cycle and lazily on first search. Live visibility / boundary filtering is applied at query time, so it doesn't need reindexing.""" conn.execute("DELETE FROM article_search") conn.execute( """ INSERT INTO article_search (article_id, title, body, source_name, tags) SELECT a.id, a.title, COALESCE(a.description, ''), src.name, COALESCE((SELECT group_concat(t.tag, ' ') FROM article_tags t WHERE t.article_id = a.id), '') FROM articles a JOIN sources src ON src.id = a.source_id JOIN article_scores s ON s.article_id = a.id WHERE s.accepted = 1 AND a.duplicate_of IS NULL """ ) conn.commit() return conn.execute("SELECT COUNT(*) FROM article_search").fetchone()[0] # Scope dial: the reader's "emotional radius". Each tier is a closest->widest lead # preference, not a hard filter; 'world' is the implicit final tier. State + region # are confidence-gated (high/medium) so a shaky location is never promoted as local. _STATE_SQL = ("(g.confidence IN ('high','medium') AND EXISTS (SELECT 1 FROM article_places p " "WHERE p.article_id = a.id AND p.country_code = ? AND p.state_code = ?))") _COUNTRY_SQL = "(EXISTS (SELECT 1 FROM article_places p WHERE p.article_id = a.id AND p.country_code = ?))" SCOPES = ("nearby", "region", "country", "world") def _region_sql(n: int) -> str: placeholders = ",".join("?" * n) return ("(g.confidence IN ('high','medium') AND EXISTS (SELECT 1 FROM article_places p " f"WHERE p.article_id = a.id AND p.country_code = ? AND p.state_code IN ({placeholders})))") def home_tiers(home_country: str, home_state: str | None, scope: str) -> list[tuple]: """Ordered [(section_key, predicate_sql, params)] closest->widest for a home + scope. Evaluated first-match (CASE WHEN / composed in order), so tiers needn't be SQL-exclusive. 'world' is implicit (everything not matched). 'region'/'nearby' need a US state; otherwise they gracefully fall back to country (country-only / non-US homes collapse to Country/World). """ from .geo import region_states rs = region_states(home_state) if home_state else [] tiers: list[tuple] = [] if scope == "nearby" and home_state: tiers.append(("state", _STATE_SQL, [home_country, home_state])) if rs: tiers.append(("region", _region_sql(len(rs)), [home_country, *rs])) tiers.append(("country", _COUNTRY_SQL, [home_country])) elif scope == "region" and home_state and rs: tiers.append(("region", _region_sql(len(rs)), [home_country, *rs])) # includes the state tiers.append(("country", _COUNTRY_SQL, [home_country])) else: # country scope, country-only / non-US home, or any fallback tiers.append(("country", _COUNTRY_SQL, [home_country])) return tiers def home_brief(conn: sqlite3.Connection, home_country: str, home_state: str | None = None, scope: str = "nearby", limit: int = 7, window_days: int = 3) -> list[dict]: """Scope-aware local-first landing highlights. Leads with the reader's chosen radius (state / region / country) then blends outward so the set is always full — "closest first", never three stale local stories. Prefers already-summarized stories so the calm read stays rich. Brief-shaped rows tagged with a concrete section key. """ tiers = home_tiers(home_country, home_state, scope) whens, params = [], [] for i, (_key, pred, ps) in enumerate(tiers): whens.append(f"WHEN {pred} THEN {i}") params += ps world_rank = len(tiers) section_case = ("CASE " + " ".join(whens) + f" ELSE {world_rank} END") if whens else "0" section_keys = [k for k, _, _ in tiers] + ["world"] rows = conn.execute( f""" SELECT {_ARTICLE_COLUMNS}, sm.summary AS summary, {section_case} AS section_rank, (sm.summary IS NOT NULL) AS has_summary FROM articles a JOIN sources src ON src.id = a.source_id JOIN article_scores s ON s.article_id = a.id LEFT JOIN article_geo g ON g.article_id = a.id LEFT JOIN article_summaries sm ON sm.article_id = a.id WHERE a.duplicate_of IS NULL AND src.content_visible = 1 AND s.accepted = 1 AND a.discovered_at >= datetime('now', ?) ORDER BY section_rank ASC, has_summary DESC, rank_score DESC, COALESCE(a.published_at, a.discovered_at) DESC LIMIT ? """, params + [f"-{window_days} days", limit], ).fetchall() out = [] for r in rows: d = dict(r) rank = d.pop("section_rank", world_rank) d["__section"] = section_keys[rank] if 0 <= rank < len(section_keys) else "world" out.append(d) return out def brief(conn: sqlite3.Connection, brief_date: str | None = None, limit: int = 10) -> dict: """Return a stored daily brief (latest if no date) with its ranked items.""" target_date = brief_date or _latest_brief_date(conn) if not target_date: return {"brief_date": None, "title": None, "items": []} header = conn.execute( "SELECT brief_date, title, created_at FROM daily_briefs WHERE brief_date = ?", (target_date,), ).fetchone() if not header: return {"brief_date": target_date, "title": None, "created_at": None, "items": []} pwx = paywalled_source_ids(conn) pw_clause = f" AND a.source_id NOT IN ({','.join('?' * len(pwx))})" if pwx else "" rows = conn.execute( f""" SELECT bi.rank, bi.selection_reason, {_ARTICLE_COLUMNS}, (SELECT summary FROM article_summaries WHERE article_id = a.id) AS summary FROM daily_briefs b JOIN daily_brief_items bi ON bi.brief_id = b.id JOIN articles a ON a.id = bi.article_id JOIN sources src ON src.id = a.source_id LEFT JOIN article_scores s ON s.article_id = a.id WHERE b.brief_date = ? AND src.content_visible = 1{pw_clause} ORDER BY bi.rank LIMIT ? """, (target_date, *pwx, limit), ).fetchall() return { "brief_date": header["brief_date"], "title": header["title"], "created_at": header["created_at"], "items": [dict(row) for row in rows], } def saved(conn: sqlite3.Connection, user_id: int, limit: int = 200) -> list[dict]: """Articles a user has saved, newest first (same shape as the feed).""" rows = conn.execute( f""" SELECT {_ARTICLE_COLUMNS} FROM saved_articles sv JOIN articles a ON a.id = sv.article_id JOIN sources src ON src.id = a.source_id LEFT JOIN article_scores s ON s.article_id = a.id WHERE sv.user_id = ? ORDER BY sv.saved_at DESC LIMIT ? """, (user_id, limit), ).fetchall() return [dict(row) for row in rows] def saved_ids(conn: sqlite3.Connection, user_id: int) -> list[int]: return [r[0] for r in conn.execute( "SELECT article_id FROM saved_articles WHERE user_id = ?", (user_id,) )] def history(conn: sqlite3.Connection, user_id: int, limit: int = 200) -> list[dict]: """Articles in a user's account history, most-recent first.""" rows = conn.execute( f""" SELECT {_ARTICLE_COLUMNS}, MAX(h.at) AS seen_at FROM user_history h JOIN articles a ON a.id = h.article_id JOIN sources src ON src.id = a.source_id LEFT JOIN article_scores s ON s.article_id = a.id WHERE h.user_id = ? GROUP BY a.id ORDER BY seen_at DESC LIMIT ? """, (user_id, limit), ).fetchall() return [dict(row) for row in rows] def content_stats(conn: sqlite3.Connection) -> dict: """Corpus / serving health for the dashboard: how much good news is live.""" def scalar(sql, params=()): return conn.execute(sql, params).fetchone()[0] or 0 served = scalar( "SELECT COUNT(*) FROM article_scores s JOIN articles a ON a.id=s.article_id " "WHERE s.accepted=1 AND a.duplicate_of IS NULL" ) accepted_7d = scalar( "SELECT COUNT(*) FROM article_scores s JOIN articles a ON a.id=s.article_id " "WHERE s.accepted=1 AND a.duplicate_of IS NULL " "AND COALESCE(a.published_at, a.discovered_at) >= datetime('now','-7 days')" ) brief = conn.execute( "SELECT brief_date, (SELECT COUNT(*) FROM daily_brief_items WHERE brief_id=daily_briefs.id) n " "FROM daily_briefs ORDER BY brief_date DESC LIMIT 1" ).fetchone() with_image = scalar( "SELECT COUNT(*) FROM article_scores s JOIN articles a ON a.id=s.article_id " "WHERE s.accepted=1 AND a.duplicate_of IS NULL AND a.image_url IS NOT NULL AND a.image_url!=''" ) # Coverage is scoped to LIVE articles (accepted, non-duplicate) so the # percentages can't drift past 100% as rejected/duplicate rows accrue summaries. summaries = scalar( "SELECT COUNT(*) FROM article_summaries m JOIN articles a ON a.id=m.article_id " "JOIN article_scores s ON s.article_id=a.id WHERE s.accepted=1 AND a.duplicate_of IS NULL" ) summaries_with_image = scalar( "SELECT COUNT(*) FROM article_summaries m JOIN articles a ON a.id=m.article_id " "JOIN article_scores s ON s.article_id=a.id WHERE s.accepted=1 AND a.duplicate_of IS NULL " "AND a.image_url IS NOT NULL AND a.image_url!=''" ) brief_with_image = 0 if brief: brief_with_image = scalar( "SELECT COUNT(*) FROM daily_brief_items bi JOIN articles a ON a.id=bi.article_id " "JOIN daily_briefs b ON b.id=bi.brief_id " "WHERE b.brief_date=? AND a.image_url IS NOT NULL AND a.image_url!=''", (brief["brief_date"],), ) return { "served": served, "total": scalar("SELECT COUNT(*) FROM articles"), "rejected": scalar("SELECT COUNT(*) FROM article_scores WHERE accepted=0"), "accepted_7d": accepted_7d, "added_24h": scalar("SELECT COUNT(*) FROM articles WHERE discovered_at >= datetime('now','-1 day')"), "added_7d": scalar("SELECT COUNT(*) FROM articles WHERE discovered_at >= datetime('now','-7 days')"), "summaries": summaries, "summaries_with_image": summaries_with_image, "with_image": with_image, # Accepted, imageless articles whose enrichment was tried recently and # came up empty (no usable og:image) — the image "misses" to watch. "recent_enrich_fail": scalar( "SELECT COUNT(*) FROM article_scores s JOIN articles a ON a.id=s.article_id " "WHERE s.accepted=1 AND a.duplicate_of IS NULL AND (a.image_url IS NULL OR a.image_url='') " "AND a.image_checked_at >= datetime('now','-1 day')" ), "active_sources": scalar("SELECT COUNT(*) FROM sources WHERE active=1"), "total_sources": scalar("SELECT COUNT(*) FROM sources"), "latest_brief_date": brief["brief_date"] if brief else None, "latest_brief_size": brief["n"] if brief else 0, "brief_with_image": brief_with_image, } def source_health(conn: sqlite3.Connection) -> list[dict]: """Per source (active, paused, AND retired), the data an operator needs to manage feeds: failure streak, last error/success/attempt, computed next poll, and the backing metrics (served/accepted/total counts + acceptance & duplicate rates) so Pause/Flag decisions aren't vibes-based. next_due_at = last attempt + MIN(cap, interval * (1 + consecutive_failures)), mirroring feeds.due_source_rows; NULL last attempt means "due now". Paused sources are included (their articles stay live; only polling stops). """ rows = conn.execute( """ SELECT s.id, s.name, s.feed_url, s.homepage_url, s.default_category AS category, s.active, s.status, s.content_visible, s.retry_after_at, s.consecutive_failures AS failures, s.review_flag, s.review_reason, s.paywall_override, s.image_policy, s.poll_interval_minutes AS interval_minutes, s.last_success_at, s.last_error_at, substr(s.last_error, 1, 160) AS last_error, (SELECT MAX(r.finished_at) FROM ingest_runs r WHERE r.source_id = s.id AND r.finished_at IS NOT NULL) AS last_attempt, (SELECT COUNT(*) FROM articles a WHERE a.source_id = s.id) AS total_articles, (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id WHERE a.source_id = s.id AND sc.accepted = 1) AS accepted_total, (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id WHERE a.source_id = s.id AND sc.reason_code = 'non_english') AS non_english, (SELECT COUNT(*) FROM articles a WHERE a.source_id = s.id AND a.duplicate_of IS NOT NULL) AS duplicates, (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id WHERE a.source_id = s.id AND sc.accepted = 1 AND a.duplicate_of IS NULL) AS served, (SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id WHERE a.source_id = s.id AND sc.accepted = 1 AND a.duplicate_of IS NULL AND a.image_url IS NOT NULL AND a.image_url != '') AS images, datetime( (SELECT MAX(r.finished_at) FROM ingest_runs r WHERE r.source_id = s.id AND r.finished_at IS NOT NULL), '+' || MIN(?, s.poll_interval_minutes * (1 + s.consecutive_failures)) || ' minutes' ) AS next_due_at FROM sources s ORDER BY s.active DESC, s.consecutive_failures DESC, served DESC, s.name """, (MAX_BACKOFF_MINUTES,), ).fetchall() out = [] for r in rows: d = dict(r) total = d["total_articles"] or 0 accepted = d["accepted_total"] or 0 non_english = d.get("non_english") or 0 # Acceptance is judged over articles actually scored in English — non-English # items are HELD (awaiting translation), not calm-filter rejections, so they # don't drag a multilingual source's rate down. judged = total - non_english d["acceptance_rate"] = round(100 * accepted / judged) if judged else None d["non_english"] = non_english d["non_english_rate"] = round(100 * non_english / total) if total else None d["duplicate_rate"] = round(100 * d["duplicates"] / total) if total else None # Curation quality: of what this source got ACCEPTED, how much was a # duplicate of content already served (accepted_total − served = accepted dupes). d["accepted_dup_rate"] = round(100 * (accepted - d["served"]) / accepted) if accepted else None d["image_coverage"] = round(100 * (d["images"] or 0) / d["served"]) if d["served"] else None # Paywall is a domain-level hint + a per-source override; show the EFFECTIVE flag. d["paywalled"] = is_paywalled_for_source(d.get("homepage_url") or d.get("feed_url"), d.get("paywall_override")) # Match the REAL scheduler gate: due = the later of the streak-backoff time # and any retry_after_at rest (UTC strings sort chronologically). due_times = [t for t in (d["next_due_at"], d["retry_after_at"]) if t] d["next_due_at"] = max(due_times) if due_times else None out.append(d) return out # Attention thresholds — conservative + volume-gated, so the strip is a calm # operator nudge rather than a noisy scold. Items are aggregated (one line per # condition with a count), not one line per offending source. _STALE_DAYS = 10 _REJECT_MIN_INGESTED = 20 _REJECT_RATE = 25 # acceptance below this % (with enough volume) _DUP_MIN_ACCEPTED = 10 _DUP_RATE = 50 # accepted-duplicate above this % _IMG_MIN_SERVED = 10 _IMG_COVERAGE = 25 # per-source image coverage below this % _LONG_REST_HOURS = 12 def _attention(content: dict, sources: list[dict], feedback_unread: int, now: datetime | None = None) -> list[dict]: """The 'Attention Needed' strip: what an operator should look at, soft-toned (warn = act soon, info = worth a glance). Derived from the same data shown elsewhere, so it never disagrees with the detail sections.""" items: list[dict] = [] n = lambda c: "" if c == 1 else "s" # noqa: E731 — tiny pluralizer now = now or datetime.now(UTC) active = [s for s in sources if (s.get("status") or ("active" if s.get("active") else "paused")) == "active"] resting = [s for s in active if (s.get("failures") or 0) > 0] if resting: items.append({"level": "warn", "text": f"{len(resting)} source{n(len(resting))} backing off after failures"}) flagged = [s for s in sources if s.get("review_flag")] if flagged: items.append({"level": "warn", "text": f"{len(flagged)} source{n(len(flagged))} flagged for review"}) # Stale: active, visible feeds whose last success is well in the past. stale_cutoff = (now - timedelta(days=_STALE_DAYS)).strftime("%Y-%m-%d %H:%M:%S") stale = [s for s in active if s.get("content_visible", 1) and s.get("last_success_at") and s["last_success_at"] < stale_cutoff] if stale: items.append({"level": "warn", "text": f"{len(stale)} source{n(len(stale))} haven't updated in over {_STALE_DAYS} days"}) # High rejection: enough ingested volume, low acceptance. rejecting = [s for s in active if (s.get("total_articles") or 0) >= _REJECT_MIN_INGESTED and s.get("acceptance_rate") is not None and s["acceptance_rate"] < _REJECT_RATE] if rejecting: items.append({"level": "info", "text": f"{len(rejecting)} source{n(len(rejecting))} accepting under {_REJECT_RATE}% of submissions"}) # High accepted-duplicate: enough accepted volume, mostly echoing others. duping = [s for s in active if (s.get("accepted_total") or 0) >= _DUP_MIN_ACCEPTED and s.get("accepted_dup_rate") is not None and s["accepted_dup_rate"] > _DUP_RATE] if duping: items.append({"level": "info", "text": f"{len(duping)} source{n(len(duping))} mostly duplicating other feeds"}) # Low image coverage (info, not a warning). thin = [s for s in active if (s.get("served") or 0) >= _IMG_MIN_SERVED and s.get("image_coverage") is not None and s["image_coverage"] < _IMG_COVERAGE] if thin: items.append({"level": "info", "text": f"{len(thin)} source{n(len(thin))} with thin image coverage (under {_IMG_COVERAGE}%)"}) # Long rate-limit rest (info). rest_cutoff = (now + timedelta(hours=_LONG_REST_HOURS)).strftime("%Y-%m-%d %H:%M:%S") long_rest = [s for s in active if s.get("retry_after_at") and s["retry_after_at"] > rest_cutoff] if long_rest: items.append({"level": "info", "text": f"{len(long_rest)} source{n(len(long_rest))} rate-limited for {_LONG_REST_HOURS}h+"}) # Site-wide signals. served = content.get("served") or 0 with_image = content.get("with_image") or 0 if served and (with_image / served) < 0.70: items.append({"level": "info", "text": f"Image coverage at {round(100 * with_image / served)}% (aim for 70%+)"}) brief_size = content.get("latest_brief_size") or 0 if brief_size < 7: items.append({"level": "info", "text": f"Today's brief has only {brief_size} item{n(brief_size)}"}) if feedback_unread: items.append({"level": "info", "text": f"{feedback_unread} unread feedback message{n(feedback_unread)}"}) return items # --- Source article inspector: the real articles behind the source metrics ----- _SRC_ART_FILTERS = { "accepted": "AND s.accepted = 1", # 'rejected' = calm-filter rejections only; non-English is HELD, its own bucket. "rejected": "AND s.accepted = 0 AND COALESCE(s.reason_code,'') != 'non_english'", "held": "AND s.reason_code = 'non_english'", "no_image": "AND (a.image_url IS NULL OR a.image_url = '')", "duplicates": "AND a.duplicate_of IS NOT NULL", } def source_articles(conn: sqlite3.Connection, source_id: int, filter: str = "all", limit: int = 25, offset: int = 0) -> list[dict]: """The actual ingested articles for a source, newest first — so admins can verify the metric (paywall/image/acceptance) against real evidence.""" ov = conn.execute("SELECT paywall_override FROM sources WHERE id = ?", (source_id,)).fetchone() override = ov["paywall_override"] if ov else None where = _SRC_ART_FILTERS.get(filter, "") rows = conn.execute( f""" SELECT a.id, a.title, a.canonical_url, a.published_at, a.discovered_at, a.image_url, a.duplicate_of, s.accepted, s.reason_code, s.reason_text, s.topic, s.flavor FROM articles a LEFT JOIN article_scores s ON s.article_id = a.id WHERE a.source_id = ? {where} ORDER BY COALESCE(a.published_at, a.discovered_at) DESC LIMIT ? OFFSET ? """, (source_id, limit, offset), ).fetchall() return [ { "id": r["id"], "title": r["title"], "url": r["canonical_url"], "published_at": r["published_at"] or r["discovered_at"], "accepted": r["accepted"], "reason": r["reason_text"] or r["reason_code"], # the "why" behind accept/reject "held": r["reason_code"] == "non_english", # held for language, not rejected "topic": r["topic"], "flavor": r["flavor"], "paywalled": is_paywalled_for_source(r["canonical_url"], override), # effective (domain rule + override) "has_image": bool(r["image_url"]), "duplicate": r["duplicate_of"] is not None, } for r in rows ] def source_articles_summary(conn: sqlite3.Connection, source_id: int) -> dict: """Counts behind the table metrics + the source-level paywall rule, so the panel header reads e.g. '120 · 96 accepted · 24 rejected · 3 no image · paywall: ON'.""" agg = conn.execute( """ SELECT COUNT(*) total, COALESCE(SUM(s.accepted = 1), 0) accepted, COALESCE(SUM(s.accepted = 0 AND COALESCE(s.reason_code,'') != 'non_english'), 0) rejected, COALESCE(SUM(s.reason_code = 'non_english'), 0) non_english, COALESCE(SUM(a.image_url IS NULL OR a.image_url = ''), 0) no_image, COALESCE(SUM(a.duplicate_of IS NOT NULL), 0) duplicates FROM articles a LEFT JOIN article_scores s ON s.article_id = a.id WHERE a.source_id = ? """, (source_id,), ).fetchone() srow = conn.execute("SELECT homepage_url, feed_url, paywall_override FROM sources WHERE id = ?", (source_id,)).fetchone() override = srow["paywall_override"] if srow else None url = (srow["homepage_url"] or srow["feed_url"]) if srow else None return { "total": agg["total"], "accepted": agg["accepted"], "rejected": agg["rejected"], "non_english": agg["non_english"], # held for language (not a calm-filter rejection) "no_image": agg["no_image"], "duplicates": agg["duplicates"], "paywalled": is_paywalled_for_source(url, override), # effective "paywall_domain": is_paywalled(url), # what the domain rule alone says "paywall_override": override, # null | 'free' | 'paywalled' — the basis } def admin_stats(conn: sqlite3.Connection, days: int = 30) -> dict: """Aggregate, non-personal usage stats for the admin dashboard.""" since = f"-{days} days" # "Today" for timestamp-based counters is the SITE-LOCAL day (GOODNEWS_TZ), not # UTC: otherwise an evening error (e.g. 22:53 local) lands on the next UTC day and # reads as a fresh "today" the following morning — the exact false-alarm we hit. local_day_start = (local_now().replace(hour=0, minute=0, second=0, microsecond=0) .astimezone(UTC).strftime("%Y-%m-%d %H:%M:%S")) def scalar(sql, params=()): return conn.execute(sql, params).fetchone()[0] or 0 eng_ph = ",".join("?" * len(ENGAGED_EVENT_KINDS)) visitors = { # Recorded visits — the raw/noisy count (one daily 'visit' beacon per device). "today": scalar("SELECT COUNT(DISTINCT visitor_hash) FROM events " "WHERE kind='visit' AND visitor_hash!='' AND day=date('now')"), "d7": scalar("SELECT COUNT(DISTINCT visitor_hash) FROM events " "WHERE kind='visit' AND visitor_hash!='' AND day>=date('now','-7 days')"), "d30": scalar("SELECT COUNT(DISTINCT visitor_hash) FROM events " "WHERE kind='visit' AND visitor_hash!='' AND day>=date('now',?)", (since,)), # Engaged readers — distinct visitor-day with deliberate activity (the honest number). "engaged_today": scalar(f"SELECT COUNT(DISTINCT visitor_hash) FROM events " f"WHERE kind IN ({eng_ph}) AND visitor_hash!='' AND day=date('now')", ENGAGED_EVENT_KINDS), "engaged_d7": scalar(f"SELECT COUNT(DISTINCT visitor_hash) FROM events " f"WHERE kind IN ({eng_ph}) AND visitor_hash!='' AND day>=date('now','-7 days')", ENGAGED_EVENT_KINDS), "engaged_d30": scalar(f"SELECT COUNT(DISTINCT visitor_hash) FROM events " f"WHERE kind IN ({eng_ph}) AND visitor_hash!='' AND day>=date('now',?)", (*ENGAGED_EVENT_KINDS, since)), } # Returning (seen on ≥2 distinct days) vs one-and-done, over the window. rows = conn.execute( "SELECT CASE WHEN d>=2 THEN 'returning' ELSE 'once' END g, COUNT(*) n FROM (" " SELECT visitor_hash, COUNT(DISTINCT day) d FROM events " " WHERE kind='visit' AND visitor_hash!='' AND day>=date('now',?) GROUP BY visitor_hash" ") GROUP BY g", (since,), ).fetchall() loyalty = {r["g"]: r["n"] for r in rows} # An "open" = opening the article via our summary page (legacy 'open' + 'summary_viewed'). OPEN = "e.kind IN ('open','summary_viewed')" top_articles = [dict(r) for r in conn.execute( f"SELECT e.article_id AS id, a.title, src.name AS source, COUNT(*) AS opens " f"FROM events e JOIN articles a ON a.id=e.article_id JOIN sources src ON src.id=a.source_id " f"WHERE {OPEN} AND e.article_id>0 AND e.day>=date('now',?) " f"GROUP BY e.article_id ORDER BY opens DESC LIMIT 12", (since,), )] top_groupings = [dict(r) for r in conn.execute( f"SELECT t.tag, COUNT(*) AS opens FROM events e JOIN article_tags t ON t.article_id=e.article_id " f"WHERE {OPEN} AND e.day>=date('now',?) GROUP BY t.tag ORDER BY opens DESC LIMIT 12", (since,), )] top_topics = [dict(r) for r in conn.execute( f"SELECT s.topic, COUNT(*) AS opens FROM events e JOIN article_scores s ON s.article_id=e.article_id " f"WHERE {OPEN} AND s.topic IS NOT NULL AND e.day>=date('now',?) " f"GROUP BY s.topic ORDER BY opens DESC", (since,), )] # Counts per event kind over the window (each = distinct visitor-days, by dedup). kc_rows = conn.execute( "SELECT kind, COUNT(*) n FROM events WHERE day>=date('now',?) GROUP BY kind", (since,) ).fetchall() kc = {r["kind"]: r["n"] for r in kc_rows} shares = {k: kc.get(k, 0) for k in ("share_ub", "copy_source", "native_share", "source_click")} summary_views = kc.get("summary_viewed", 0) source_clicks = kc.get("source_click", 0) funnel = { "summary_viewed": summary_views, "source_click": source_clicks, "full_story": kc.get("full_story", 0), "source_rate": round(100 * source_clicks / summary_views) if summary_views else 0, } emotional_mix = { "not_today": kc.get("not_today", 0), "less_like_this": kc.get("less_like_this", 0), "hide_topic": kc.get("hide_topic", 0), } paywall = { "paywall_replace": kc.get("paywall_replace", 0), "paywalled_source_open": kc.get("paywalled_source_open", 0), } replace = {"used": kc.get("replace_used", 0), "none": kc.get("replace_none", 0)} # Game funnel — the growth loop we're instrumenting. Each count is distinct # visitor-days (events dedupe per kind/day), so it reads as "people", not actions. _GAME_NAMES = ("word", "wordsearch", "bloom", "match") _GAME_EVENTS = ("arrival", "started", "completed", "shared") # arrival = share-loop acquisition games_by = { g: {e: kc.get(f"{g}_{e}", 0) for e in _GAME_EVENTS} for g in _GAME_NAMES } games = { "by_game": games_by, "totals": {e: sum(games_by[g][e] for g in _GAME_NAMES) for e in _GAME_EVENTS}, } # Accounts — aggregate counts only (no emails, no per-user listing). accounts = { "total": scalar("SELECT COUNT(*) FROM users"), "new_today": scalar("SELECT COUNT(*) FROM users WHERE date(created_at)=date('now')"), "new_7d": scalar("SELECT COUNT(*) FROM users WHERE created_at>=date('now','-7 days')"), "new_30d": scalar("SELECT COUNT(*) FROM users WHERE created_at>=date('now',?)", (since,)), "active_7d": scalar("SELECT COUNT(DISTINCT user_id) FROM sessions WHERE last_seen_at>=date('now','-7 days')"), } # Returning-visitor buckets by distinct active days in the window. bucket_rows = conn.execute( "SELECT CASE WHEN d>=8 THEN '8+' WHEN d>=4 THEN '4-7' WHEN d>=2 THEN '2-3' ELSE 'new' END b, " "COUNT(*) n FROM (SELECT visitor_hash, COUNT(DISTINCT day) d FROM events " "WHERE kind='visit' AND visitor_hash!='' AND day>=date('now',?) GROUP BY visitor_hash) GROUP BY b", (since,), ).fetchall() buckets = {r["b"]: r["n"] for r in bucket_rows} retention = {k: buckets.get(k, 0) for k in ("new", "2-3", "4-7", "8+")} daily = [dict(r) for r in conn.execute( "SELECT day, SUM(kind IN ('open','summary_viewed')) AS opens, SUM(kind='visit') AS visits " "FROM events WHERE day>=date('now',?) GROUP BY day ORDER BY day", (since,), )] content = content_stats(conn) sources = source_health(conn) feedback_7d = scalar("SELECT COUNT(*) FROM feedback WHERE created_at >= datetime('now','-7 days')") feedback_unread = scalar("SELECT COUNT(*) FROM feedback WHERE read_at IS NULL") return { "days": days, "content": content, "sources": sources, "attention": _attention(content, sources, feedback_unread), "feedback_7d": feedback_7d, "feedback_unread": feedback_unread, "visitors": visitors, "returning": loyalty.get("returning", 0), "once": loyalty.get("once", 0), "retention": retention, "accounts": accounts, "funnel": funnel, "emotional_mix": emotional_mix, "paywall": paywall, "replace": replace, "games": games, "top_articles": top_articles, "top_groupings": top_groupings, "top_topics": top_topics, "shares": shares, "daily": daily, # Boot-failure seatbelt signal — blank-screen risk surfacing. Bots are # excluded from the headline counts: throttled crawlers fail the boot # check routinely and would read as real users seeing blank screens. "client_errors": { "today": scalar( f"SELECT COUNT(*) FROM client_errors WHERE created_at >= ? AND {_NOT_BOT_SQL}", (local_day_start,), ), "window": scalar( f"SELECT COUNT(*) FROM client_errors WHERE created_at>=date('now',?) AND {_NOT_BOT_SQL}", (since,), ), # Drives the headline now: how many still need a look (clears as you mark them read). "unread": scalar( f"SELECT COUNT(*) FROM client_errors WHERE read_at IS NULL AND {_NOT_BOT_SQL}", ), }, } def existing_article_ids(conn: sqlite3.Connection, ids: list[int]) -> list[int]: """Filter to ids that still exist (FK-safe inserts for save/history/import).""" clean = list({int(i) for i in ids})[:1000] if not clean: return [] placeholders = ",".join("?" * len(clean)) return [r[0] for r in conn.execute( f"SELECT id FROM articles WHERE id IN ({placeholders})", clean )] def tag_counts(conn: sqlite3.Connection, accepted_only: bool = True) -> dict: """How many shown (accepted, non-duplicate) articles carry each grouping tag.""" where = "WHERE a.duplicate_of IS NULL" + (" AND s.accepted = 1" if accepted_only else "") rows = conn.execute( f""" SELECT t.tag, COUNT(*) AS count FROM article_tags t JOIN articles a ON a.id = t.article_id JOIN article_scores s ON s.article_id = a.id {where} GROUP BY t.tag """ ).fetchall() return {r["tag"]: r["count"] for r in rows} def category_counts(conn: sqlite3.Connection, accepted_only: bool = True) -> list[dict]: """Return per topic/flavor article counts for building browse UIs. Joins articles and excludes duplicates so the counts match exactly what the feed endpoint will actually return for each topic/flavor. """ clauses = ["a.duplicate_of IS NULL"] clauses.append("s.accepted = 1" if accepted_only else "s.topic IS NOT NULL") rows = conn.execute( f""" SELECT s.topic, s.flavor, COUNT(*) AS count FROM article_scores s JOIN articles a ON a.id = s.article_id WHERE {" AND ".join(clauses)} GROUP BY s.topic, s.flavor ORDER BY s.topic, s.flavor """ ).fetchall() return [dict(row) for row in rows] def available_dates(conn: sqlite3.Connection, limit: int = 30) -> list[str]: rows = conn.execute( "SELECT brief_date FROM daily_briefs ORDER BY brief_date DESC LIMIT ?", (limit,), ).fetchall() return [row["brief_date"] for row in rows] def _latest_brief_date(conn: sqlite3.Connection) -> str | None: row = conn.execute( "SELECT brief_date FROM daily_briefs ORDER BY brief_date DESC LIMIT 1" ).fetchone() return row["brief_date"] if row else None