Sync repo to deployed state: SEO recovery, Publishing Desk, Play games, emoji picker

The deploy pipeline runs from the working tree, so a wave of shipped features
had never been committed. This snapshots git to what's actually running.

SEO impression recovery (live + verified):
- Duplicate /a/{id} pages now 301-redirect to their canonical twin instead of
  returning 404 (a hard 404 silently dropped already-indexed URLs and tanked
  impressions).
- Dedup representative selection reworked: accepted/serveable -> established
  rep (URL stability) -> quality score, so an accepted page never retires to a
  rejected rep and an indexed canonical doesn't churn when a newer twin arrives.
- HEAD /a/{id} returns the same status as GET (api_route GET+HEAD) instead of
  falling through to the static mount and 404ing.
- `dedup --force-recluster`: cycle-locked, model-free re-cluster to re-apply the
  policy to the existing corpus (shared cycle_lock context manager).
- CLI honors GOODNEWS_DB for its default --db (was silently ignored).

Publishing Desk (admin tool to post highlights to X via Web Intents):
- publishing.py queue/rank/handle-resolution; admin UI; full searchable emoji
  picker (bundled data, no CDN) for the blurb editor.

Play games + site:
- Bloom (word-wheel), Memory Match, daily ritual set, Zen Den (dev-gated).
- English-only language gate; source prospecting; paywall + dedup hardening.

