89c0fbe1f6
The deploy pipeline runs from the working tree, so a wave of shipped features
had never been committed. This snapshots git to what's actually running.
SEO impression recovery (live + verified):
- Duplicate /a/{id} pages now 301-redirect to their canonical twin instead of returning 404
(a hard 404 silently dropped already-indexed URLs and tanked impressions).
- Dedup representative selection reworked: accepted/serveable -> established
rep (URL stability) -> quality score, so an accepted page never retires to a
rejected rep and an indexed canonical doesn't churn when a newer twin arrives.
- HEAD /a/{id} returns the same status as GET (api_route GET+HEAD) instead of
falling through to the static mount and 404ing.
- `dedup --force-recluster`: cycle-locked, model-free re-cluster to re-apply the
policy to the existing corpus (shared cycle_lock context manager).
- CLI honors GOODNEWS_DB for its default --db (was silently ignored).
Publishing Desk (admin tool to post highlights to X via Web Intents):
- publishing.py queue/rank/handle-resolution; admin UI; full searchable emoji
picker (bundled data, no CDN) for the blurb editor.
Play games + site:
- Bloom (word-wheel), Memory Match, daily ritual set, Zen Den (dev-gated).
- English-only language gate; source prospecting; paywall + dedup hardening.
Tests: full suite green (349). Ignores tightened (node_modules, data/*.db).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
401 lines
18 KiB
Python
401 lines
18 KiB
Python
"""Publishing Desk — the platform-neutral outbound-share queue (X first).
|
|
|
|
Pattern (Claude + Codex): code reduces the corpus to a small set of strong,
|
|
*eligible* candidates; ONE bounded comparative LLM call ranks them together and
|
|
returns talking points / angle / entities; code validates, applies diversity, and
|
|
tops the queue up to a target. If the model is down or returns junk, a deterministic
|
|
ranking is the fallback — the Desk always works.
|
|
|
|
The human writes every blurb; the LLM never writes the post and never invents a
|
|
@handle (handles come only from the verified `entity_handles` table or a source's
|
|
own `x_handle`).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import sqlite3
|
|
from datetime import datetime, timezone
|
|
|
|
from .paywall import is_paywalled_for_source
|
|
|
|
PLATFORM_X = "x"
|
|
QUEUE_TARGET = 8 # how many active items the Desk tries to keep ready
|
|
_LLM_POOL = 15 # most candidates handed to the one comparative LLM call
|
|
_RECENT = "-3 days" # "timely" window for share candidates
|
|
# Active = occupying a slot in the working queue (so we don't re-add or duplicate).
|
|
_ACTIVE = ("queued", "drafting", "opened")
|
|
|
|
# Legal suffixes are dropped ("Apple Inc" ≡ "Apple") but ONLY from the END, and "the"
# is NEVER dropped. Removing them anywhere collapsed "The Who"→"who" (collides with
# WHO) and "Inc. Magazine"→"magazine". Identity words (university, institute, lab…) are
# preserved; short forms/abbreviations need explicit alias rows.
_LEGAL_SUFFIXES = {"inc", "llc", "ltd", "corp", "corporation", "plc", "gmbh", "co"}


def normalize_entity(name: str) -> str:
    """Return the lookup key for an entity name.

    The name is lower-cased, every character outside ``a-z``, ``0-9`` and space
    becomes a space, whitespace runs collapse, and trailing legal suffixes
    (see ``_LEGAL_SUFFIXES``) are removed.

    Args:
        name: Display name of the entity; ``None`` or ``""`` is tolerated.

    Returns:
        The normalized key, or ``""`` when the input has no usable characters.
    """
    toks = re.sub(r"[^a-z0-9 ]", " ", (name or "").lower()).split()
    # Trailing suffixes only, and never the last remaining token: a name that
    # consists solely of suffix words ("Inc.", "Co") must keep a non-empty key,
    # otherwise it could never be stored or matched.
    while len(toks) > 1 and toks[-1] in _LEGAL_SUFFIXES:
        toks.pop()
    return " ".join(toks)
|
|
|
|
|
|
# X usernames: 1-15 characters drawn from ASCII letters, digits and underscore.
_HANDLE_RE = re.compile(r"^[A-Za-z0-9_]{1,15}$")


def valid_handle(handle: str | None) -> str | None:
    """Return the bare handle (no leading ``@``) if it is well-formed, else ``None``.

    Surrounding whitespace is ignored and a single leading ``@`` is accepted.
    Anything that then fails the handle pattern — empty text, embedded spaces,
    URLs, punctuation, a second ``@`` — is rejected.
    """
    candidate = (handle or "").strip()
    if candidate[:1] == "@":
        candidate = candidate[1:]
    if _HANDLE_RE.match(candidate) is None:
        return None
    return candidate
|
|
|
|
|
|
# --- verified handle resolution -------------------------------------------------
|
|
|
|
def resolve_handles(conn: sqlite3.Connection, entities: list[str], source_handle: str | None = None,
                    platform: str = PLATFORM_X, cap: int = 2) -> list[dict]:
    """Return up to `cap` verified handles for a share card.

    The source's own handle comes first, then each entity name that has a row in
    the curated ``entity_handles`` table for `platform`. Handles are validated
    with ``valid_handle`` and de-duplicated case-insensitively. Unmatched
    entities are NOT guessed — the UI offers a 'Find on X' search for those.

    Args:
        conn: Connection whose rows support access by column name.
        entities: Entity names (LLM-named); non-string items are ignored.
        source_handle: The source's own handle, with or without ``@``.
        platform: Platform key used for the ``entity_handles`` lookup.
        cap: Maximum number of handles returned.

    Returns:
        A list of ``{"handle": "@name", "profile_url": ..., "via": "source"|"entity"}``.
    """
    out: list[dict] = []
    seen: set[str] = set()

    def add(handle: str | None, profile_url: str | None, via: str) -> None:
        canon = valid_handle(handle)  # validate even verified/source handles before display
        if not canon:
            return
        key = canon.lower()
        if key in seen:
            return
        seen.add(key)
        out.append({"handle": "@" + canon, "profile_url": profile_url or f"https://x.com/{canon}", "via": via})

    if source_handle:
        add(source_handle, None, "source")
    for name in entities or []:
        if len(out) >= cap:
            break
        # Entity names are model output: a dict/None/number here would otherwise
        # raise inside normalize_entity and abort the caller mid-build.
        if not isinstance(name, str):
            continue
        norm = normalize_entity(name)
        if not norm:
            continue
        row = conn.execute(
            "SELECT handle, profile_url FROM entity_handles WHERE normalized_name=? AND platform=?",
            (norm, platform),
        ).fetchone()
        if row:
            add(row["handle"], row["profile_url"], "entity")
    return out[:cap]
|
|
|
|
|
|
def add_entity_handle(conn: sqlite3.Connection, entity_name: str, handle: str,
                      profile_url: str | None = None, platform: str = PLATFORM_X) -> bool:
    """Upsert a verified handle for an entity and commit.

    The row is keyed on (normalized entity name, platform); saving the same
    entity again overwrites handle, profile URL and display name and refreshes
    ``verified_at``. When no profile URL is given, ``https://x.com/<handle>``
    is stored.

    Returns:
        ``True`` if a row was written; ``False`` if the entity name normalizes
        to nothing or the handle is not well-formed (nothing is stored).
    """
    normalized = normalize_entity(entity_name)
    clean_handle = valid_handle(handle)
    if not (normalized and clean_handle):
        # Junk never reaches the table.
        return False

    url = profile_url or f"https://x.com/{clean_handle}"
    values = (entity_name.strip(), normalized, platform, clean_handle, url)
    conn.execute(
        """INSERT INTO entity_handles (entity_name, normalized_name, platform, handle, profile_url)
           VALUES (?, ?, ?, ?, ?)
           ON CONFLICT(normalized_name, platform) DO UPDATE SET
               handle=excluded.handle, profile_url=excluded.profile_url,
               entity_name=excluded.entity_name, verified_at=CURRENT_TIMESTAMP""",
        values,
    )
    conn.commit()
    return True
|
|
|
|
|
|
# --- candidate eligibility + ranking --------------------------------------------
|
|
|
|
def eligible_candidates(conn: sqlite3.Connection, platform: str = PLATFORM_X, limit: int = _LLM_POOL) -> list[dict]:
    """Return the top `limit` share candidates, ranked by ``_det_score``.

    SQL keeps only articles that are accepted, not marked as a duplicate, from
    a source with visible content, with all four summary fields present, dated
    inside the ``_RECENT`` window, and without a blocking ``outbound_shares``
    row for `platform` (queued/drafting/opened/posted/skipped, or snoozed with
    no wake time or one still in the future). At most the 200 most recent
    matches are fetched; paywalled URLs are then dropped in Python before the
    deterministic ranking is applied.
    """
    query = """
        SELECT a.id, a.title, a.canonical_url, a.image_url, a.published_at, a.discovered_at,
               a.source_id, src.name AS source_name, src.x_handle AS source_handle,
               src.default_category AS category, src.paywall_override,
               s.constructive_score, s.novelty_score, s.topic,
               m.summary, m.what_happened, m.why_matters, m.why_belongs
        FROM articles a
        JOIN article_scores s ON s.article_id = a.id
        JOIN sources src ON src.id = a.source_id
        JOIN article_summaries m ON m.article_id = a.id
        WHERE s.accepted = 1
          AND a.duplicate_of IS NULL
          AND src.content_visible = 1
          AND m.summary IS NOT NULL AND m.what_happened IS NOT NULL
          AND m.why_matters IS NOT NULL AND m.why_belongs IS NOT NULL
          AND COALESCE(a.published_at, a.discovered_at) >= datetime('now', ?)
          AND a.id NOT IN (
              SELECT article_id FROM outbound_shares WHERE platform = ? AND (
                  status IN ('queued','drafting','opened','posted','skipped')
                  OR (status = 'snoozed' AND (snooze_until IS NULL OR snooze_until > datetime('now')))
              )
          )
        ORDER BY COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT 200
        """
    readable: list[dict] = []
    for row in conn.execute(query, (_RECENT, platform)).fetchall():
        if is_paywalled_for_source(row["canonical_url"], row["paywall_override"]):
            continue
        readable.append(dict(row))
    # Stable sort: equal scores keep the newest-first order from the query.
    ranked = sorted(readable, key=_det_score, reverse=True)
    return ranked[:limit]
|
|
|
|
|
|
def _det_score(c: dict) -> float:
|
|
"""Deterministic shareability score — the pre-rank and the LLM-failure fallback.
|
|
'Good article' and 'good post' differ, so this favors novelty + a usable image
|
|
+ freshness, not just the constructive score."""
|
|
score = 1.5 * (c.get("novelty_score") or 0) + 1.0 * (c.get("constructive_score") or 0)
|
|
if c.get("image_url"):
|
|
score += 2.0
|
|
return score
|
|
|
|
|
|
def _diverse_pick(cands: list[dict], need: int, per_source: int = 1, per_topic: int = 2) -> list[dict]:
|
|
"""Pick `need` items spreading across sources/topics (cands already ranked)."""
|
|
out, src_n, top_n = [], {}, {}
|
|
for c in cands:
|
|
if len(out) >= need:
|
|
break
|
|
sid, top = c.get("source_id"), c.get("topic")
|
|
if src_n.get(sid, 0) >= per_source or (top and top_n.get(top, 0) >= per_topic):
|
|
continue
|
|
out.append(c)
|
|
src_n[sid] = src_n.get(sid, 0) + 1
|
|
if top:
|
|
top_n[top] = top_n.get(top, 0) + 1
|
|
# If diversity caps left us short (small pool), fill from the remainder in rank order.
|
|
if len(out) < need:
|
|
chosen = {c["id"] for c in out}
|
|
out.extend(c for c in cands if c["id"] not in chosen)
|
|
return out[:need]
|
|
|
|
|
|
# --- queue build (background job) -----------------------------------------------
|
|
|
|
def _share_url(base_url: str, article_id: int, platform: str = PLATFORM_X) -> str:
    """Build the tracked article link: ``<base>/a/<id>`` plus UTM parameters.

    Trailing slashes on `base_url` are dropped; an empty/None base yields a
    root-relative path. `platform` becomes the ``utm_source`` value.
    """
    root = (base_url or "").rstrip("/")
    tracking = f"utm_source={platform}&utm_medium=social&utm_campaign=publishing_desk"
    return f"{root}/a/{article_id}?{tracking}"
|
|
|
|
|
|
def _llm_score(value):
    """Return a model-supplied score if it is a finite int/float, else None."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    if value != value or value in (float("inf"), float("-inf")):  # NaN / infinities
        return None
    return value


def _llm_text(value):
    """Return `value` when it is a non-blank string, else None."""
    return value if isinstance(value, str) and value.strip() else None


def _llm_strings(value):
    """Return the non-blank strings of a model-supplied list ([] for any other shape)."""
    if not isinstance(value, list):
        return []
    return [v for v in value if isinstance(v, str) and v.strip()]


def build_queue(conn: sqlite3.Connection, base_url: str, client=None,
                platform: str = PLATFORM_X, target: int = QUEUE_TARGET) -> dict:
    """Top the active queue up to `target`.

    The eligible pool is ranked by one comparative LLM call
    (``client.rank_for_social``) when a client is given; if the call raises,
    returns nothing usable, or returns malformed rows, the deterministic
    ``_det_score`` order is used instead. Picks are spread across sources and
    topics, then upserted as ``queued``. Saved draft/final text is never
    overwritten on a re-queue.

    Args:
        conn: Database connection (committed before returning when rows are picked).
        base_url: Site base used to build each share URL.
        client: Optional object exposing ``rank_for_social(list[dict])``.
        platform: Platform key for the queue.
        target: Desired number of active (queued/drafting/opened) items.

    Returns:
        ``{"added": int, "active": int, "ranked_by": "llm"|"deterministic"|"none"}``.
    """
    active = conn.execute(
        "SELECT COUNT(*) FROM outbound_shares WHERE platform=? AND status IN (?,?,?)",
        (platform, *_ACTIVE),
    ).fetchone()[0]
    need = target - active
    if need <= 0:
        return {"added": 0, "active": active, "ranked_by": "none"}

    cands = eligible_candidates(conn, platform=platform, limit=_LLM_POOL)
    if not cands:
        return {"added": 0, "active": active, "ranked_by": "none"}

    by_id = {c["id"]: c for c in cands}
    ranked_by = "deterministic"
    llm = None
    if client is not None:
        try:
            llm = client.rank_for_social(
                [{"id": c["id"], "title": c["title"], "summary": c.get("summary") or "",
                  "topic": c.get("topic")} for c in cands]
            )
        except Exception:  # noqa: BLE001 — model down/slow/garbage → deterministic fallback
            llm = None
    # Everything below runs outside the try, so the response shape is checked
    # explicitly: a non-list result or non-dict rows mean "junk", not a crash.
    if isinstance(llm, (list, tuple)):
        # validate ids against the eligible pool AND dedupe (a model that repeats an id
        # must not inflate the chosen set); attach LLM fields; rank by social score.
        seen_ids, ordered = set(), []
        for r in llm:
            if not isinstance(r, dict):
                continue
            rid = r.get("id")
            # Only plain int/str ids are looked up: unhashable values would raise,
            # and True/False would silently alias ids 1/0.
            if isinstance(rid, bool) or not isinstance(rid, (int, str)):
                continue
            if rid in by_id and rid not in seen_ids:
                seen_ids.add(rid)
                by_id[rid]["_llm"] = r
                ordered.append(by_id[rid])
        if ordered:
            ranked_by = "llm"
            # A missing, null or non-numeric score sorts as 0 instead of raising
            # on a mixed-type comparison.
            ordered.sort(key=lambda c: _llm_score(c["_llm"].get("social_score")) or 0, reverse=True)
            rest = sorted((c for c in cands if "_llm" not in c), key=_det_score, reverse=True)
            cands = ordered + rest

    chosen = _diverse_pick(cands, need)
    before = conn.total_changes
    for c in chosen:
        m = c.get("_llm")
        if m:
            # Model fields are persisted only in the shape the columns expect;
            # anything else becomes None/[] rather than a binding error.
            social = _llm_score(m.get("social_score"))
            angle = _llm_text(m.get("angle"))
            rationale = _llm_text(m.get("why")) or _llm_text(m.get("rationale"))
            points = _llm_strings(m.get("talking_points"))
            entities = _llm_strings(m.get("entities"))
        else:
            # Deterministic fallback (model down): seed the writing aids from the
            # already-generated summary/explanation so the card is still useful.
            # interest score + angle stay None on purpose — they're LLM-only judgments
            # the UI hides when absent; we don't manufacture a fake angle/score.
            social, angle, entities = None, None, []
            rationale = c.get("summary")
            points = [p for p in (c.get("what_happened"), c.get("why_matters"), c.get("why_belongs")) if p]
        handles = resolve_handles(conn, entities, c.get("source_handle"), platform=platform)
        # ON CONFLICT re-queues ONLY an (expired) snoozed row — eligibility already
        # excludes active/posted/skipped, and the WHERE guard makes that defense-in-depth
        # so a re-build can never clobber an active draft or a terminal status. draft_text
        # / final_text are never in the SET, so saved work survives a re-queue.
        conn.execute(
            """INSERT INTO outbound_shares
                   (article_id, platform, status, social_score, rationale, talking_points,
                    angle, entities, suggested_handles, share_url)
               VALUES (?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?)
               ON CONFLICT(article_id, platform) DO UPDATE SET
                   status='queued', social_score=excluded.social_score,
                   rationale=excluded.rationale, talking_points=excluded.talking_points,
                   angle=excluded.angle, entities=excluded.entities,
                   suggested_handles=excluded.suggested_handles, share_url=excluded.share_url,
                   snooze_until=NULL, updated_at=CURRENT_TIMESTAMP
               WHERE outbound_shares.status = 'snoozed'
                 AND outbound_shares.snooze_until IS NOT NULL
                 AND outbound_shares.snooze_until <= datetime('now')""",
            (c["id"], platform, social, rationale,
             json.dumps(points), angle,
             json.dumps(entities), json.dumps(handles), _share_url(base_url, c["id"], platform)),
        )
    conn.commit()
    # Counts come from ACTUAL persisted rows, not loop iterations (a skipped conflict
    # changes nothing, so it can't falsely report a fuller queue).
    added = conn.total_changes - before
    active_now = conn.execute(
        "SELECT COUNT(*) FROM outbound_shares WHERE platform=? AND status IN (?,?,?)",
        (platform, *_ACTIVE),
    ).fetchone()[0]
    return {"added": added, "active": active_now, "ranked_by": ranked_by}
|
|
|
|
|
|
# --- queue read + status transitions --------------------------------------------
|
|
|
|
def _row_to_item(r: sqlite3.Row) -> dict:
|
|
d = dict(r)
|
|
for k in ("talking_points", "entities", "suggested_handles"):
|
|
try:
|
|
d[k] = json.loads(d[k]) if d.get(k) else []
|
|
except (ValueError, TypeError):
|
|
d[k] = []
|
|
return d
|
|
|
|
|
|
def list_queue(conn: sqlite3.Connection, platform: str = PLATFORM_X, include_archived: bool = False) -> list[dict]:
    """Return the working queue for `platform` as decoded item dicts.

    Rows in an active status (queued/drafting/opened) are always included;
    `include_archived` adds skipped and snoozed rows (the recoverable tray).
    Posted rows are never returned. Ordering: opened first, then drafting,
    then everything else; within a group by social score descending, then by
    creation time descending.
    """
    statuses = list(_ACTIVE)
    if include_archived:
        statuses += ["skipped", "snoozed"]
    # One "?" per status; the values themselves are bound, never interpolated.
    qs = ",".join(["?"] * len(statuses))
    query = f"""
        SELECT o.id, o.article_id, o.platform, o.status, o.social_score, o.rationale,
               o.talking_points, o.angle, o.entities, o.suggested_handles, o.draft_text,
               o.final_text, o.share_url, o.post_url, o.snooze_until, o.opened_at, o.posted_at,
               a.title, a.canonical_url, a.image_url, src.name AS source_name
        FROM outbound_shares o
        JOIN articles a ON a.id = o.article_id
        JOIN sources src ON src.id = a.source_id
        WHERE o.platform = ? AND o.status IN ({qs})
        ORDER BY CASE o.status WHEN 'opened' THEN 0 WHEN 'drafting' THEN 1 ELSE 2 END,
                 o.social_score DESC, o.created_at DESC
        """
    fetched = conn.execute(query, (platform, *statuses)).fetchall()
    return list(map(_row_to_item, fetched))
|
|
|
|
|
|
_ACTIVE_SET = {"queued", "drafting", "opened"}
|
|
_VALID_STATUS = {"queued", "drafting", "opened", "posted", "skipped", "snoozed"}
|
|
|
|
|
|
def _is_future(ts: str | None) -> bool:
|
|
if not ts:
|
|
return False
|
|
try:
|
|
dt = datetime.fromisoformat(str(ts).strip().replace("Z", "").replace("T", " "))
|
|
except (ValueError, TypeError):
|
|
return False
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
return dt > datetime.now(timezone.utc)
|
|
|
|
|
|
def set_status(conn: sqlite3.Connection, share_id: int, status: str, *,
|
|
draft_text: str | None = None, final_text: str | None = None,
|
|
post_url: str | None = None, snooze_until: str | None = None) -> bool:
|
|
"""Transition an ACTIVE share. Enforces the lifecycle: only queued/drafting/opened
|
|
items transition here — `posted` is permanently terminal and skipped/snoozed recover
|
|
via restore() (so dedup can't be undone and an item can't be reposted). `snoozed`
|
|
requires a valid FUTURE timestamp (a null/past date would exclude it forever);
|
|
leaving snooze otherwise clears snooze_until. opened/posted stamp their times."""
|
|
if status not in _VALID_STATUS:
|
|
return False
|
|
if status == "snoozed" and not _is_future(snooze_until):
|
|
return False
|
|
row = conn.execute("SELECT status FROM outbound_shares WHERE id = ?", (share_id,)).fetchone()
|
|
if not row or row["status"] not in _ACTIVE_SET: # terminal/archived → use restore()
|
|
return False
|
|
# snooze_until is set only when snoozing; cleared on every other transition.
|
|
sets = ["status = ?", "updated_at = CURRENT_TIMESTAMP", "snooze_until = ?"]
|
|
params: list = [status, snooze_until if status == "snoozed" else None]
|
|
if status == "opened":
|
|
sets.append("opened_at = CURRENT_TIMESTAMP")
|
|
if status == "posted":
|
|
sets.append("posted_at = CURRENT_TIMESTAMP")
|
|
if draft_text is not None:
|
|
sets.append("draft_text = ?")
|
|
params.append(draft_text)
|
|
if final_text is not None:
|
|
sets.append("final_text = ?")
|
|
params.append(final_text)
|
|
if post_url is not None:
|
|
sets.append("post_url = ?")
|
|
params.append(post_url)
|
|
params.append(share_id)
|
|
cur = conn.execute(
|
|
f"UPDATE outbound_shares SET {', '.join(sets)} WHERE id = ? "
|
|
"AND status IN ('queued','drafting','opened')", # atomic: don't transition a row that just changed
|
|
params,
|
|
)
|
|
conn.commit()
|
|
return cur.rowcount > 0
|
|
|
|
|
|
def save_draft(conn: sqlite3.Connection, share_id: int, draft_text: str) -> bool:
    """Store draft text on an active share and commit.

    A queued row moves to ``drafting``; drafting/opened rows keep their status.
    Rows in any other status are left untouched, so a late debounced autosave
    that lands after Posted/Skip/Snooze is a no-op.

    Returns:
        ``True`` if a row was updated, ``False`` otherwise.
    """
    sql = (
        "UPDATE outbound_shares SET draft_text = ?, status = CASE status WHEN 'queued' THEN 'drafting' ELSE status END, "
        "updated_at = CURRENT_TIMESTAMP WHERE id = ? AND status IN ('queued','drafting','opened')"
    )
    updated = conn.execute(sql, (draft_text, share_id)).rowcount
    conn.commit()
    return updated > 0
|
|
|
|
|
|
def restore(conn: sqlite3.Connection, share_id: int) -> bool:
    """Return a skipped or snoozed share to ``queued`` (mistaken-click safety).

    Clears ``snooze_until`` and commits. Rows in any other status are not
    touched.

    Returns:
        ``True`` if a row was restored, ``False`` otherwise.
    """
    sql = (
        "UPDATE outbound_shares SET status='queued', snooze_until=NULL, updated_at=CURRENT_TIMESTAMP "
        "WHERE id = ? AND status IN ('skipped','snoozed')"
    )
    restored = conn.execute(sql, (share_id,)).rowcount
    conn.commit()
    return restored > 0
|