Tests: full suite green (349). Ignores tightened (node_modules, data/*.db).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
jay
2026-06-18 11:32:27 -04:00
parent 2dbe73430c
commit 89c0fbe1f6
66 changed files with 6138 additions and 109 deletions
+352 -17
View File
@@ -18,10 +18,13 @@ import hashlib
import hmac
import io
import json
import logging
import os
import re
import secrets
import sqlite3
import threading
import time
from collections import Counter
from contextlib import contextmanager
from datetime import datetime, timezone
@@ -33,7 +36,7 @@ from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from . import auth, email_send, feeds, games, oauth_google, queries, share, sources, summarize
from . import auth, bloom, email_send, feeds, games, oauth_google, publishing, queries, share, sources, summarize
from .localtime import local_today
from .markup import reply_html_to_text, sanitize_reply_html
from .db import connect
@@ -55,6 +58,8 @@ _EDGE_DERIVED = "public, max-age=0, s-maxage=120, stale-while-revalidate=120"
_EDGE_FEED = "public, max-age=0, s-maxage=45, stale-while-revalidate=30" # global feed (URL-keyed, shareable only)
_PRIVATE = "private, no-store" # never share across users
log = logging.getLogger("goodnews.api")
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_DB = ROOT / "data" / "goodnews.sqlite3"
# Prefer the built SvelteKit site; fall back to the legacy single-page harness.
@@ -147,6 +152,32 @@ def _user_out(user: sqlite3.Row) -> dict:
# scrapers don't each kick off a duplicate LLM call.
_summarizing: set[int] = set()
# In-process cache of fully-rendered /a/{id} share pages. We're direct-origin (no
# CDN), so Cache-Control alone can't shield the box from crawler bursts hitting the
# sitemap's article URLs while the LAN LLM / cycle is loading it. Only COMPLETE
# pages (summary + explanation present) are cached, so a "still generating" page is
# never pinned; a short TTL still picks up edits. Per-process (fine across workers).
# INVARIANT: the share page is PUBLIC/anonymous — the cache key is article_id alone.
# If /a/{id} ever personalizes (per-viewer content), key by viewer or drop the cache,
# or one visitor's variant would be served to another.
_SHARE_CACHE: dict[int, tuple[float, str]] = {}
_SHARE_TTL = 900.0 # 15 min
_SHARE_CACHE_MAX = 512
def _share_cache_get(aid: int) -> str | None:
hit = _SHARE_CACHE.get(aid)
if hit and (time.monotonic() - hit[0]) < _SHARE_TTL:
return hit[1]
return None
def _share_cache_put(aid: int, html: str) -> None:
if len(_SHARE_CACHE) >= _SHARE_CACHE_MAX:
oldest = min(_SHARE_CACHE, key=lambda k: _SHARE_CACHE[k][0])
_SHARE_CACHE.pop(oldest, None)
_SHARE_CACHE[aid] = (time.monotonic(), html)
def _run_summary(article_id: int) -> None:
try:
@@ -158,6 +189,29 @@ def _run_summary(article_id: int) -> None:
_summarizing.discard(article_id)
# Publishing Desk: the "Build queue" job runs in the background (one bounded
# comparative LLM call can be slow); the admin polls the queue endpoint. Mirrors the
# summary-kick pattern — never holds an HTTP request open on the model. The lock makes
# the check-and-set atomic so two rapid clicks can't launch two expensive jobs.
_publish_build: dict = {"building": False, "result": None, "error": None}
_publish_build_lock = threading.Lock()


def _run_publish_build() -> None:
    """Run one queue build and record its outcome in ``_publish_build``.

    On success ``result`` holds what ``publishing.build_queue`` returned and
    ``error`` is cleared; on any failure ``error`` holds the message (first 300
    characters). Either way the ``building`` flag is reset when the job ends.
    """
    try:
        # Model down → deterministic fallback inside build_queue (client=None).
        try:
            model = LocalModelClient.from_env()
        except Exception:  # noqa: BLE001
            model = None
        with get_conn() as db:
            outcome = publishing.build_queue(db, PUBLIC_BASE_URL, client=model)
        _publish_build.update({"result": outcome, "error": None})
    except Exception as err:  # noqa: BLE001 — surface, don't crash the worker
        _publish_build.update({"error": str(err)[:300]})
    finally:
        _publish_build["building"] = False
def _kick_summary(article_id: int, background_tasks: BackgroundTasks) -> None:
if article_id in _summarizing:
return
@@ -332,7 +386,7 @@ class SourcePreview(BaseModel):
sampled: int
classified: bool
accepted: int
acceptance_rate: float
acceptance_rate: float | None # None when there are no English items to judge (all held)
avg_cortisol: float
avg_ragebait: float
avg_pr_risk: float
@@ -357,6 +411,54 @@ class GameStateBody(BaseModel):
state: dict = {}
class PublishStatusBody(BaseModel):
    # Body of POST /api/admin/publishing/{sid}/status: the new status plus the
    # optional fields the handler forwards to publishing.set_status.
    status: str
    # Everything below is optional and defaults to None when omitted.
    draft_text: str | None = None
    final_text: str | None = None
    post_url: str | None = None
    snooze_until: str | None = None
class PublishDraftBody(BaseModel):
    # Body of POST /api/admin/publishing/{sid}/draft; an omitted draft is the empty string.
    draft_text: str = ""
class EntityHandleBody(BaseModel):
    # Body of POST /api/admin/publishing/handles: an entity and the handle to save for it.
    entity_name: str
    handle: str
    profile_url: str | None = None  # optional
class GameStateItem(BaseModel):
    # One (game, variant) state entry inside a batch sync request.
    game: str
    variant: str
    state: dict = {}
class GameStateBatchBody(BaseModel):
    # Body of PUT /api/games/state/batch: one date shared by every item.
    date: str
    items: list[GameStateItem] = []
class BloomReportBody(BaseModel):
    # Body of POST /api/bloom/report: the word being flagged plus optional context
    # about the puzzle it was played on. Every field has a default.
    word: str = ""
    date: str | None = None
    mode: str | None = None
    format: str | None = None
    letters: str | None = None
    reason: str | None = None
class BloomOverrideBody(BaseModel):
    # Body of POST /api/admin/bloom/overrides.
    word: str = ""
    action: str = "allow"  # 'allow' | 'block'
    reason: str | None = None
class BloomReportActionBody(BaseModel):
    # Body of POST /api/admin/bloom/reports/{report_id}.
    action: str = ""  # 'approve' | 'block' | 'dismiss'
class WordPoolBody(BaseModel):
word: str
@@ -495,6 +597,13 @@ _EVENT_KINDS = {
}
def _fts_query(q: str) -> str:
    """Raw search box → safe FTS5 query.

    Keeps at most the first 8 alphanumeric runs of ``q``; each becomes a QUOTED
    prefix token (``"term"*``) and the tokens are space-joined, which FTS5 treats
    as an implicit AND. Quoting matters: a bare uppercase ``AND`` / ``OR`` /
    ``NOT`` typed by a reader is an FTS5 operator, so ``cats AND dogs`` used to
    become ``cats* AND* dogs*`` and could fail as a syntax error. Terms contain
    only ``[A-Za-z0-9]``, so they can never hold a quote themselves.

    Returns '' when nothing usable remains (including ``q`` being None).
    """
    terms = re.findall(r"[A-Za-z0-9]+", q or "")[:8]
    return " ".join(f'"{t}"*' for t in terms)
def _visitor_hash(token: str | None) -> str:
token = (token or "").strip()[:200]
if not token:
@@ -660,22 +769,38 @@ def create_app() -> FastAPI:
state: str | None = None,
error: str | None = None,
) -> RedirectResponse:
fail = RedirectResponse(f"{PUBLIC_BASE_URL}/auth/verify?error=google", status_code=302)
if error or not code or not state:
return fail
# The user always sees the same generic error=google (no detail leaked),
# but we log WHY internally so device/host-specific failures (e.g. a www
# vs apex cookie loss, a state mismatch, a token-exchange error) are
# diagnosable instead of all looking identical.
def fail(reason: str, exc: Exception | None = None) -> RedirectResponse:
host = request.headers.get("Host", "?")
if exc is not None:
log.warning("google callback failed: %s (host=%s): %s", reason, host, exc)
else:
log.warning("google callback failed: %s (host=%s)", reason, host)
return RedirectResponse(f"{PUBLIC_BASE_URL}/auth/verify?error=google", status_code=302)
if error:
return fail(f"provider_error:{error}")
if not code or not state:
return fail("missing_code_or_state")
saved = _unsign(request.cookies.get(OAUTH_COOKIE))
if not saved:
return fail
# Most likely the host-only ub_oauth cookie was set on a different
# host than this callback (www vs apex). Canonicalizing www→apex at
# the edge prevents this.
return fail("missing_oauth_cookie")
saved_state, _, verifier = saved.partition(":")
if not hmac.compare_digest(saved_state, state):
return fail
return fail("state_mismatch")
try:
tokens = oauth_google.exchange_code(code, _google_redirect_uri(), verifier)
info = oauth_google.verify_id_token(tokens["id_token"])
if not info.get("picture") and tokens.get("access_token"):
info["picture"] = oauth_google.fetch_userinfo(tokens["access_token"]).get("picture")
except Exception:
return fail
except Exception as exc: # noqa: BLE001 — log reason, show generic error
return fail("token_exchange_or_verify", exc)
with get_conn() as conn:
user_id = auth.find_or_create_user(
conn, info["email"], "google", info["sub"],
@@ -925,13 +1050,19 @@ def create_app() -> FastAPI:
# --- Public share/landing page for an article -------------------------
@app.get("/a/{article_id}", response_class=HTMLResponse)
# GET + HEAD: FastAPI's @app.get registers GET only (no auto-HEAD), so a HEAD would
# fall through to the catch-all StaticFiles mount at "/" and 404. Register both so
# HEAD returns the same status (200/301/404) as GET, sans body.
@app.api_route("/a/{article_id}", methods=["GET", "HEAD"], response_class=HTMLResponse)
def share_page(article_id: str, background_tasks: BackgroundTasks) -> HTMLResponse:
not_found = HTMLResponse(share.render_not_found(PUBLIC_BASE_URL), status_code=404)
try:
aid = int(article_id)
except (TypeError, ValueError):
return not_found # malformed id → calm 404, no stack trace
cached = _share_cache_get(aid)
if cached is not None: # serve a rendered page without touching SQLite/render
return HTMLResponse(cached, headers={"Cache-Control": "public, max-age=300"})
with get_conn() as conn:
row = conn.execute(
"SELECT a.id, a.title, a.description, a.image_url, a.canonical_url, "
@@ -941,16 +1072,45 @@ def create_app() -> FastAPI:
"LEFT JOIN article_scores s ON s.article_id = a.id WHERE a.id = ?",
(aid,),
).fetchone()
# Only render real, accepted, non-duplicate stories.
if not row or row["duplicate_of"] is not None or not row["accepted"]:
if not row:
return not_found
# A duplicate's URL may already be indexed by Google. A hard 404 silently
# drops it (and any newer twin that arrives later retires the OLDER, already
# indexed URL) — that's what tanked impressions. So 301 to the canonical twin
# instead: Google consolidates the page onto the survivor. dedup stores a star
# (dup -> rep, rep.duplicate_of IS NULL); we still follow a short chain with a
# cycle guard as cheap insurance.
if row["duplicate_of"] is not None:
seen, cur, target = {aid}, row["duplicate_of"], None
for _ in range(8):
if cur in seen:
break
seen.add(cur)
r2 = conn.execute(
"SELECT a.id, a.duplicate_of, s.accepted FROM articles a "
"LEFT JOIN article_scores s ON s.article_id = a.id WHERE a.id = ?",
(cur,),
).fetchone()
if not r2:
break
if r2["duplicate_of"] is None:
target = r2 if r2["accepted"] else None
break
cur = r2["duplicate_of"]
if target is not None:
return RedirectResponse(f"/a/{target['id']}", status_code=301)
return not_found # canonical itself is gone/rejected → genuinely 404
if not row["accepted"]:
return not_found
summary = summarize.get_summary(conn, aid)
explanation = summarize.get_explanation(conn, aid)
if not summary or not explanation:
complete = bool(summary and explanation)
if not complete:
_kick_summary(aid, background_tasks) # generate/top-up for next time; page polls
return HTMLResponse(
share.render_share_page(dict(row), PUBLIC_BASE_URL, summary=summary, explanation=explanation)
)
html = share.render_share_page(dict(row), PUBLIC_BASE_URL, summary=summary, explanation=explanation)
if complete:
_share_cache_put(aid, html) # cache only the finished page (never the "generating" state)
return HTMLResponse(html, headers={"Cache-Control": "public, max-age=300" if complete else "no-cache"})
# --- Privacy-respecting first-party analytics -------------------------
@@ -1305,6 +1465,76 @@ def create_app() -> FastAPI:
cand = conn.execute("SELECT * FROM source_candidates WHERE id = ?", (cid,)).fetchone()
return _candidate_dict(cand)
# Admin: send a rejected source candidate back to staging for another look.
# 404 when sources.restore_candidate reports nothing was restored.
@app.post("/api/admin/candidates/{cid}/restore")
def admin_candidate_restore(cid: int, request: Request) -> dict:
    with get_conn() as conn:
        _require_admin(conn, request)
        restored = sources.restore_candidate(conn, cid)
        if not restored:
            raise HTTPException(status_code=404, detail="no rejected candidate with that id")
        # Re-read the row so the response reflects its state after the restore.
        row = conn.execute("SELECT * FROM source_candidates WHERE id = ?", (cid,)).fetchone()
        return _candidate_dict(row)
# --- Publishing Desk (admin): outbound-share queue for X (platform-neutral) ---
# Kick the queue build in the background (the comparative LLM call can be slow);
# the client polls /queue. No-op if a build is already running. The response is
# {"building": True} in both cases.
@app.post("/api/admin/publishing/build")
def admin_publishing_build(request: Request, background_tasks: BackgroundTasks) -> dict:
    with get_conn() as conn:
        _require_admin(conn, request)
    # Atomic check-and-set: one job at a time.
    with _publish_build_lock:
        already_running = _publish_build["building"]
        if not already_running:
            _publish_build.update(building=True, result=None, error=None)
            background_tasks.add_task(_run_publish_build)
    return {"building": True}
# Admin: the current share queue plus the state of the background build
# (running flag, last result, last error) so the client can poll one endpoint.
@app.get("/api/admin/publishing/queue")
def admin_publishing_queue(request: Request, archived: bool = False) -> dict:
    with get_conn() as conn:
        _require_admin(conn, request)
        queue_items = publishing.list_queue(conn, include_archived=archived)
        return {
            "building": _publish_build["building"],
            "last": _publish_build.get("result"),
            "error": _publish_build.get("error"),
            "items": queue_items,
        }
# Admin: change a share's status, forwarding the optional text/link fields.
# 400 when publishing.set_status refuses the status or the id.
@app.post("/api/admin/publishing/{sid}/status")
def admin_publishing_status(sid: int, body: PublishStatusBody, request: Request) -> dict:
    with get_conn() as conn:
        _require_admin(conn, request)
        updated = publishing.set_status(
            conn,
            sid,
            body.status,
            draft_text=body.draft_text,
            final_text=body.final_text,
            post_url=body.post_url,
            snooze_until=body.snooze_until,
        )
        if not updated:
            raise HTTPException(status_code=400, detail="bad status or id")
        return {"ok": True}
# Admin: save the draft text for one share; 404 when save_draft reports no such share.
@app.post("/api/admin/publishing/{sid}/draft")
def admin_publishing_draft(sid: int, body: PublishDraftBody, request: Request) -> dict:
    with get_conn() as conn:
        _require_admin(conn, request)
        saved = publishing.save_draft(conn, sid, body.draft_text)
        if not saved:
            raise HTTPException(status_code=404, detail="no such share")
        return {"ok": True}
# Admin: bring a share back into the queue; 400 when publishing.restore declines.
@app.post("/api/admin/publishing/{sid}/restore")
def admin_publishing_restore(sid: int, request: Request) -> dict:
    with get_conn() as conn:
        _require_admin(conn, request)
        restored = publishing.restore(conn, sid)
        if not restored:
            raise HTTPException(status_code=400, detail="not a restorable (skipped/snoozed) share")
        return {"ok": True}
# Save a verified handle (e.g. after confirming one via 'Find on X').
# 400 when publishing.add_entity_handle rejects the entity or handle.
@app.post("/api/admin/publishing/handles")
def admin_publishing_add_handle(body: EntityHandleBody, request: Request) -> dict:
    with get_conn() as conn:
        _require_admin(conn, request)
        stored = publishing.add_entity_handle(conn, body.entity_name, body.handle, body.profile_url)
        if not stored:
            raise HTTPException(status_code=400, detail="bad entity or handle")
        return {"ok": True}
# --- CSV exports (admin-gated, for inspection / archiving) ---------------
def _csv_cell(v):
@@ -1593,6 +1823,32 @@ def create_app() -> FastAPI:
items=[Article.from_row(r) for r in rows],
)
# Public article search across the visitor-facing corpus. Mirrors the feed's
# boundaries (accepted/visible/non-duplicate + the reader's Calm Filters /
# avoid-terms) but NOT a lane scope — you searched on purpose. Ranked by
# relevance (bm25), recency as a tie-break. Per-reader → never edge-cached.
@app.get("/api/search", response_model=FeedResponse)
def search(response: Response, q: str = Query("", max_length=120),
           prefs: str | None = Query(None), limit: int = Query(30, ge=1, le=60),
           offset: int = Query(0, ge=0)) -> FeedResponse:
    response.headers["Cache-Control"] = _PRIVATE
    fts = _fts_query(q)
    if not fts:
        # Nothing searchable in the box → empty result, no DB work.
        return FeedResponse(topic=None, flavor=None, count=0, items=[])
    fp = prefs_from_json(prefs)
    now = datetime.now(timezone.utc)
    kw = _prefs_sql_kw(fp, now)
    page_end = offset + limit
    with get_conn() as conn:
        index_has_rows = conn.execute("SELECT 1 FROM article_search LIMIT 1").fetchone()
        if not index_has_rows:
            queries.reindex_search(conn)  # lazy build (fresh deploy / before first cycle)
        # Avoid-terms are filtered after the query, so over-fetch (capped at 2000)
        # to still fill the requested page; otherwise fetch exactly through its end.
        if fp.avoid_terms:
            fetch_n = min(2000, page_end * 4 + 40)
        else:
            fetch_n = page_end
        raw = queries.feed(conn, accepted_only=True, limit=fetch_n, offset=0, match=fts, **kw)
        # Word-boundary avoid-terms.
        kept = filter_articles(raw, fp, now) if fp.avoid_terms else raw
        page = kept[offset:page_end]
        # Keep relevance order (don't paywall-reorder); the badge still shows true status.
        return FeedResponse(topic=None, flavor=None, count=len(page),
                            items=[Article.from_row(r) for r in page])
@app.get("/api/puzzle/{game}")
def daily_puzzle(game: str, variant: str = Query("5")) -> dict:
with get_conn() as conn:
@@ -1600,8 +1856,29 @@ def create_app() -> FastAPI:
return games.word_puzzle_response(conn, local_today(), variant)
if game == "wordsearch":
return games.wordsearch_response(conn, local_today(), variant)
if game == "bloom":
return bloom.bloom_response(conn, local_today())
raise HTTPException(status_code=404, detail="no such puzzle")
# A free-play wheel: deterministic by `seed` (client stores it to resume),
# random when none is given. Center Circle or Wild Bloom. Anything other than
# format="wild" is treated as "center"; a seed that is missing or not 1-32
# characters of [A-Za-z0-9_-] is replaced by a fresh random one.
@app.get("/api/puzzle/bloom/free")
def bloom_free(response: Response, format: str = "center", seed: str | None = None) -> dict:
    wheel_format = "wild" if format == "wild" else "center"
    if seed and re.fullmatch(r"[A-Za-z0-9_-]{1,32}", seed):
        wheel_seed = seed
    else:
        wheel_seed = secrets.token_urlsafe(6)
    response.headers["Cache-Control"] = "no-store"
    with get_conn() as conn:
        return bloom.bloom_free_response(conn, wheel_seed, wheel_format)
# A player flagging a rejected word as "should count". Public + deduped;
# lands in the admin queue (approve→allow / block / dismiss).
@app.post("/api/bloom/report")
def bloom_report(body: BloomReportBody) -> dict:
    with get_conn() as conn:
        recorded = bloom.add_report(
            conn, body.word, body.date, body.mode, body.format, body.letters, body.reason
        )
        return {"ok": bool(recorded)}
@app.post("/api/puzzle/word/guess")
def word_guess(body: WordGuessRequest) -> dict:
if body.variant not in games.WORD_VARIANTS:
@@ -1615,7 +1892,9 @@ def create_app() -> FastAPI:
# --- Cross-device game state sync (signed-in only; merged server-side) ---
def _game_ok(game: str, variant: str) -> bool:
return (game == "word" and variant in games.WORD_VARIANTS) or \
(game == "wordsearch" and variant in games.WS_TIERS)
(game == "wordsearch" and variant in games.WS_TIERS) or \
(game == "bloom" and variant == "") or \
(game == "match" and variant in games.MATCH_VARIANTS) # "<tier>-<format>"
def _valid_pdate(d: str) -> bool:
    """True only for a plain ``YYYY-MM-DD`` string of ASCII digits (no junk rows).

    This is a shape check, not a calendar check. ``fullmatch`` with ``[0-9]`` is
    used because ``re.match(r"^...$")`` also accepts a trailing newline and
    ``\\d`` also accepts non-ASCII digits. None or '' is False.
    """
    return bool(re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", d or ""))
@@ -1647,6 +1926,27 @@ def create_app() -> FastAPI:
merged = games.save_game_state(conn, user["id"], body.game, body.variant, body.date, body.state or {})
return {"state": merged}
@app.put("/api/games/state/batch")
def game_state_put_batch(body: GameStateBatchBody, request: Request) -> dict:
    """Reconcile many (game, variant) states for one date in a SINGLE request, so
    the hub doesn't fan out a dozen calls on every /play load. Each item is
    validated/sanitized/merged exactly like the single PUT; unknown or oversized
    items are dropped (not fatal). Signed-out → echo (no sync), same as the single
    endpoint, so cross-device pull is preserved for signed-in users."""
    if not _valid_pdate(body.date):
        raise HTTPException(status_code=400, detail="bad date")
    # Only the first 32 items are considered; unknown games and states whose JSON
    # exceeds 20000 characters are silently skipped.
    accepted = []
    for item in body.items[:32]:
        if not _game_ok(item.game, item.variant):
            continue
        if len(json.dumps(item.state)) > 20000:
            continue
        accepted.append(item)
    with get_conn() as conn:
        user = _current_user(conn, request)
        if not user:
            # Signed-out: echo the accepted items back without storing anything.
            return {"states": [{"game": item.game, "variant": item.variant, "state": item.state}
                               for item in accepted]}
        states = []
        for item in accepted:
            merged = games.save_game_state(
                conn, user["id"], item.game, item.variant, body.date, item.state or {}
            )
            states.append({"game": item.game, "variant": item.variant, "state": merged})
        return {"states": states}
@app.get("/api/games/stats")
def game_stats_get(game: str, variant: str, request: Request) -> dict:
if not _game_ok(game, variant):
@@ -1656,6 +1956,41 @@ def create_app() -> FastAPI:
return {"stats": games.game_stats(conn, user["id"], game, variant) if user else None}
# --- Admin: Daily Word pool curation ---
# --- Admin: Bloom word curation (runtime, no deploy) ---
# Admin: player word reports for one status, plus every current override.
# An unrecognized status falls back to "pending".
@app.get("/api/admin/bloom/reports")
def admin_bloom_reports(request: Request, status: str = "pending") -> dict:
    with get_conn() as conn:
        _require_admin(conn, request)
        known = ("pending", "approved", "blocked", "dismissed")
        st = status if status in known else "pending"
        reports = bloom.list_reports(conn, st)
        current_overrides = bloom.list_overrides(conn)
        return {"status": st, "reports": reports, "overrides": current_overrides}
# Admin: resolve one player report, recording the admin's email as the resolver.
# 400 when bloom.resolve_report rejects the report id or the action.
@app.post("/api/admin/bloom/reports/{report_id}")
def admin_bloom_resolve(report_id: int, body: BloomReportActionBody, request: Request) -> dict:
    with get_conn() as conn:
        admin = _require_admin(conn, request)
        resolved = bloom.resolve_report(conn, report_id, body.action, by=admin["email"])
        if not resolved:
            raise HTTPException(status_code=400, detail="bad report or action")
        return {"ok": True}
# Admin: allow or block a Bloom word at runtime, recording the admin's email.
# 422 when bloom.set_override refuses the word/action combination.
@app.post("/api/admin/bloom/overrides")
def admin_bloom_override(body: BloomOverrideBody, request: Request) -> dict:
    with get_conn() as conn:
        admin = _require_admin(conn, request)
        stored = bloom.set_override(
            conn, body.word, body.action, reason=body.reason, by=admin["email"]
        )
        if not stored:
            raise HTTPException(
                status_code=422,
                detail="allow needs a real ≥4-letter word with no 'S'; block accepts any word",
            )
        return {"ok": True}
# Admin: remove the override for a word. Always answers ok, whether or not one existed.
@app.delete("/api/admin/bloom/overrides/{word}")
def admin_bloom_override_clear(word: str, request: Request) -> dict:
    with get_conn() as conn:
        _require_admin(conn, request)
        bloom.clear_override(conn, word)
        return {"ok": True}
@app.get("/api/admin/word/lookup")
def admin_word_lookup(word: str, request: Request) -> dict:
with get_conn() as conn:
+317
View File
@@ -0,0 +1,317 @@
"""Bloom — the daily word wheel (Center Circle / Wild Bloom).
DESIGN and ACCEPTANCE are decoupled:
• DESIGN (wheel selection, tiers, pangram, the Full-Bloom target) uses the small
COMMON list only — deterministic, stored in daily_puzzles, and unaffected by
curation. Tiers are scored on COMMON so "Flourishing" is always reachable with
everyday vocabulary, and "Full Bloom" = finding the whole *designed* puzzle
(the broad bonus words are extra credit beyond it, never required).
• ACCEPTANCE is BROAD and DYNAMIC — every valid dictionary word buildable from
the wheel, computed at RESPONSE TIME as: (broad dict ∪ {allow}) − {block}, where
allow/block are runtime admin overrides (bloom_word_overrides). So a missed
word can be allowed (or a junk word blocked) with NO deploy or regeneration.
Accept words never sit in the network response: clients validate against salted
hashes and compute their own score/tier/pangram from the 7 letters.
"""
from __future__ import annotations
import hashlib
import json
import random
import sqlite3
from itertools import combinations
from pathlib import Path
_DATA = Path(__file__).parent / "data"
# JSON is UTF-8 by specification: decode it explicitly instead of relying on the
# platform's locale encoding, which is what read_text() uses by default.
_W = json.loads((_DATA / "bloom_words.json").read_text(encoding="utf-8"))
ACCEPT: list[str] = _W["accept"]  # broad: all valid dictionary words
_COMMON: set[str] = set(_W["common"])  # tight: design / tiers / pangrams only
# Each common word paired with its distinct-letter set, precomputed once for the
# subset tests done during wheel design.
_COMMON_LS: list[tuple[str, frozenset]] = [(w, frozenset(w)) for w in _COMMON]
# Pangrams in this list are never chosen as a wheel's displayed pangram.
_AVOID: set[str] = set(json.loads((_DATA / "bloom_avoid.json").read_text(encoding="utf-8")))

# Broad accept words bucketed by distinct-letter set, so the accepted set for a
# 7-letter wheel is gathered by unioning its ≤127 letter-subsets (fast) — no scan
# of the whole ~68k list per request.
_BY_SET: dict[frozenset, list[str]] = {}
for _w in ACCEPT:
    _BY_SET.setdefault(frozenset(_w), []).append(_w)

# Candidate wheels = letter-sets of 7-distinct-letter COMMON words (every wheel
# has ≥1 recognizable pangram). Sorted for deterministic order.
_PANGRAM_SETS: dict[frozenset, list[str]] = {}
for _w in _COMMON:
    _s = frozenset(_w)
    if len(_s) == 7:
        _PANGRAM_SETS.setdefault(_s, []).append(_w)
_CANDIDATES: list[frozenset] = sorted(_PANGRAM_SETS, key=lambda s: "".join(sorted(s)))

# Bounds on how many COMMON words a wheel design may contain.
MIN_COMMON_WORDS, MAX_COMMON_WORDS = 14, 45
PANGRAM_BONUS = 7  # extra points for a word that uses every wheel letter
# 8 / 30 / 70 — Flourishing at 70% keeps Bloom from becoming a completionist
# grind. Do NOT raise Flourishing above 0.70 (Codex).
TIER_PCTS: tuple[tuple[str, float], ...] = (
    ("Sprouting", 0.0), ("Budding", 0.08), ("Blooming", 0.30), ("Flourishing", 0.70),
)
TOP_TIER_PCT = 0.70
def score_word(word: str) -> int:
    """Return a word's base points: 1 for a 4-letter word, otherwise its length.

    The pangram bonus is not part of this value; it is added by the caller.
    """
    length = len(word)
    if length == 4:
        return 1
    return length
def score_words(payload: dict, words) -> int:
    """Total score of ``words`` on the wheel described by ``payload``.

    The wheel's letters are ``payload["center"]`` plus ``payload["outer"]``. Each
    word earns its base score, and a word whose distinct letters are exactly the
    wheel's letters (a pangram) earns ``PANGRAM_BONUS`` on top. Used for the
    player's running score and for the Full-Bloom check against ``max_score``.
    """
    wheel = frozenset(payload["center"]) | frozenset(payload["outer"])
    return sum(
        score_word(word) + (PANGRAM_BONUS if frozenset(word) == wheel else 0)
        for word in words
    )
# --- DESIGN: common-only, deterministic, stored --------------------------------
def tiers_for(common_max: int) -> list[dict]:
    """Turn ``TIER_PCTS`` into score thresholds for a design worth ``common_max``.

    Each threshold is the tier's fraction of ``common_max`` truncated by ``int()``.
    """
    tiers = []
    for name, fraction in TIER_PCTS:
        tiers.append({"name": name, "score": int(fraction * common_max)})
    return tiers
def _design(letters: frozenset, center: str):
    """Center-mode design, drawn from the COMMON list only.

    Returns ``(commons, display, common_max)``: the common words that contain
    ``center`` and use only ``letters``; the pangrams among them that are not in
    ``_AVOID``, ordered by (length, word); and the score of finding every common
    word, pangram bonuses included.
    """
    commons = [
        word for word, letter_set in _COMMON_LS
        if center in word and letter_set <= letters
    ]
    pangrams = [word for word in commons if frozenset(word) == letters]
    base_points = sum(map(score_word, commons))
    common_max = base_points + PANGRAM_BONUS * len(pangrams)
    display = [p for p in pangrams if p not in _AVOID]
    display.sort(key=lambda p: (len(p), p))
    return commons, display, common_max
def _design_wild(letters: frozenset):
    """Wild design (no required center), drawn from the COMMON list only.

    Returns ``(commons, display, common_max, center)``. The first three mean the
    same as in the center-mode design, minus the center requirement. ``center``
    is the alphabetically first vowel among ``letters``, or the alphabetically
    first letter when there is no vowel.
    """
    commons = [word for word, letter_set in _COMMON_LS if letter_set <= letters]
    pangrams = [word for word in commons if frozenset(word) == letters]
    base_points = sum(map(score_word, commons))
    common_max = base_points + PANGRAM_BONUS * len(pangrams)
    display = [p for p in pangrams if p not in _AVOID]
    display.sort(key=lambda p: (len(p), p))
    ordered = sorted(letters)
    center = next((c for c in ordered if c in "aeiou"), ordered[0])
    return commons, display, common_max, center
def _payload(letters: frozenset, center: str, display, common_max: int) -> dict:
    """Assemble the stored design dict for a wheel.

    ``display`` must be non-empty: its first entry becomes the shown pangram.
    """
    outer = sorted(letters - {center})
    shown_pangram = display[0]
    design = {
        "center": center,
        "outer": outer,
        "pangram": shown_pangram,
        "tiers": tiers_for(common_max),
    }
    # Full Bloom = finding the whole designed (common) puzzle; broad bonus
    # words push score past this but are never required.
    design["max_score"] = common_max
    return design
def _generate(seed_str: str, fmt: str) -> dict:
    """Deterministically pick a wheel design for a seed + format.

    The RNG is seeded from the SHA-256 of ``seed_str``. Candidate wheels are tried
    in shuffled order and the first acceptable one wins. A wild wheel needs at
    least ``MIN_COMMON_WORDS`` common words and a displayable pangram. A center
    wheel tries each letter as center (shuffled) and needs the common-word count
    within ``[MIN_COMMON_WORDS, MAX_COMMON_WORDS]`` plus a displayable pangram.

    Raises RuntimeError when no candidate qualifies.
    """
    digest = hashlib.sha256(seed_str.encode()).hexdigest()
    rng = random.Random(int(digest, 16))
    wheels = list(_CANDIDATES)
    rng.shuffle(wheels)
    wild = fmt == "wild"
    for letters in wheels:
        if wild:
            commons, display, cmax, center = _design_wild(letters)
            if display and len(commons) >= MIN_COMMON_WORDS:
                return _payload(letters, center, display, cmax)
            continue
        # Center mode: the per-wheel shuffle draws from the same RNG, so the
        # number and order of RNG calls is part of the determinism.
        centers = sorted(letters)
        rng.shuffle(centers)
        for center in centers:
            commons, display, cmax = _design(letters, center)
            if display and MIN_COMMON_WORDS <= len(commons) <= MAX_COMMON_WORDS:
                return _payload(letters, center, display, cmax)
    raise RuntimeError("bloom: no valid wheel found")  # impossible with the vendored dict
def build_puzzle(date: str) -> dict:
    """Build the shared Center Circle design for ``date`` (same date → same wheel)."""
    design = _generate(f"bloom:{date}", "center")
    return {"date": date, **design}
def build_free(seed: str, fmt: str = "center") -> dict:
    """Build a free-play design for ``seed`` — Center Circle or Wild.

    Any ``fmt`` other than ``"wild"`` is treated as ``"center"``. The same seed and
    format always yield the same wheel.
    """
    if fmt != "wild":
        fmt = "center"
    design = _generate(f"free:{fmt}:{seed}", fmt)
    return {"seed": seed, "format": fmt, **design}
# --- ACCEPTANCE: broad + runtime overrides, computed at response time ----------
def overrides(conn: sqlite3.Connection) -> tuple[set, set]:
    """Load the runtime word overrides as ``(allow, block)`` sets.

    Rows whose action is exactly ``"allow"`` go to ``allow``; every other action
    goes to ``block``. Rows are read by column name, so ``conn`` must yield
    name-addressable rows (e.g. ``sqlite3.Row``).
    """
    allow: set = set()
    block: set = set()
    rows = conn.execute("SELECT word, action FROM bloom_word_overrides")
    for row in rows:
        if row["action"] == "allow":
            allow.add(row["word"])
        else:
            block.add(row["word"])
    return allow, block
def _broad_words_for(letters: frozenset) -> list[str]:
    """Every broad-dictionary word buildable from `letters` (distinct-set ⊆ letters).

    Walks every non-empty subset of ``letters`` (smallest first) and collects the
    words bucketed under exactly that letter set.
    """
    ordered = sorted(letters)
    found: list[str] = []
    for size in range(1, len(ordered) + 1):
        for subset in combinations(ordered, size):
            bucket = _BY_SET.get(frozenset(subset))
            if bucket:
                found.extend(bucket)
    return found
def accepted_words(conn: sqlite3.Connection, center: str, outer, require_center: bool) -> list[str]:
    """Return the wheel's accepted words RIGHT NOW, sorted and de-duplicated.

    Starts from the broad-dictionary words buildable from the wheel's letters,
    then adds allow-overrides that fit the wheel. An allowed word must have at
    least 4 letters and no "s". Block-overrides remove words from both sources.
    With ``require_center`` every word must also contain ``center``.
    """
    letters = frozenset(outer) | {center}
    allow, block = overrides(conn)

    def has_center(word: str) -> bool:
        return not require_center or center in word

    chosen: set[str] = set()
    for word in _broad_words_for(letters):
        if word not in block and has_center(word):
            chosen.add(word)
    # Allow words that may not be in the broad dict.
    for word in allow:
        if word in block or len(word) < 4 or "s" in word:
            continue
        if frozenset(word) <= letters and has_center(word):
            chosen.add(word)
    return sorted(chosen)
# --- daily_puzzles storage -----------------------------------------------------
def generate_bloom_puzzle(conn: sqlite3.Connection, date: str) -> dict:
    """Return the day's Bloom DESIGN, creating and storing it on first use.

    Idempotent: an existing row is returned as-is. Otherwise the design is built,
    inserted with INSERT OR IGNORE and committed, and the stored row is read back
    so the caller gets whatever is actually in ``daily_puzzles``.
    """
    lookup = "SELECT payload_json FROM daily_puzzles WHERE puzzle_date=? AND game='bloom' AND variant=''"
    found = conn.execute(lookup, (date,)).fetchone()
    if found:
        return json.loads(found["payload_json"])
    design = build_puzzle(date)
    conn.execute(
        "INSERT OR IGNORE INTO daily_puzzles (puzzle_date, game, variant, payload_json) VALUES (?, 'bloom', '', ?)",
        (date, json.dumps(design)),
    )
    conn.commit()
    stored = conn.execute(lookup, (date,)).fetchone()
    return json.loads(stored["payload_json"])
def stored_payload(conn: sqlite3.Connection, date: str) -> dict | None:
"""The day's design IF it already exists — never generates (used by the state
sanitizer, which must not trigger generation)."""
row = conn.execute(
"SELECT payload_json FROM daily_puzzles WHERE puzzle_date=? AND game='bloom' AND variant=''", (date,)
).fetchone()
return json.loads(row["payload_json"]) if row else None
def word_hash(salt: str, word: str) -> str:
    """Return the hex SHA-256 of ``"<salt>:<word>"``."""
    material = f"{salt}:{word}".encode()
    return hashlib.sha256(material).hexdigest()
def _response(salt: str, p: dict, words: list[str], extra: dict) -> dict:
    """Build the client response for design ``p``.

    Accepted words are sent only as salted hashes — NO plaintext words leak.
    Keys in ``extra`` are merged last, so they win on a name clash.
    """
    hashed = [word_hash(salt, word) for word in words]
    body = {
        "game": "bloom",
        "center": p["center"],
        "outer": p["outer"],
        "accepted": hashed,
        "max_score": p["max_score"],  # Full Bloom = designed puzzle
        "tiers": p["tiers"],
    }
    body.update(extra)
    return body
def bloom_response(conn: sqlite3.Connection, date: str) -> dict:
    """Daily Center Circle response for ``date``.

    The design is loaded (or created) from storage; the accepted set is computed
    live (broad + overrides), always requires the center letter, and is hashed
    with the date as salt.
    """
    design = generate_bloom_puzzle(conn, date)
    accepted = accepted_words(conn, design["center"], design["outer"], require_center=True)
    return _response(date, design, accepted, {"date": date})
def bloom_free_response(conn: sqlite3.Connection, seed: str, fmt: str) -> dict:
    """Free-play wheel response keyed by ``seed`` (resumable).

    The design is rebuilt from the seed rather than stored. The accepted set is
    computed live and hashed with the seed as salt; the center letter is required
    unless the resolved format is "wild".
    """
    design = build_free(seed, fmt)
    center_required = design["format"] != "wild"
    accepted = accepted_words(conn, design["center"], design["outer"], require_center=center_required)
    extra = {"mode": "free", "format": design["format"], "seed": design["seed"]}
    return _response(seed, design, accepted, extra)
# --- runtime curation: overrides + player reports ------------------------------
def set_override(conn: sqlite3.Connection, word: str, action: str, reason: str | None = None,
by: str | None = None) -> bool:
word = (word or "").strip().lower()
if not (word.isalpha() and action in ("allow", "block")):
return False
# An ALLOW that violates Bloom's hard rules (≥4 letters, no 'S') could never
# count — reject it rather than store an inert override. BLOCK stays permissive.
if action == "allow" and (len(word) < 4 or "s" in word):
return False
conn.execute(
"INSERT INTO bloom_word_overrides (word, action, reason, created_by) VALUES (?,?,?,?) "
"ON CONFLICT(word) DO UPDATE SET action=excluded.action, reason=excluded.reason, "
"created_by=excluded.created_by, created_at=CURRENT_TIMESTAMP",
(word, action, reason, by),
)
conn.commit()
return True
def clear_override(conn: sqlite3.Connection, word: str) -> None:
    """Delete the override for `word` (trimmed, lowercased), if any, and commit."""
    normalized = (word or "").strip().lower()
    conn.execute("DELETE FROM bloom_word_overrides WHERE word=?", (normalized,))
    conn.commit()
def list_overrides(conn: sqlite3.Connection) -> list[dict]:
    """Return every word override as a plain dict, most recently created first."""
    cursor = conn.execute(
        "SELECT word, action, reason, created_by, created_at FROM bloom_word_overrides ORDER BY created_at DESC")
    return list(map(dict, cursor))
def add_report(conn: sqlite3.Connection, word: str, puzzle_date, mode, fmt, letters, reason) -> bool:
    """Queue a player's "this should count" report for admin review.

    The word is trimmed and lowercased, and must be 4-24 ASCII letters.
    `str.isalpha()` alone is true for any Unicode letter, so `isascii()` is
    checked as well to keep non-ASCII input out of the admin queue.

    Returns False when the word fails validation. Returns True when the report
    is stored, and also when a pending report for the same word already exists
    (in which case nothing new is inserted). The context fields are stringified
    and truncated before storage.
    """
    word = (word or "").strip().lower()
    if not (word.isascii() and word.isalpha() and 4 <= len(word) <= 24):
        return False
    # Don't pile up duplicate pending reports for the same word.
    dup = conn.execute(
        "SELECT 1 FROM bloom_word_reports WHERE word=? AND status='pending'", (word,)).fetchone()
    if dup:
        return True
    conn.execute(
        "INSERT INTO bloom_word_reports (word, puzzle_date, mode, format, letters, reason) "
        "VALUES (?,?,?,?,?,?)",
        (word, str(puzzle_date or "")[:16], str(mode or "")[:8], str(fmt or "")[:8],
         str(letters or "")[:16], str(reason or "")[:60]),
    )
    conn.commit()
    return True
def list_reports(conn: sqlite3.Connection, status: str = "pending", limit: int = 100) -> list[dict]:
    """Return up to `limit` reports with the given `status`, newest first.

    `created_at` has one-second resolution, so reports filed within the same
    second are further ordered by `id` (descending) to keep the result
    deterministic.
    """
    return [dict(r) for r in conn.execute(
        "SELECT id, word, puzzle_date, mode, format, letters, reason, status, created_at "
        "FROM bloom_word_reports WHERE status=? ORDER BY created_at DESC, id DESC LIMIT ?", (status, limit))]
def resolve_report(conn: sqlite3.Connection, report_id: int, action: str, by: str | None = None) -> bool:
"""action: 'approve' (→ allow override) | 'block' (→ block override) | 'dismiss'."""
status = {"approve": "approved", "block": "blocked", "dismiss": "dismissed"}.get(action)
row = conn.execute("SELECT word FROM bloom_word_reports WHERE id=?", (report_id,)).fetchone()
if not row or not status:
return False
if action == "approve":
if not set_override(conn, row["word"], "allow", reason="report", by=by):
return False # can't allow (hard rule) — leave pending; dismiss instead
elif action == "block":
set_override(conn, row["word"], "block", reason="report", by=by)
conn.execute("UPDATE bloom_word_reports SET status=? WHERE id=?", (status, report_id))
conn.commit()
return True
+77 -27
View File
@@ -1,6 +1,7 @@
from __future__ import annotations
import argparse
import contextlib
import os
import sqlite3
from pathlib import Path
@@ -10,7 +11,7 @@ from .db import connect, init_db
from .digest import send_due_digests
from .games import generate_daily_puzzles
from .localtime import local_today
from .dedup import DEFAULT_THRESHOLD, DEFAULT_WINDOW_DAYS, dedup as run_dedup
from .dedup import DEFAULT_THRESHOLD, DEFAULT_WINDOW_DAYS, cluster_duplicates, dedup as run_dedup
from .enrich import enrich_brief_images, enrich_recent_images, enrich_summarized_images
from .summarize import generate_summary, get_summary
from .feeds import (
@@ -39,9 +40,17 @@ DEFAULT_DB = ROOT / "data" / "goodnews.sqlite3"
DEFAULT_SOURCES = ROOT / "config" / "sources.toml"
def _default_db() -> Path:
    """Return the CLI's default database path.

    A non-empty $GOODNEWS_DB wins; otherwise the bundled DEFAULT_DB is used.
    Honoring the variable here means `GOODNEWS_DB=… ` actually targets that DB
    instead of being silently ignored. Otherwise a copy-DB maintenance run
    (e.g. dedup --force-recluster) can land on production by surprise.
    """
    override = os.environ.get("GOODNEWS_DB")
    if override:
        return Path(override)
    return Path(DEFAULT_DB)
def main() -> None:
parser = argparse.ArgumentParser(prog="goodnews")
parser.add_argument("--db", type=Path, default=DEFAULT_DB, help="SQLite database path")
parser.add_argument("--db", type=Path, default=_default_db(),
help="SQLite database path (defaults to $GOODNEWS_DB, else the bundled data/ DB)")
subparsers = parser.add_subparsers(dest="command", required=True)
subparsers.add_parser("init-db", help="Create or update the SQLite schema")
@@ -144,6 +153,9 @@ def main() -> None:
dedup_parser.add_argument("--embed-limit", type=int, help="Cap how many missing embeddings to compute")
dedup_parser.add_argument("--base-url", help="OpenAI-compatible base URL")
dedup_parser.add_argument("--model", help="Chat model name (unused for embeddings)")
dedup_parser.add_argument("--force-recluster", action="store_true",
help="Re-cluster the EXISTING corpus even if no new embeddings "
"(re-applies representative policy; cycle-locked, no model needed)")
check_llm_parser = subparsers.add_parser("check-llm", help="Check local OpenAI-compatible model endpoint")
check_llm_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
@@ -221,7 +233,9 @@ def main() -> None:
import json as _json
p = _json.loads(r["preview_json"])
line += f" (accept {round(p.get('acceptance_rate', 0) * 100)}%, sampled {p.get('sampled', 0)})"
_rate = p.get("acceptance_rate")
_rate_str = f"{round(_rate * 100)}%" if _rate is not None else ""
line += f" (accept {_rate_str}, sampled {p.get('sampled', 0)})"
print(line)
elif args.command == "promote-candidate":
init_db(conn)
@@ -286,15 +300,31 @@ def main() -> None:
print(f"enrich-images: {found} new image(s) for summarized articles")
elif args.command == "dedup":
init_db(conn)
client = llm_client_from_args(args)
stats = run_dedup(
conn, client, threshold=args.threshold, window_days=args.window_days, embed_limit=args.embed_limit
)
print(
f"dedup: embedded={stats['embedded']} articles={stats['articles']} "
f"clusters={stats['clusters']} duplicate_clusters={stats['duplicate_clusters']} "
f"duplicates_hidden={stats['duplicates']}"
)
if args.force_recluster:
# Re-apply representative policy to the EXISTING corpus. The normal path
# fast-skips when no new embeddings exist, so it would NOT pick up a policy
# change. Cycle-locked so it can't overlap the scheduled timer; no model
# needed (pure re-cluster over stored embeddings).
with cycle_lock(args.db) as acquired:
if not acquired:
print("dedup: a cycle is already running; re-run --force-recluster after it finishes")
return
stats = cluster_duplicates(conn, threshold=args.threshold, window_days=args.window_days)
print(
f"dedup (forced recluster): articles={stats['articles']} "
f"clusters={stats['clusters']} duplicate_clusters={stats['duplicate_clusters']} "
f"duplicates_hidden={stats['duplicates']}"
)
else:
client = llm_client_from_args(args)
stats = run_dedup(
conn, client, threshold=args.threshold, window_days=args.window_days, embed_limit=args.embed_limit
)
print(
f"dedup: embedded={stats['embedded']} articles={stats['articles']} "
f"clusters={stats['clusters']} duplicate_clusters={stats['duplicate_clusters']} "
f"duplicates_hidden={stats['duplicates']}"
)
elif args.command == "check-llm":
client = llm_client_from_args(args)
try:
@@ -368,7 +398,9 @@ def list_recent(conn: sqlite3.Connection, limit: int, accepted_only: bool) -> No
def print_preview(p: dict) -> None:
mode = "model" if p["classified"] else "heuristic"
print(f"Preview of {p['url']} ({mode})")
print(f" sampled={p['sampled']} accepted={p['accepted']} ({p['acceptance_rate']*100:.0f}%)")
rate = p.get("acceptance_rate")
rate_str = f"{rate * 100:.0f}%" if rate is not None else "— (all held)"
print(f" sampled={p['sampled']} accepted={p['accepted']} ({rate_str})")
print(f" freshness: newest={p['newest_published'] or 'unknown'} in_last_7d={p['recent_7d']}")
print(f" averages: cortisol={p['avg_cortisol']} ragebait={p['avg_ragebait']} pr_risk={p['avg_pr_risk']}")
if p["topic_mix"]:
@@ -398,6 +430,28 @@ def check_feeds(conn: sqlite3.Connection, include_inactive: bool = False) -> Non
print(f"--- {ok}/{len(rows)} feeds healthy ---")
@contextlib.contextmanager
def cycle_lock(db_path):
    """Exclusive, non-blocking lock shared by the scheduled cycle and manual jobs.

    Any job that mutates the corpus (e.g. a forced dedup re-cluster) takes this
    lock so it can never overlap the cycle and contend on the database/model.
    The lock is an `flock` on ".goodnews-cycle.lock" next to `db_path`.

    Yields True if the lock was acquired, and False if `flock` failed, which
    normally means another holder already has it. The lock is released and the
    file closed on exit, including when the caller's body raises.
    """
    import fcntl

    lock_path = Path(db_path).parent / ".goodnews-cycle.lock"
    with open(lock_path, "w") as lock_file:
        try:
            fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            acquired = False
        else:
            acquired = True
        # Yield OUTSIDE the except handler, so an exception raised in the
        # caller's body is not chained to the unrelated lock-contention OSError.
        try:
            yield acquired
        finally:
            if acquired:
                fcntl.flock(lock_file, fcntl.LOCK_UN)
def run_cycle(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
"""One end-to-end pass for a scheduler: poll due sources, classify the new
arrivals, dedup, rebuild today's brief. Each step is independent and
@@ -406,21 +460,11 @@ def run_cycle(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
Holds an exclusive lock so a manual run and the systemd timer (or two timer
ticks) can never overlap and contend on the database and model.
"""
import fcntl
lock_path = Path(args.db).parent / ".goodnews-cycle.lock"
lock_file = open(lock_path, "w")
try:
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
print("cycle: another cycle is already running; skipping")
lock_file.close()
return
try:
with cycle_lock(args.db) as acquired:
if not acquired:
print("cycle: another cycle is already running; skipping")
return
_run_cycle_locked(conn, args)
finally:
fcntl.flock(lock_file, fcntl.LOCK_UN)
lock_file.close()
def _run_cycle_locked(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
@@ -505,6 +549,12 @@ def _run_cycle_locked(conn: sqlite3.Connection, args: argparse.Namespace) -> Non
except Exception as exc:
print(f"review: skipped ({exc})")
try:
from .queries import reindex_search
print(f"search: indexed {reindex_search(conn)} articles")
except Exception as exc: # noqa: BLE001 — search index is non-critical
print(f"search: skipped ({exc})")
if not args.no_digest:
try:
sent = send_due_digests(conn) # morning-gated + deduped internally
+1
View File
@@ -0,0 +1 @@
["vagina", "vulva", "nipple", "rectum", "anal", "fecal", "ejaculation", "eunuch", "nude", "nudity", "butt"]
+1
View File
@@ -0,0 +1 @@
["death","dying","died","killed","killing","murder","murdered","corpse","coffin","funeral","grave","buried","burial","weapon","gunshot","warfare","violent","violence","deadly","lethal","poison","poisoned","suicide","slaughter","victim","bleeding","wound","wounded","vomit","vomiting","vomited","diarrhea","disease","diseased","cancer","tumor","illness","infection","infected","plague","disabled","lucifer","satan","demon","demonic","devil","damned","hatred","hateful","terror","terrorize","hostage","kidnap","kidnapped","abuse","abused","assault","trauma","traumatic","anxiety","depression","depressed","divorce","divorced","bankrupt","eviction","evicted","layoff","drowned","drowning","choking","suffocate","starving","famine","poverty","despair","misery","miserable","tragic","tragedy","horror","horrible","nightmare","panic","dread","grief","grieving","mourning","rotting","decay","decayed","maggot","vermin","filth","sewage","manure"]
File diff suppressed because one or more lines are too long
+80 -1
View File
@@ -28,6 +28,7 @@ CREATE TABLE IF NOT EXISTS sources (
retry_after_at TEXT,
review_flag INTEGER NOT NULL DEFAULT 0,
review_reason TEXT,
x_handle TEXT, -- the source's own verified X handle, if known
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
@@ -69,6 +70,7 @@ CREATE TABLE IF NOT EXISTS article_scores (
reason_text TEXT,
topic TEXT,
flavor TEXT,
language TEXT,
model_name TEXT,
scored_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
@@ -300,6 +302,13 @@ CREATE TABLE IF NOT EXISTS daily_puzzles (
UNIQUE (puzzle_date, game, variant)
);
-- Full-text search over the PUBLIC article corpus (title/description/source/tags).
-- Standalone FTS5 (not external-content) since the searchable text spans tables;
-- rebuilt from the accepted, non-duplicate set on each ingest cycle (+ lazily).
CREATE VIRTUAL TABLE IF NOT EXISTS article_search USING fts5(
article_id UNINDEXED, title, body, source_name, tags
);
CREATE TABLE IF NOT EXISTS game_state (
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
game TEXT NOT NULL, -- 'word' | 'wordsearch'
@@ -310,6 +319,30 @@ CREATE TABLE IF NOT EXISTS game_state (
PRIMARY KEY (user_id, game, variant, puzzle_date)
);
-- Bloom runtime word curation (no deploy needed). The accepted set is computed
-- live as: broad dictionary + {allow} - {block}. Admin-managed; one row per word.
CREATE TABLE IF NOT EXISTS bloom_word_overrides (
word TEXT PRIMARY KEY, -- lowercase
action TEXT NOT NULL, -- 'allow' | 'block'
reason TEXT,
created_by TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Player "this should count" reports → admin queue (approve→allow / block / dismiss).
CREATE TABLE IF NOT EXISTS bloom_word_reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word TEXT NOT NULL, -- lowercase
puzzle_date TEXT,
mode TEXT, -- 'daily' | 'free'
format TEXT, -- 'center' | 'wild'
letters TEXT, -- the wheel's 7 letters (for context)
reason TEXT, -- why it was rejected (e.g. 'not in the word list')
status TEXT NOT NULL DEFAULT 'pending', -- 'pending' | 'approved' | 'blocked' | 'dismissed'
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_bloom_reports_status ON bloom_word_reports(status, created_at);
CREATE TABLE IF NOT EXISTS user_follows (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
@@ -327,6 +360,49 @@ CREATE TABLE IF NOT EXISTS digest_sends (
sent_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE (user_id, brief_date)
);
-- Publishing Desk: a platform-NEUTRAL outbound-share record (X first; Bluesky /
-- Threads / newsletter later reuse this). One row per (article, platform); the
-- queue tops up without ever overwriting saved text/handles. opened != posted —
-- Web Intents can't confirm a post, so the human confirms the terminal state.
CREATE TABLE IF NOT EXISTS outbound_shares (
id INTEGER PRIMARY KEY AUTOINCREMENT,
article_id INTEGER NOT NULL REFERENCES articles(id) ON DELETE CASCADE,
platform TEXT NOT NULL DEFAULT 'x',
status TEXT NOT NULL DEFAULT 'queued', -- queued|drafting|opened|posted|skipped|snoozed
social_score INTEGER, -- LLM "stop-scrolling" interest (0-10)
rationale TEXT, -- why someone would stop scrolling
talking_points TEXT, -- JSON array of factual points
angle TEXT, -- a suggested conversational angle
entities TEXT, -- JSON array of raw named entities (LLM-extracted)
suggested_handles TEXT, -- JSON array of {handle, profile_url, via}
draft_text TEXT, -- autosaved in-progress blurb (the human writes it)
final_text TEXT, -- what was actually posted (teaches voice later)
share_url TEXT, -- the exact /a/{id}?utm... link used
post_url TEXT, -- the resulting tweet URL, if captured
snooze_until TEXT, -- 'not right now' (re-eligible after this)
opened_at TEXT,
posted_at TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE (article_id, platform)
);
CREATE INDEX IF NOT EXISTS idx_outbound_shares_status ON outbound_shares(platform, status);
-- Verified handle directory — the LLM only ever proposes NAMES; the @handle comes
-- only from here (or a source's own x_handle). Aliases resolve consistently by each
-- having its own row pointing at the same handle (e.g. "Johns Hopkins University"
-- and "Johns Hopkins").
CREATE TABLE IF NOT EXISTS entity_handles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
entity_name TEXT NOT NULL, -- display name as entered
normalized_name TEXT NOT NULL, -- lowercased/stripped match key
platform TEXT NOT NULL DEFAULT 'x',
handle TEXT NOT NULL, -- e.g. @AnthropicAI
profile_url TEXT,
verified_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE (normalized_name, platform)
);
"""
@@ -359,7 +435,7 @@ def _migrate(conn: sqlite3.Connection) -> None:
need an explicit, idempotent ALTER guarded by the current column set.
"""
score_cols = {row["name"] for row in conn.execute("PRAGMA table_info(article_scores)")}
for column in ("topic", "flavor"):
for column in ("topic", "flavor", "language"):
if column not in score_cols:
conn.execute(f"ALTER TABLE article_scores ADD COLUMN {column} TEXT")
@@ -397,6 +473,9 @@ def _migrate(conn: sqlite3.Connection) -> None:
for column, decl in health_columns.items():
if column not in source_cols:
conn.execute(f"ALTER TABLE sources ADD COLUMN {column} {decl}")
# Publishing Desk: the source's own verified X handle (suggested when sharing).
if "x_handle" not in source_cols:
conn.execute("ALTER TABLE sources ADD COLUMN x_handle TEXT")
# Lifecycle: status (active/paused/retired) + content_visible. `active` is
# kept as a synced mirror so legacy code (scheduler/CLI) keeps working.
+25 -3
View File
@@ -102,7 +102,8 @@ def cluster_duplicates(
(COALESCE(s.constructive_score,0) + COALESCE(s.agency_score,0)
+ COALESCE(s.human_benefit_score,0) + src.trust_score
- COALESCE(s.cortisol_score,0) - COALESCE(s.ragebait_score,0)
- COALESCE(s.pr_risk_score,0)) AS rank_score
- COALESCE(s.pr_risk_score,0)) AS rank_score,
COALESCE(s.accepted, 0) AS accepted
FROM articles a
JOIN article_embeddings e ON e.article_id = a.id
JOIN sources src ON src.id = a.source_id
@@ -114,7 +115,8 @@ def cluster_duplicates(
items = []
for r in rows:
vec = _unit(array("f", r["vector"]).tolist())
items.append({"id": r["id"], "ord": _day_ordinal(r["dt"]), "vec": vec, "score": r["rank_score"]})
items.append({"id": r["id"], "ord": _day_ordinal(r["dt"]), "vec": vec,
"score": r["rank_score"], "accepted": bool(r["accepted"])})
clusters: list[dict] = [] # {anchor_vec, anchor_ord, members:[item]}
for it in items:
@@ -130,6 +132,14 @@ def cluster_duplicates(
if not placed:
clusters.append({"anchor_vec": it["vec"], "anchor_ord": it["ord"], "members": [it]})
# Which articles are CURRENTLY a representative (something points at them)? Captured
# BEFORE we reset, so we can keep an established canonical stable across runs.
prior_reps = {
row[0] for row in conn.execute(
"SELECT DISTINCT duplicate_of FROM articles WHERE duplicate_of IS NOT NULL"
)
}
# Reset prior decisions for everything we considered, then re-apply.
considered = [it["id"] for it in items]
conn.executemany(
@@ -142,7 +152,19 @@ def cluster_duplicates(
if len(cl["members"]) < 2:
continue
dup_clusters += 1
rep = max(cl["members"], key=lambda m: (m["score"], -m["id"]))
# Representative priority (highest wins), in order:
# 1. accepted/serveable — an accepted page must never be retired to a REJECTED
# rep (that page would 404 with nothing to redirect to).
# 2. established rep — if a member is already the cluster's canonical, keep it,
# so an indexed URL doesn't churn when a newer twin arrives.
# 3. quality score — decides genuinely-new clusters.
# 4. -id — deterministic final tiebreak (older wins).
rep = max(cl["members"], key=lambda m: (
1 if m["accepted"] else 0,
1 if m["id"] in prior_reps else 0,
m["score"],
-m["id"],
))
for m in cl["members"]:
if m["id"] != rep["id"]:
conn.execute(
+84 -1
View File
@@ -243,6 +243,11 @@ def poll_source(conn: sqlite3.Connection, source: sqlite3.Row) -> dict:
}
# Deep-preview accessibility sample bounds (module-level so tests can shrink them).
_ACCESS_FETCH_TIMEOUT = 6 # per-article socket timeout (seconds)
_ACCESS_DEADLINE_S = 12.0 # hard wall-clock cap for the whole access phase
def preview_feed(url: str, sample: int = 25, pr_risk_default: int = 3, client=None, fetcher=None) -> dict:
"""Fetch and score a sample of a feed WITHOUT persisting anything.
@@ -302,12 +307,85 @@ def preview_feed(url: str, sample: int = 25, pr_risk_default: int = 3, client=No
cortisol=ns["cortisol_score"],
ragebait=ns["ragebait_score"],
pr_risk=ns["pr_risk_score"],
reason_code=ns["reason_code"],
language=ns.get("language", ""),
)
except Exception:
pass # one bad item shouldn't sink the whole preview
total = len(rows)
accepted = sum(1 for r in rows if r["accepted"])
# Non-English items are HELD (English-only feed for now), not calm-filter
# rejections — surface the count and judge acceptance over English items only, so
# a multilingual wire (e.g. PR Newswire) isn't unfairly penalized in the preview.
non_english = sum(1 for r in rows if r.get("reason_code") == "non_english")
judged = total - non_english
# Accessibility sample — deep preview only (it already means "spend ~a minute to
# really know"). Layered per Codex: the instant DOMAIN rule + a small sampled
# article fetch, so a paywall verdict rests on evidence, not domain alone (NYT
# Learning proved domain rules false-positive).
from .paywall import check_article_access, is_paywalled
domain_paywalled = is_paywalled(url)
access = None
access_verdict = None
if classified and rows:
from concurrent.futures import ThreadPoolExecutor, as_completed
# prefer the URLs the model would actually surface, then fill from the rest
ordered = [r["url"] for r in rows if r["accepted"] and r["url"]] + \
[r["url"] for r in rows if not r["accepted"] and r["url"]]
seen, sample_urls = set(), []
for u in ordered:
if u not in seen:
seen.add(u)
sample_urls.append(u)
if len(sample_urls) >= 6:
break
results = []
if sample_urls:
af = fetcher or fetch_feed
ex = ThreadPoolExecutor(max_workers=min(6, len(sample_urls)))
futs = {ex.submit(check_article_access, u, af, _ACCESS_FETCH_TIMEOUT): u for u in sample_urls}
done = {}
try:
# Hard wall-clock cap: the access step can NEVER stall the whole
# preview. Fetches run in parallel; whatever hasn't finished by the
# deadline is left 'unknown' (unverified — never counts as walled).
# shutdown(wait=False, cancel_futures=True) below means we don't block
# on stragglers (no `with ... as ex` join), so wall-clock == the cap.
for fut in as_completed(futs, timeout=_ACCESS_DEADLINE_S):
done[futs[fut]] = fut.result()
except Exception: # noqa: BLE001 — overall deadline hit; use what finished
pass
ex.shutdown(wait=False, cancel_futures=True)
results = [(u, done.get(u, "unknown")) for u in sample_urls]
counts = Counter(a for _, a in results)
readable, paywalled = counts.get("readable", 0), counts.get("paywalled", 0)
assessable = readable + paywalled
inacc = (paywalled / assessable) if assessable else None
# `blocked` is deliberately NOT counted as inaccessible: a bot-block isn't a
# reader paywall (it may open fine in a browser), so it can never push a
# source to reject-ready — only readable-vs-paywalled evidence does. Need a
# few clearly-assessable samples before judging confidently.
ENOUGH = 3
if assessable < ENOUGH:
access_verdict = "review" # mostly blocked/unknown — can't confirm; click examples
elif domain_paywalled and inacc >= 0.7:
access_verdict = "reject-ready" # domain rule AND sample agree it's walled
elif domain_paywalled:
access_verdict = "review" # domain says walled but the sample isn't — likely a false positive, look
elif inacc >= 0.7:
access_verdict = "review" # not on the list but mostly walled — candidate for the rule
elif inacc <= 0.3:
access_verdict = "fine"
else:
access_verdict = "review" # mixed
access = {
"checked": len(results),
"readable": readable, "paywalled": paywalled,
"blocked": counts.get("blocked", 0), "unknown": counts.get("unknown", 0),
"examples": [{"url": u, "access": a} for u, a in results][:5],
}
def _avg(key: str) -> float:
return round(sum(r[key] for r in rows) / total, 1) if total else 0.0
@@ -329,12 +407,17 @@ def preview_feed(url: str, sample: int = 25, pr_risk_default: int = 3, client=No
"sampled": total,
"classified": classified,
"accepted": accepted,
"acceptance_rate": round(accepted / total, 2) if total else 0.0,
"non_english": non_english, # held for language (English-only feed for now)
# None (not 0%) when there are no English items to judge — "all held", not "all rejected".
"acceptance_rate": round(accepted / judged, 2) if judged else None,
"avg_cortisol": _avg("cortisol"),
"avg_ragebait": _avg("ragebait"),
"avg_pr_risk": _avg("pr_risk"),
"newest_published": newest,
"recent_7d": recent_7d,
"paywall_rule": domain_paywalled, # instant domain hint
"access": access, # sampled readable/paywalled/blocked/unknown (deep only)
"access_verdict": access_verdict, # fine | review | reject-ready
"topic_mix": dict(Counter(r["topic"] for r in rows if r["topic"])),
"flavor_mix": dict(Counter(r["flavor"] for r in rows if r["flavor"])),
"examples_accepted": [r["title"] for r in rows if r["accepted"]][:5],
+135 -1
View File
@@ -17,6 +17,8 @@ import re
import sqlite3
from pathlib import Path
from . import bloom
_DATA = Path(__file__).parent / "data"
_POOL = json.loads((_DATA / "wordpool.json").read_text()) # curated static answer pool
# Guess dictionaries (same lists the client validates against) — used server-side to
@@ -26,6 +28,9 @@ _DICT = {v: set(json.loads((_DATA / f"words-{v}.json").read_text())) for v in ("
# Daily Word: 5 letters / 6 guesses · Long Word: 6 letters / 7 guesses.
WORD_VARIANTS = {"5": {"length": 5, "guesses": 6}, "6": {"length": 6, "guesses": 7}}
# Memory Match daily sync variants = "<tier>-<format>" (free play stays local).
MATCH_VARIANTS = {f"{t}-{f}" for t in ("gentle", "standard", "expert") for f in ("icons", "colors")}
def _seed(*parts: str) -> int:
return int(hashlib.sha256(":".join(parts).encode()).hexdigest(), 16)
@@ -625,12 +630,29 @@ def _merge_word(a: dict, b: dict) -> dict:
return a if _word_rank(a) >= _word_rank(b) else b
def _merge_bloom(a: dict, b: dict) -> dict:
    """Merge two Bloom states by taking the union of their found words.

    A find is monotonic (you can't un-find one), so the union across devices is
    always correct. First-seen order is kept and non-string entries are dropped.
    Only "found" is returned; score is recomputed by the sanitizer.
    """
    combined = [*(a.get("found") or []), *(b.get("found") or [])]
    unique = dict.fromkeys(entry for entry in combined if isinstance(entry, str))
    return {"found": list(unique)}
def merge_game_state(game: str, a: dict | None, b: dict | None) -> dict:
if not a:
return dict(b or {})
if not b:
return dict(a or {})
return _merge_wordsearch(a, b) if game == "wordsearch" else _merge_word(a, b)
if game == "wordsearch":
return _merge_wordsearch(a, b)
if game == "bloom":
return _merge_bloom(a, b)
if game == "match":
return _merge_match(a, b)
return _merge_word(a, b)
def load_game_state(conn: sqlite3.Connection, user_id: int, game: str, variant: str, date: str) -> dict | None:
@@ -729,10 +751,92 @@ def _sanitize_word(variant: str, state: dict) -> dict:
return out
def _sanitize_bloom(conn: sqlite3.Connection, date: str, state: dict) -> dict:
    """Keep only finds that are real for THIS wheel and recompute the score.

    When the day's puzzle is stored, a find must be in its DYNAMIC accept set
    (computed live via `bloom.accepted_words`). When no puzzle exists yet, only
    the shape is checked: at least 4 letters, alphabetic, no 's'. Finds are
    trimmed, lowercased, deduped and sorted. The score is recomputed
    server-side, and "full" is set only when it reaches the designed puzzle's
    max_score. A client-sent score/full is never trusted.
    """
    payload = bloom.stored_payload(conn, date)
    if payload:
        valid = set(bloom.accepted_words(conn, payload["center"], payload["outer"], True))
    else:
        valid = None

    def _counts(word: str) -> bool:
        if valid is not None:
            return word in valid
        # No puzzle yet → shape only.
        return len(word) >= 4 and word.isalpha() and "s" not in word

    kept: set[str] = set()
    for raw in state.get("found") or []:
        if not isinstance(raw, str):
            continue
        word = raw.strip().lower()
        if word and word not in kept and _counts(word):
            kept.add(word)
    clean = sorted(kept)

    score = bloom.score_words(payload, clean) if payload else 0
    out = {"found": clean, "score": score}
    reached_total = bool(payload) and bool(clean) and score >= payload.get("max_score", 1)
    if reached_total:
        out["full"] = True  # Full Bloom — found the whole designed puzzle
    return out
_MATCH_MAX_FACES = 12 # the largest board uses 8 faces; cap generously
_MATCH_FACES = {"gentle": 6, "standard": 8, "expert": 8} # faces per tier = completion target
# Valid face keys — MIRRORS the frontend (icons.js ICON_KEYS + palette.js COLOR_KEYS).
# Matched keys are validated against this so bogus/junk keys can't inflate the
# completion count. Adding a face on the frontend? Add it here too; a missing key only
# under-counts (benign, self-heals once synced), never crashes.
_MATCH_FACE_KEYS = frozenset({
"sun", "moon", "star", "cloud", "raindrop", "wave", "leaf", "flower", "seedling",
"tree", "mountain", "shell", "feather", "acorn", "butterfly", "rainbow", "heart",
"sparkle", "home", "book", "teacup", "candle", "lantern", "compass", "kite", "note",
"boat", "fish", "bird", "mushroom", "bell", "snowflake", "clover",
"color-rose", "color-coral", "color-amber", "color-gold", "color-lime", "color-green",
"color-teal", "color-cyan", "color-sky", "color-blue", "color-indigo", "color-violet",
"color-plum", "color-brown", "color-sand", "color-slate", "color-charcoal", "color-cream",
})
def _match_faces(variant: str) -> int:
    """Return the face count for the variant's tier (the part before the first "-").

    An unknown or missing tier falls back to 8.
    """
    tier, _, _ = (variant or "").partition("-")
    return _MATCH_FACES.get(tier, 8)
def _sanitize_match(variant: str, state: dict) -> dict:
    """Light, durability-only sanitize for Memory Match state.

    There is nothing to cheat here: the board is deterministic and fully
    visible, with no score or leaderboard. So this only drops malformed junk:
    - "matched" keeps FACE KEYS (icon name / color key, never raw indices) that
      are in the known face set, deduped in first-seen order, and capped at
      _MATCH_MAX_FACES;
    - "moves" is clamped to 0..100_000;
    - "done" is DERIVED from the matched count vs the tier's face target, and
      never trusted from the client.
    """
    kept: dict[str, None] = {}
    for key in state.get("matched") or []:
        if len(kept) >= _MATCH_MAX_FACES:
            break
        if isinstance(key, str) and key in _MATCH_FACE_KEYS:
            kept.setdefault(key, None)
    matched = list(kept)

    moves = min(_int(state.get("moves")), 100_000)
    if moves < 0:
        moves = 0
    return {"matched": matched, "moves": moves, "done": len(matched) >= _match_faces(variant)}
def _merge_match(a: dict, b: dict) -> dict:
    """Merge two Memory Match states across devices.

    Matched faces are unioned in first-seen order (a's, then b's) and capped at
    _MATCH_MAX_FACES; the larger move count wins. `done` is not carried here;
    the post-merge sanitize re-derives it from the matched count.
    """
    pooled = [*(a.get("matched") or []), *(b.get("matched") or [])]
    union: dict = {}
    for key in pooled:
        union.setdefault(key, None)
    moves = max(_int(a.get("moves")), _int(b.get("moves")))
    return {"matched": list(union)[:_MATCH_MAX_FACES], "moves": moves}
def sanitize_game_state(conn: sqlite3.Connection, game: str, variant: str, date: str, state: dict) -> dict:
    """Never trust client JSON at the storage layer — normalize before merge/store.

    Dispatches to the per-game sanitizer; a falsy `state` is treated as an empty
    dict. Any game other than "wordsearch", "bloom" or "match" goes through the
    word sanitizer.
    """
    payload = state or {}
    if game == "wordsearch":
        return _sanitize_wordsearch(conn, variant, date, payload)
    if game == "bloom":
        return _sanitize_bloom(conn, date, payload)
    if game == "match":
        return _sanitize_match(variant, payload)
    return _sanitize_word(variant, payload)
@@ -770,6 +874,31 @@ def game_stats(conn: sqlite3.Connection, user_id: int, game: str, variant: str)
if game == "wordsearch":
times = [s.get("ms") for s in states if s.get("ms")]
return {"completed": sum(1 for s in states if s.get("ms")), "best": min(times) if times else 0}
if game == "bloom":
# Calm, no-pressure record: days played, lifetime words, Full Blooms, and
# the best tier ever reached (computed per day from that wheel's tiers).
tier_names = [t[0] for t in bloom.TIER_PCTS]
played = words = full = 0
best_idx = -1
for r in rows:
try:
s = json.loads(r["state_json"])
except (ValueError, TypeError):
continue
found = s.get("found") or []
if not found:
continue
played += 1
words += len(found)
if s.get("full"):
full += 1
p = bloom.stored_payload(conn, r["puzzle_date"])
if p:
sc = s.get("score") or 0
idx = max((i for i, t in enumerate(p["tiers"]) if sc >= t["score"]), default=0)
best_idx = max(best_idx, idx)
return {"played": played, "words": words, "full_blooms": full,
"best_tier": tier_names[best_idx] if best_idx >= 0 else None}
played = won = 0
dist: dict[int, int] = {}
streak = 0
@@ -823,4 +952,9 @@ def generate_daily_puzzles(conn: sqlite3.Connection, date: str, client=None) ->
).fetchone():
generate_wordsearch_puzzle(conn, date, client=client)
made += 1
if not conn.execute(
"SELECT 1 FROM daily_puzzles WHERE puzzle_date=? AND game='bloom' AND variant=''", (date,)
).fetchone():
bloom.generate_bloom_puzzle(conn, date) # pure code, no LLM
made += 1
return made
+93 -6
View File
@@ -49,6 +49,7 @@ CLASSIFICATION_SCHEMA = {
"tags",
"reason_code",
"reason_text",
"language",
],
"properties": {
"constructive_score": _SCORE_FIELD,
@@ -64,6 +65,7 @@ CLASSIFICATION_SCHEMA = {
"tags": {"type": "array", "items": {"type": "string", "enum": list(ALLOWED_TAGS)}, "maxItems": MAX_TAGS},
"reason_code": {"type": "string"},
"reason_text": {"type": "string"},
"language": {"type": "string"}, # ISO 639-1 of the article's own text (en, de, es…)
},
}
@@ -104,6 +106,11 @@ Grouping tags — choose ONLY from this controlled vocabulary:
Tag discipline: assign 1-4 tags; prefer fewer, stronger ones; never tag by weak
association; pick tags a reader would reasonably use to find this story later.
Also report `language`: the ISO 639-1 code of the article's OWN text (the title and
description), e.g. "en", "de", "es", "fr". Judge the language of the words, not the
subject. This is detection only — score and accept the story on its merits as usual;
the site decides separately what to do with non-English items.
Return only JSON with this exact shape:
{{
"constructive_score": 0,
@@ -118,7 +125,8 @@ Return only JSON with this exact shape:
"flavor": "one_of_the_allowed_flavors",
"tags": ["one_to_four_allowed_tags"],
"reason_code": "short_snake_case",
"reason_text": "one concise sentence"
"reason_text": "one concise sentence",
"language": "en"
}}
""".format(topics=topics_prompt_block(), flavors=flavors_prompt_block(), tags=tags_prompt_block())
@@ -222,6 +230,60 @@ class LocalModelClient:
"""
return self._raw_content(self._build_payload(messages, None))
def rank_for_social(self, candidates: list[dict]) -> list[dict]:
    """Rank candidates for social sharing with one comparative model call.

    Every candidate goes into a single prompt (not one call each). The reply
    is parsed with parse_classifier_json and must be an object holding a
    "ranked" list; each usable entry is returned, in the model's order, as
    {id, social_score, why, talking_points, angle, entities}.

    Entries that are not dicts, or whose id cannot be converted to int, are
    dropped. Returns [] for an empty candidate list. Raises RuntimeError when
    the reply has no "ranked" list; other errors propagate, and callers are
    expected to fall back to deterministic ranking on any failure.
    NOTE(review): the request is presumably bounded by the client's timeout,
    which is applied outside this method — confirm.
    """
    if not candidates:
        return []

    def _line(cand: dict) -> str:
        # Collapse whitespace and cap the summary so each article stays on one line.
        blurb = " ".join((cand.get("summary") or "").split())[:280]
        return f'- id={int(cand["id"])} | topic={cand.get("topic")} | {cand["title"]} :: {blurb}'

    def _str_list(value, width: int, cap: int) -> list:
        # Only a real list is accepted: iterating a bare string would split it
        # into single characters ("fact" -> ["f", "a", "c", "t"]).
        if not isinstance(value, list):
            return []
        return [str(item)[:width] for item in value][:cap]

    listing = "\n".join([_line(cand) for cand in candidates])
    intro = (
        "These are constructive-news articles. Compare them as candidates for a SHORT X "
        "(Twitter) post from a calm good-news account, and rank best-first by SOCIAL "
        "share-worthiness — would someone stop scrolling? That differs from how 'good' the "
        "article is.\n\n"
    )
    reply_shape = (
        'Reply with JSON only, exactly this shape:\n'
        '{"ranked": [{"id": <one of the ids above>, "social_score": <0-10>, '
        '"why": "one sentence: why it stops the scroll", '
        '"talking_points": ["3 short factual points a writer could use"], '
        '"angle": "a possible conversational angle", '
        '"entities": ["real org/person names mentioned, for tagging"]}]}\n'
        "Only use ids from the list above. Order best-first."
    )
    prompt = intro + listing + "\n\n" + reply_shape

    system_msg = {"role": "system", "content": "You rank constructive news for social sharing. Reply with JSON only."}
    user_msg = {"role": "user", "content": prompt}
    parsed = parse_classifier_json(self.chat_text([system_msg, user_msg]))

    ranked = parsed.get("ranked") if isinstance(parsed, dict) else None
    if not isinstance(ranked, list):
        raise RuntimeError("rank_for_social: missing 'ranked' list")

    results = []
    for entry in ranked:
        if not isinstance(entry, dict):
            continue
        try:
            entry_id = int(entry.get("id"))
        except (TypeError, ValueError):
            continue
        points = entry.get("talking_points")
        names = entry.get("entities")
        results.append({
            "id": entry_id,
            "social_score": _bounded_int(entry.get("social_score")),
            "why": str(entry.get("why") or "")[:300],
            "talking_points": _str_list(points, 200, 4),
            "angle": str(entry.get("angle") or "")[:300],
            "entities": _str_list(names, 80, 8),
        })
    return results
def _raw_content(self, payload: dict) -> str:
body = json.dumps(payload).encode("utf-8")
headers = {"Content-Type": "application/json"}
@@ -304,7 +366,29 @@ def parse_classifier_json(content: str) -> dict:
return json.loads(content[start : end + 1])
def _is_english(language: str) -> bool:
"""Conservative: HOLD only when the model clearly reports a non-English language.
Missing/blank/undetermined → treated as English, so a model hiccup never silently
drops genuine English content (the corpus is ~all English today)."""
lang = (language or "").strip().lower()
if not lang or lang in ("und", "unknown", "mul", "zxx"):
return True
return lang == "en" or lang.startswith("en-") or lang.startswith("en_")
def normalize_scores(data: dict, model_name: str) -> dict:
language = str(data.get("language") or "").strip().lower()[:16]
accepted = 1 if bool(data.get("accepted")) else 0
reason_code = str(data.get("reason_code") or "model_no_reason")[:120]
reason_text = str(data.get("reason_text") or "")[:1000]
# Language gate (code disposes): the public feed is English-only for now. A
# non-English article is HELD — never shown — but PRESERVED with a distinct
# reason so it isn't counted as a calm-filter rejection or a source failure, and
# can be revisited when translation support lands (Phase 4 / GDELT).
if not _is_english(language):
accepted = 0
reason_code = "non_english"
reason_text = f"Held — non-English ({language}); awaiting translation support."
return {
"constructive_score": _bounded_int(data.get("constructive_score")),
"cortisol_score": _bounded_int(data.get("cortisol_score")),
@@ -313,12 +397,13 @@ def normalize_scores(data: dict, model_name: str) -> dict:
"human_benefit_score": _bounded_int(data.get("human_benefit_score")),
"novelty_score": _bounded_int(data.get("novelty_score")),
"pr_risk_score": _bounded_int(data.get("pr_risk_score")),
"accepted": 1 if bool(data.get("accepted")) else 0,
"accepted": accepted,
"topic": coerce_topic(data.get("topic")),
"flavor": coerce_flavor(data.get("flavor")),
"tags": coerce_tags(data.get("tags")),
"reason_code": str(data.get("reason_code") or "model_no_reason")[:120],
"reason_text": str(data.get("reason_text") or "")[:1000],
"reason_code": reason_code,
"reason_text": reason_text,
"language": language,
"model_name": model_name,
}
@@ -329,9 +414,9 @@ def upsert_article_score(conn: sqlite3.Connection, article_id: int, scores: dict
INSERT INTO article_scores (
article_id, constructive_score, cortisol_score, ragebait_score,
agency_score, human_benefit_score, novelty_score, pr_risk_score,
accepted, topic, flavor, reason_code, reason_text, model_name, scored_at
accepted, topic, flavor, reason_code, reason_text, language, model_name, scored_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(article_id) DO UPDATE SET
constructive_score = excluded.constructive_score,
cortisol_score = excluded.cortisol_score,
@@ -345,6 +430,7 @@ def upsert_article_score(conn: sqlite3.Connection, article_id: int, scores: dict
flavor = excluded.flavor,
reason_code = excluded.reason_code,
reason_text = excluded.reason_text,
language = excluded.language,
model_name = excluded.model_name,
scored_at = CURRENT_TIMESTAMP
""",
@@ -362,6 +448,7 @@ def upsert_article_score(conn: sqlite3.Connection, article_id: int, scores: dict
scores["flavor"],
scores["reason_code"],
scores["reason_text"],
scores.get("language"),
scores["model_name"],
),
)
+52
View File
@@ -8,6 +8,7 @@ and for replacements. It will never be perfect; it's an honest hint, not a gate.
from __future__ import annotations
import re
from urllib.parse import urlsplit
# Host suffixes considered paywalled. Subdomains match (news.nature.com → nature.com).
@@ -53,3 +54,54 @@ def is_paywalled_for_source(url: str | None, override: str | None = None) -> boo
if override == "paywalled":
return True
return is_paywalled(url)
# --- Content-level accessibility (deep-preview only; the live pipeline still never
# fetches article pages) -----------------------------------------------------
# Wall phrases that appear in the rendered, walled state. Kept specific so a footer
# "subscribe to our newsletter" doesn't read as a paywall.
_WALL_MARKERS = (
    "subscribe to continue", "subscribe to keep reading", "subscribe to read",
    "to continue reading", "already a subscriber", "subscribers only",
    "this article is for subscribers", "this content is for subscribers",
    "create a free account to continue", "create an account to keep reading",
    "unlock this article", "register to continue reading",
)
# schema.org isAccessibleForFree, with the boolean either bare or quoted.
_ACCESS_FALSE = re.compile(r'"isaccessibleforfree"\s*:\s*("?)(false)\1', re.I)
_ACCESS_TRUE = re.compile(r'"isaccessibleforfree"\s*:\s*("?)(true)\1', re.I)
# A content-tier / content_tier meta tag whose content starts with "locked".
_CONTENT_LOCKED = re.compile(r'content[_-]tier"[^>]*content="locked', re.I)
# Markup removal used to estimate how much visible text a page carries.
_STRIP_BLOCKS = re.compile(r"(?is)<(script|style|noscript|template)[^>]*>.*?</\1>")
_STRIP_TAGS = re.compile(r"(?s)<[^>]+>")
_WS = re.compile(r"\s+")


def check_article_access(url: str, fetcher, timeout: int = 8) -> str:
    """Classify how readable ONE article URL is, for the deep-preview sample.

    `fetcher` is called as fetcher(url, timeout=timeout), and its result is
    decoded as UTF-8 with undecodable bytes ignored.

    Returns:
        "blocked"   – the fetch raised.
        "unknown"   – the result could not be decoded, or the page is too
                      thin to judge.
        "paywalled" – an explicit signal is present: isAccessibleForFree set
                      to false, a locked content tier, or a wall phrase.
        "readable"  – no wall signal and enough text remains once markup is
                      stripped: 1500 characters, or 600 when the page declares
                      isAccessibleForFree true.

    This is a heuristic that informs a verdict; nothing here rejects an article.
    """
    try:
        payload = fetcher(url, timeout=timeout)
    except Exception:  # noqa: BLE001 — any fetch failure = can't read it right now
        return "blocked"
    try:
        page = payload.decode("utf-8", "ignore")
    except Exception:  # noqa: BLE001
        return "unknown"

    # Explicit machine-readable wall signals are checked first.
    if _ACCESS_FALSE.search(page) or _CONTENT_LOCKED.search(page):
        return "paywalled"
    lowered = page.lower()
    for phrase in _WALL_MARKERS:
        if phrase in lowered:
            return "paywalled"

    # No wall signal — judge by how much real article text is present.
    without_blocks = _STRIP_BLOCKS.sub(" ", page)
    without_tags = _STRIP_TAGS.sub(" ", without_blocks)
    body = _WS.sub(" ", without_tags).strip()
    # A page that declares itself free needs less text to count as readable.
    threshold = 600 if _ACCESS_TRUE.search(page) else 1500
    return "readable" if len(body) >= threshold else "unknown"
+400
View File
@@ -0,0 +1,400 @@
"""Publishing Desk — the platform-neutral outbound-share queue (X first).
Pattern (Claude + Codex): code reduces the corpus to a small set of strong,
*eligible* candidates; ONE bounded comparative LLM call ranks them together and
returns talking points / angle / entities; code validates, applies diversity, and
tops the queue up to a target. If the model is down or returns junk, a deterministic
ranking is the fallback — the Desk always works.
The human writes every blurb; the LLM never writes the post and never invents a
@handle (handles come only from the verified `entity_handles` table or a source's
own `x_handle`).
"""
from __future__ import annotations
import json
import re
import sqlite3
from datetime import datetime, timezone
from .paywall import is_paywalled_for_source
# Platform key written to / matched against the `platform` column of
# outbound_shares and entity_handles. X is the only platform defined here.
PLATFORM_X = "x"
QUEUE_TARGET = 8  # how many active items the Desk tries to keep ready
_LLM_POOL = 15  # most candidates handed to the one comparative LLM call
# Passed to SQLite as a datetime('now', ?) modifier by eligible_candidates.
_RECENT = "-3 days"  # "timely" window for share candidates
# Active = occupying a slot in the working queue (so we don't re-add or duplicate).
_ACTIVE = ("queued", "drafting", "opened")
# Legal suffixes are dropped ("Apple Inc" ≡ "Apple") but ONLY from the END, and "the"
# is NEVER dropped. Removing them anywhere collapsed "The Who"→"who" (collides with
# WHO) and "Inc. Magazine"→"magazine". Identity words (university, institute, lab…) are
# preserved; short forms/abbreviations need explicit alias rows.
_LEGAL_SUFFIXES = {"inc", "llc", "ltd", "corp", "corporation", "plc", "gmbh", "co"}


def normalize_entity(name: str) -> str:
    """Return the lookup key for an organisation or person name.

    The name is lower-cased, every character outside a-z, 0-9 and space
    becomes a space, and trailing legal suffixes are removed. A name made up
    only of legal suffixes (or an empty/None name) normalizes to "".
    """
    cleaned = re.sub(r"[^a-z0-9 ]", " ", (name or "").lower())
    words = cleaned.split()
    keep = len(words)
    # Walk back from the end only; suffix-like words elsewhere stay put.
    while keep and words[keep - 1] in _LEGAL_SUFFIXES:
        keep -= 1
    return " ".join(words[:keep])
_HANDLE_RE = re.compile(r"^[A-Za-z0-9_]{1,15}$")  # X: 1-15 chars, letters/digits/underscore


def valid_handle(handle: str | None) -> str | None:
    """Return the handle in canonical form (no leading @), or None if invalid.

    Surrounding whitespace and a single leading "@" are tolerated. Anything
    that is then not 1-15 letters, digits or underscores is rejected, so
    '@', '@not a handle', '@https://x.com/NASA' and '@NASA!' all yield None.
    """
    candidate = (handle or "").strip()
    bare = candidate[1:] if candidate.startswith("@") else candidate
    if not _HANDLE_RE.match(bare):
        return None
    return bare
# --- verified handle resolution -------------------------------------------------
def resolve_handles(conn: sqlite3.Connection, entities: list[str], source_handle: str | None = None,
                    platform: str = PLATFORM_X, cap: int = 2) -> list[dict]:
    """Collect up to `cap` verified handles to suggest for a post.

    The source's own handle is offered first, followed by each named entity
    that has a row in `entity_handles` for this platform (looked up by its
    normalized name). Every handle is validated, de-duplicated without regard
    to case, and returned as {"handle": "@name", "profile_url", "via"}, where
    `via` is "source" or "entity". Entities with no row are skipped; nothing
    is guessed.
    """
    resolved: list[dict] = []
    used: set[str] = set()

    def _offer(raw: str | None, profile: str | None, origin: str) -> None:
        # Stored and source handles are validated too before being shown.
        handle = valid_handle(raw)
        if not handle:
            return
        folded = handle.lower()
        if folded in used:
            return
        used.add(folded)
        resolved.append({
            "handle": "@" + handle,
            "profile_url": profile or f"https://x.com/{handle}",
            "via": origin,
        })

    if source_handle:
        _offer(source_handle, None, "source")

    for entity in entities or []:
        if len(resolved) >= cap:
            break
        lookup_key = normalize_entity(entity)
        if lookup_key:
            hit = conn.execute(
                "SELECT handle, profile_url FROM entity_handles WHERE normalized_name=? AND platform=?",
                (lookup_key, platform),
            ).fetchone()
            if hit:
                _offer(hit["handle"], hit["profile_url"], "entity")
    return resolved[:cap]
def add_entity_handle(conn: sqlite3.Connection, entity_name: str, handle: str,
                      profile_url: str | None = None, platform: str = PLATFORM_X) -> bool:
    """Store (or refresh) a verified handle for an entity and commit.

    The row is keyed on (normalized_name, platform): saving the same entity
    again overwrites its handle, profile URL and display name and refreshes
    verified_at. Returns False, writing nothing, when the name normalizes to
    nothing or the handle is not valid; True otherwise.
    """
    lookup_key = normalize_entity(entity_name)
    clean = valid_handle(handle)
    if not (lookup_key and clean):  # reject junk before it is ever stored
        return False
    link = profile_url or f"https://x.com/{clean}"
    conn.execute(
        """INSERT INTO entity_handles (entity_name, normalized_name, platform, handle, profile_url)
           VALUES (?, ?, ?, ?, ?)
           ON CONFLICT(normalized_name, platform) DO UPDATE SET
             handle=excluded.handle, profile_url=excluded.profile_url,
             entity_name=excluded.entity_name, verified_at=CURRENT_TIMESTAMP""",
        (entity_name.strip(), lookup_key, platform, clean, link),
    )
    conn.commit()
    return True
# --- candidate eligibility + ranking --------------------------------------------
def eligible_candidates(conn: sqlite3.Connection, platform: str = PLATFORM_X, limit: int = _LLM_POOL) -> list[dict]:
    """Return the best `limit` share candidates, deterministically pre-ranked.

    SQL applies the hard filters: accepted, not a duplicate, source visible,
    all four summary fields present, published (or discovered) within the
    _RECENT window, and no outbound_shares row for this platform that is
    queued/drafting/opened/posted/skipped or still snoozed. A snooze whose
    time has passed does not exclude the article. At most the 200 most recent
    rows are read.

    Paywalled URLs are then dropped in Python, and the remainder is sorted by
    _det_score (highest first) before truncating to `limit`.
    """
    sql = """
        SELECT a.id, a.title, a.canonical_url, a.image_url, a.published_at, a.discovered_at,
               a.source_id, src.name AS source_name, src.x_handle AS source_handle,
               src.default_category AS category, src.paywall_override,
               s.constructive_score, s.novelty_score, s.topic,
               m.summary, m.what_happened, m.why_matters, m.why_belongs
        FROM articles a
        JOIN article_scores s ON s.article_id = a.id
        JOIN sources src ON src.id = a.source_id
        JOIN article_summaries m ON m.article_id = a.id
        WHERE s.accepted = 1
          AND a.duplicate_of IS NULL
          AND src.content_visible = 1
          AND m.summary IS NOT NULL AND m.what_happened IS NOT NULL
          AND m.why_matters IS NOT NULL AND m.why_belongs IS NOT NULL
          AND COALESCE(a.published_at, a.discovered_at) >= datetime('now', ?)
          AND a.id NOT IN (
            SELECT article_id FROM outbound_shares WHERE platform = ? AND (
              status IN ('queued','drafting','opened','posted','skipped')
              OR (status = 'snoozed' AND (snooze_until IS NULL OR snooze_until > datetime('now')))
            )
          )
        ORDER BY COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT 200
        """
    readable: list[dict] = []
    for row in conn.execute(sql, (_RECENT, platform)).fetchall():
        if is_paywalled_for_source(row["canonical_url"], row["paywall_override"]):
            continue
        readable.append(dict(row))
    ranked = sorted(readable, key=_det_score, reverse=True)
    return ranked[:limit]
def _det_score(c: dict) -> float:
"""Deterministic shareability score — the pre-rank and the LLM-failure fallback.
'Good article' and 'good post' differ, so this favors novelty + a usable image
+ freshness, not just the constructive score."""
score = 1.5 * (c.get("novelty_score") or 0) + 1.0 * (c.get("constructive_score") or 0)
if c.get("image_url"):
score += 2.0
return score
def _diverse_pick(cands: list[dict], need: int, per_source: int = 1, per_topic: int = 2) -> list[dict]:
"""Pick `need` items spreading across sources/topics (cands already ranked)."""
out, src_n, top_n = [], {}, {}
for c in cands:
if len(out) >= need:
break
sid, top = c.get("source_id"), c.get("topic")
if src_n.get(sid, 0) >= per_source or (top and top_n.get(top, 0) >= per_topic):
continue
out.append(c)
src_n[sid] = src_n.get(sid, 0) + 1
if top:
top_n[top] = top_n.get(top, 0) + 1
# If diversity caps left us short (small pool), fill from the remainder in rank order.
if len(out) < need:
chosen = {c["id"] for c in out}
out.extend(c for c in cands if c["id"] not in chosen)
return out[:need]
# --- queue build (background job) -----------------------------------------------
def _share_url(base_url: str, article_id: int, platform: str = PLATFORM_X) -> str:
    """Build the tracked /a/{id} link for a post (UTM source = platform)."""
    root = (base_url or "").rstrip("/")
    tracking = f"utm_source={platform}&utm_medium=social&utm_campaign=publishing_desk"
    return f"{root}/a/{article_id}?{tracking}"
def build_queue(conn: sqlite3.Connection, base_url: str, client=None,
                platform: str = PLATFORM_X, target: int = QUEUE_TARGET) -> dict:
    """Top the active queue up to `target`. Comparative LLM ranks the eligible pool;
    deterministic fallback if the model is unavailable or returns junk. Never
    overwrites saved draft/final text on a re-queue.

    Args:
        conn: Connection whose rows support name access (row["col"]).
        base_url: Site root used to build each item's share URL.
        client: Optional object exposing rank_for_social(); None skips the LLM.
        platform: Platform key for the queue being built.
        target: Number of active (queued/drafting/opened) items to aim for.

    Returns:
        {"added": rows actually inserted or re-queued,
         "active": active count after the build,
         "ranked_by": "llm" | "deterministic" | "none"} — "none" means there
        was nothing to do (queue already full, or no eligible candidates).
    """
    active = conn.execute(
        "SELECT COUNT(*) FROM outbound_shares WHERE platform=? AND status IN (?,?,?)",
        (platform, *_ACTIVE),
    ).fetchone()[0]
    need = target - active
    if need <= 0:
        return {"added": 0, "active": active, "ranked_by": "none"}

    cands = eligible_candidates(conn, platform=platform, limit=_LLM_POOL)
    if not cands:
        return {"added": 0, "active": active, "ranked_by": "none"}
    by_id = {c["id"]: c for c in cands}

    ranked_by = "deterministic"
    llm = None
    if client is not None:
        try:
            llm = client.rank_for_social(
                [{"id": c["id"], "title": c["title"], "summary": c.get("summary") or "",
                  "topic": c.get("topic")} for c in cands]
            )
        except Exception:  # noqa: BLE001 — model down/slow/garbage → deterministic fallback
            llm = None
    if llm:
        # validate ids against the eligible pool AND dedupe (a model that repeats an id
        # must not inflate the chosen set); attach LLM fields; rank by social score.
        seen_ids, ordered = set(), []
        for r in llm:
            rid = r.get("id")
            if rid in by_id and rid not in seen_ids:
                seen_ids.add(rid)
                by_id[rid]["_llm"] = r
                ordered.append(by_id[rid])
        if ordered:
            ranked_by = "llm"
            ordered.sort(key=lambda c: c["_llm"].get("social_score", 0), reverse=True)
            # Candidates the model did not mention follow, in deterministic order.
            rest = sorted((c for c in cands if "_llm" not in c), key=_det_score, reverse=True)
            cands = ordered + rest

    chosen = _diverse_pick(cands, need)
    # Snapshot so `added` reflects rows really written, not loop iterations.
    before = conn.total_changes
    for c in chosen:
        m = c.get("_llm")
        if m:
            social, angle = m.get("social_score"), m.get("angle")
            rationale = m.get("why") or m.get("rationale")
            points = m.get("talking_points") if isinstance(m.get("talking_points"), list) else []
            entities = m.get("entities") if isinstance(m.get("entities"), list) else []
        else:
            # Deterministic fallback (model down): seed the writing aids from the
            # already-generated summary/explanation so the card is still useful.
            # interest score + angle stay None on purpose — they're LLM-only judgments
            # the UI hides when absent; we don't manufacture a fake angle/score.
            social, angle, entities = None, None, []
            rationale = c.get("summary")
            points = [p for p in (c.get("what_happened"), c.get("why_matters"), c.get("why_belongs")) if p]
        handles = resolve_handles(conn, entities, c.get("source_handle"), platform=platform)
        # ON CONFLICT re-queues ONLY an (expired) snoozed row — eligibility already
        # excludes active/posted/skipped, and the WHERE guard makes that defense-in-depth
        # so a re-build can never clobber an active draft or a terminal status. draft_text
        # / final_text are never in the SET, so saved work survives a re-queue.
        conn.execute(
            """INSERT INTO outbound_shares
                 (article_id, platform, status, social_score, rationale, talking_points,
                  angle, entities, suggested_handles, share_url)
               VALUES (?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?)
               ON CONFLICT(article_id, platform) DO UPDATE SET
                 status='queued', social_score=excluded.social_score,
                 rationale=excluded.rationale, talking_points=excluded.talking_points,
                 angle=excluded.angle, entities=excluded.entities,
                 suggested_handles=excluded.suggested_handles, share_url=excluded.share_url,
                 snooze_until=NULL, updated_at=CURRENT_TIMESTAMP
               WHERE outbound_shares.status = 'snoozed'
                 AND outbound_shares.snooze_until IS NOT NULL
                 AND outbound_shares.snooze_until <= datetime('now')""",
            (c["id"], platform, social, rationale,
             json.dumps(points), angle,
             json.dumps(entities), json.dumps(handles), _share_url(base_url, c["id"], platform)),
        )
    conn.commit()
    # Counts come from ACTUAL persisted rows, not loop iterations (a skipped conflict
    # changes nothing, so it can't falsely report a fuller queue).
    added = conn.total_changes - before
    active_now = conn.execute(
        "SELECT COUNT(*) FROM outbound_shares WHERE platform=? AND status IN (?,?,?)",
        (platform, *_ACTIVE),
    ).fetchone()[0]
    return {"added": added, "active": active_now, "ranked_by": ranked_by}
# --- queue read + status transitions --------------------------------------------
def _row_to_item(r: sqlite3.Row) -> dict:
d = dict(r)
for k in ("talking_points", "entities", "suggested_handles"):
try:
d[k] = json.loads(d[k]) if d.get(k) else []
except (ValueError, TypeError):
d[k] = []
return d
def list_queue(conn: sqlite3.Connection, platform: str = PLATFORM_X, include_archived: bool = False) -> list[dict]:
    """Return the working queue (queued/drafting/opened) for a platform.

    Opened items come first, then drafting, then everything else; within a
    group the order is social_score descending, then created_at descending.
    With include_archived, skipped and snoozed items (the recoverable tray)
    are returned as well. Posted items are never returned here: they are
    done, and including them would make the payload grow without bound.
    """
    wanted = [*_ACTIVE]
    if include_archived:
        wanted += ["skipped", "snoozed"]
    qs = ",".join("?" * len(wanted))
    sql = f"""
        SELECT o.id, o.article_id, o.platform, o.status, o.social_score, o.rationale,
               o.talking_points, o.angle, o.entities, o.suggested_handles, o.draft_text,
               o.final_text, o.share_url, o.post_url, o.snooze_until, o.opened_at, o.posted_at,
               a.title, a.canonical_url, a.image_url, src.name AS source_name
        FROM outbound_shares o
        JOIN articles a ON a.id = o.article_id
        JOIN sources src ON src.id = a.source_id
        WHERE o.platform = ? AND o.status IN ({qs})
        ORDER BY CASE o.status WHEN 'opened' THEN 0 WHEN 'drafting' THEN 1 ELSE 2 END,
                 o.social_score DESC, o.created_at DESC
        """
    items = []
    for row in conn.execute(sql, (platform, *wanted)).fetchall():
        items.append(_row_to_item(row))
    return items
_ACTIVE_SET = {"queued", "drafting", "opened"}
_VALID_STATUS = {"queued", "drafting", "opened", "posted", "skipped", "snoozed"}
def _is_future(ts: str | None) -> bool:
if not ts:
return False
try:
dt = datetime.fromisoformat(str(ts).strip().replace("Z", "").replace("T", " "))
except (ValueError, TypeError):
return False
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt > datetime.now(timezone.utc)
def set_status(conn: sqlite3.Connection, share_id: int, status: str, *,
draft_text: str | None = None, final_text: str | None = None,
post_url: str | None = None, snooze_until: str | None = None) -> bool:
"""Transition an ACTIVE share. Enforces the lifecycle: only queued/drafting/opened
items transition here — `posted` is permanently terminal and skipped/snoozed recover
via restore() (so dedup can't be undone and an item can't be reposted). `snoozed`
requires a valid FUTURE timestamp (a null/past date would exclude it forever);
leaving snooze otherwise clears snooze_until. opened/posted stamp their times."""
if status not in _VALID_STATUS:
return False
if status == "snoozed" and not _is_future(snooze_until):
return False
row = conn.execute("SELECT status FROM outbound_shares WHERE id = ?", (share_id,)).fetchone()
if not row or row["status"] not in _ACTIVE_SET: # terminal/archived → use restore()
return False
# snooze_until is set only when snoozing; cleared on every other transition.
sets = ["status = ?", "updated_at = CURRENT_TIMESTAMP", "snooze_until = ?"]
params: list = [status, snooze_until if status == "snoozed" else None]
if status == "opened":
sets.append("opened_at = CURRENT_TIMESTAMP")
if status == "posted":
sets.append("posted_at = CURRENT_TIMESTAMP")
if draft_text is not None:
sets.append("draft_text = ?")
params.append(draft_text)
if final_text is not None:
sets.append("final_text = ?")
params.append(final_text)
if post_url is not None:
sets.append("post_url = ?")
params.append(post_url)
params.append(share_id)
cur = conn.execute(
f"UPDATE outbound_shares SET {', '.join(sets)} WHERE id = ? "
"AND status IN ('queued','drafting','opened')", # atomic: don't transition a row that just changed
params,
)
conn.commit()
return cur.rowcount > 0
def save_draft(conn: sqlite3.Connection, share_id: int, draft_text: str) -> bool:
    """Autosave the draft text of an ACTIVE share and commit.

    A queued row moves to 'drafting'; drafting/opened rows keep their status.
    Rows in any other status are left untouched, so a late debounced autosave
    that lands after Posted/Skip/Snooze is a no-op. Returns True only when a
    row was written.
    """
    sql = (
        "UPDATE outbound_shares SET draft_text = ?, "
        "status = CASE status WHEN 'queued' THEN 'drafting' ELSE status END, "
        "updated_at = CURRENT_TIMESTAMP "
        "WHERE id = ? AND status IN ('queued','drafting','opened')"
    )
    result = conn.execute(sql, (draft_text, share_id))
    conn.commit()
    return result.rowcount > 0
def restore(conn: sqlite3.Connection, share_id: int) -> bool:
    """Return a skipped or snoozed share to the working queue and commit.

    This is the undo for a mistaken Skip/Snooze: the row becomes 'queued' and
    its snooze time is cleared. Rows in any other status are not touched.
    Returns True only when a row was changed.
    """
    sql = (
        "UPDATE outbound_shares "
        "SET status='queued', snooze_until=NULL, updated_at=CURRENT_TIMESTAMP "
        "WHERE id = ? AND status IN ('skipped','snoozed')"
    )
    result = conn.execute(sql, (share_id,))
    conn.commit()
    return result.rowcount > 0
+62 -9
View File
@@ -11,6 +11,7 @@ import sqlite3
from datetime import UTC, datetime, timedelta
from .feeds import MAX_BACKOFF_MINUTES
from .localtime import local_now
from .paywall import is_paywalled, is_paywalled_for_source
# UA substrings that mark automated clients. Crawlers run JS on a throttled
@@ -78,6 +79,7 @@ def feed(
follow_sources: list[int] | None = None,
follow_tags: list[str] | None = None,
since: str | None = None,
match: str | None = None,
) -> list[dict]:
"""Return articles with categorical filters applied in SQL.
@@ -92,6 +94,14 @@ def feed(
"""
clauses = ["a.duplicate_of IS NULL", "src.content_visible = 1"]
params: list = []
# Full-text search: join the FTS index and MATCH first, so its bound param
# leads and relevance can drive the ordering. All the boundary clauses below
# still apply, so search mirrors exactly what the visitor feed would show.
fts_join = ""
if match:
fts_join = "JOIN article_search ON article_search.article_id = a.id"
clauses.append("article_search MATCH ?")
params.append(match)
if accepted_only:
clauses.append("s.accepted = 1")
if topic:
@@ -155,17 +165,19 @@ def feed(
where = "WHERE " + " AND ".join(clauses)
params.extend([limit, offset])
order_by = (
"COALESCE(a.published_at, a.discovered_at) DESC, rank_score DESC"
if sort == "latest"
else "rank_score DESC, COALESCE(a.published_at, a.discovered_at) DESC"
)
if match:
order_by = "bm25(article_search), COALESCE(a.published_at, a.discovered_at) DESC" # relevance, then recency
elif sort == "latest":
order_by = "COALESCE(a.published_at, a.discovered_at) DESC, rank_score DESC"
else:
order_by = "rank_score DESC, COALESCE(a.published_at, a.discovered_at) DESC"
rows = conn.execute(
f"""
SELECT {_ARTICLE_COLUMNS}
FROM articles a
JOIN sources src ON src.id = a.source_id
JOIN article_scores s ON s.article_id = a.id
{fts_join}
{where}
ORDER BY {order_by}
LIMIT ? OFFSET ?
@@ -175,6 +187,27 @@ def feed(
return [dict(row) for row in rows]
def reindex_search(conn: sqlite3.Connection) -> int:
    """Rebuild the article_search index from the accepted, non-duplicate corpus.

    Each indexed row carries the article id, title, description (as body),
    source name and the article's tags joined with spaces. Visibility and
    boundary filtering are applied at query time, so they are not baked into
    the index.

    The delete and the re-insert run in one transaction. If the insert fails,
    the delete is rolled back rather than left pending, where a later
    unrelated commit would persist an empty index. The error still propagates.

    Returns:
        The number of rows in the rebuilt index.
    """
    # `with conn` commits on success and rolls back if either statement raises.
    with conn:
        conn.execute("DELETE FROM article_search")
        conn.execute(
            """
            INSERT INTO article_search (article_id, title, body, source_name, tags)
            SELECT a.id, a.title, COALESCE(a.description, ''), src.name,
                   COALESCE((SELECT group_concat(t.tag, ' ') FROM article_tags t WHERE t.article_id = a.id), '')
            FROM articles a
            JOIN sources src ON src.id = a.source_id
            JOIN article_scores s ON s.article_id = a.id
            WHERE s.accepted = 1 AND a.duplicate_of IS NULL
            """
        )
    return conn.execute("SELECT COUNT(*) FROM article_search").fetchone()[0]
def brief(conn: sqlite3.Connection, brief_date: str | None = None, limit: int = 10) -> dict:
"""Return a stored daily brief (latest if no date) with its ranked items."""
target_date = brief_date or _latest_brief_date(conn)
@@ -344,6 +377,8 @@ def source_health(conn: sqlite3.Connection) -> list[dict]:
(SELECT COUNT(*) FROM articles a WHERE a.source_id = s.id) AS total_articles,
(SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
WHERE a.source_id = s.id AND sc.accepted = 1) AS accepted_total,
(SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
WHERE a.source_id = s.id AND sc.reason_code = 'non_english') AS non_english,
(SELECT COUNT(*) FROM articles a WHERE a.source_id = s.id AND a.duplicate_of IS NOT NULL) AS duplicates,
(SELECT COUNT(*) FROM articles a JOIN article_scores sc ON sc.article_id = a.id
WHERE a.source_id = s.id AND sc.accepted = 1 AND a.duplicate_of IS NULL) AS served,
@@ -365,7 +400,14 @@ def source_health(conn: sqlite3.Connection) -> list[dict]:
d = dict(r)
total = d["total_articles"] or 0
accepted = d["accepted_total"] or 0
d["acceptance_rate"] = round(100 * accepted / total) if total else None
non_english = d.get("non_english") or 0
# Acceptance is judged over articles actually scored in English — non-English
# items are HELD (awaiting translation), not calm-filter rejections, so they
# don't drag a multilingual source's rate down.
judged = total - non_english
d["acceptance_rate"] = round(100 * accepted / judged) if judged else None
d["non_english"] = non_english
d["non_english_rate"] = round(100 * non_english / total) if total else None
d["duplicate_rate"] = round(100 * d["duplicates"] / total) if total else None
# Curation quality: of what this source got ACCEPTED, how much was a
# duplicate of content already served (accepted_total served = accepted dupes).
@@ -459,7 +501,9 @@ def _attention(content: dict, sources: list[dict], feedback_unread: int, now: da
_SRC_ART_FILTERS = {
"accepted": "AND s.accepted = 1",
"rejected": "AND s.accepted = 0",
# 'rejected' = calm-filter rejections only; non-English is HELD, its own bucket.
"rejected": "AND s.accepted = 0 AND COALESCE(s.reason_code,'') != 'non_english'",
"held": "AND s.reason_code = 'non_english'",
"no_image": "AND (a.image_url IS NULL OR a.image_url = '')",
"duplicates": "AND a.duplicate_of IS NOT NULL",
}
@@ -493,6 +537,7 @@ def source_articles(conn: sqlite3.Connection, source_id: int, filter: str = "all
"published_at": r["published_at"] or r["discovered_at"],
"accepted": r["accepted"],
"reason": r["reason_text"] or r["reason_code"], # the "why" behind accept/reject
"held": r["reason_code"] == "non_english", # held for language, not rejected
"topic": r["topic"],
"flavor": r["flavor"],
"paywalled": is_paywalled_for_source(r["canonical_url"], override), # effective (domain rule + override)
@@ -510,7 +555,8 @@ def source_articles_summary(conn: sqlite3.Connection, source_id: int) -> dict:
"""
SELECT COUNT(*) total,
COALESCE(SUM(s.accepted = 1), 0) accepted,
COALESCE(SUM(s.accepted = 0), 0) rejected,
COALESCE(SUM(s.accepted = 0 AND COALESCE(s.reason_code,'') != 'non_english'), 0) rejected,
COALESCE(SUM(s.reason_code = 'non_english'), 0) non_english,
COALESCE(SUM(a.image_url IS NULL OR a.image_url = ''), 0) no_image,
COALESCE(SUM(a.duplicate_of IS NOT NULL), 0) duplicates
FROM articles a LEFT JOIN article_scores s ON s.article_id = a.id
@@ -523,6 +569,7 @@ def source_articles_summary(conn: sqlite3.Connection, source_id: int) -> dict:
url = (srow["homepage_url"] or srow["feed_url"]) if srow else None
return {
"total": agg["total"], "accepted": agg["accepted"], "rejected": agg["rejected"],
"non_english": agg["non_english"], # held for language (not a calm-filter rejection)
"no_image": agg["no_image"], "duplicates": agg["duplicates"],
"paywalled": is_paywalled_for_source(url, override), # effective
"paywall_domain": is_paywalled(url), # what the domain rule alone says
@@ -533,6 +580,11 @@ def source_articles_summary(conn: sqlite3.Connection, source_id: int) -> dict:
def admin_stats(conn: sqlite3.Connection, days: int = 30) -> dict:
"""Aggregate, non-personal usage stats for the admin dashboard."""
since = f"-{days} days"
# "Today" for timestamp-based counters is the SITE-LOCAL day (GOODNEWS_TZ), not
# UTC: otherwise an evening error (e.g. 22:53 local) lands on the next UTC day and
# reads as a fresh "today" the following morning — the exact false-alarm we hit.
local_day_start = (local_now().replace(hour=0, minute=0, second=0, microsecond=0)
.astimezone(UTC).strftime("%Y-%m-%d %H:%M:%S"))
def scalar(sql, params=()):
return conn.execute(sql, params).fetchone()[0] or 0
@@ -658,7 +710,8 @@ def admin_stats(conn: sqlite3.Connection, days: int = 30) -> dict:
# check routinely and would read as real users seeing blank screens.
"client_errors": {
"today": scalar(
f"SELECT COUNT(*) FROM client_errors WHERE date(created_at)=date('now') AND {_NOT_BOT_SQL}"
f"SELECT COUNT(*) FROM client_errors WHERE created_at >= ? AND {_NOT_BOT_SQL}",
(local_day_start,),
),
"window": scalar(
f"SELECT COUNT(*) FROM client_errors WHERE created_at>=date('now',?) AND {_NOT_BOT_SQL}",
+12
View File
@@ -175,6 +175,18 @@ def reject_candidate(conn: sqlite3.Connection, candidate_id: int) -> bool:
return cur.rowcount > 0
def restore_candidate(conn: sqlite3.Connection, candidate_id: int) -> bool:
    """Move a rejected candidate back to the 'suggested' staging status.

    The UPDATE only matches a row whose status is currently 'rejected', so a
    candidate in any other status (for example a promoted one) is left as is.
    The change is committed on ``conn`` before returning.

    Returns True when a row was un-rejected, False when nothing matched.
    """
    statement = (
        "UPDATE source_candidates SET status = 'suggested', updated_at = CURRENT_TIMESTAMP "
        "WHERE id = ? AND status = 'rejected'"
    )
    outcome = conn.execute(statement, (candidate_id,))
    conn.commit()
    # rowcount is the number of rows the UPDATE touched: 0 means the id does
    # not exist or the candidate was not in the 'rejected' state.
    return outcome.rowcount >= 1
def promote_candidate(
conn: sqlite3.Connection,
candidate_id: int,
+3 -1
View File
@@ -419,7 +419,9 @@
box.append(d);
};
stat("Mode:", p.classified ? "model (accurate)" : "heuristic (quick, conservative)");
stat("Acceptance:", `${Math.round(p.acceptance_rate * 100)}% (${p.accepted}/${p.sampled})`);
stat("Acceptance:", p.acceptance_rate == null
? `— (all held · ${p.accepted}/${p.sampled})`
: `${Math.round(p.acceptance_rate * 100)}% (${p.accepted}/${p.sampled})`);
stat("Freshness:", `${p.recent_7d}/${p.sampled} in last 7 days · newest ${(p.newest_published||"unknown").slice(0,10)}`);
stat("Calm averages:", `cortisol ${p.avg_cortisol} · ragebait ${p.avg_ragebait} · PR ${p.avg_pr_risk}`);
const mix = (m) => Object.entries(m).map(([k, v]) => `${k} ${v}`).join(" · ") || "—";