"""Publishing Desk — the platform-neutral outbound-share queue (X first). Pattern (Claude + Codex): code reduces the corpus to a small set of strong, *eligible* candidates; ONE bounded comparative LLM call ranks them together and returns talking points / angle / entities; code validates, applies diversity, and tops the queue up to a target. If the model is down or returns junk, a deterministic ranking is the fallback — the Desk always works. The human writes every blurb; the LLM never writes the post and never invents a @handle (handles come only from the verified `entity_handles` table or a source's own `x_handle`). """ from __future__ import annotations import json import re import sqlite3 from datetime import datetime, timezone from .paywall import is_paywalled_for_source PLATFORM_X = "x" QUEUE_TARGET = 8 # how many active items the Desk tries to keep ready _LLM_POOL = 15 # most candidates handed to the one comparative LLM call _RECENT = "-3 days" # "timely" window for share candidates # Active = occupying a slot in the working queue (so we don't re-add or duplicate). _ACTIVE = ("queued", "drafting", "opened") # Legal suffixes are dropped ("Apple Inc" ≡ "Apple") but ONLY from the END, and "the" # is NEVER dropped. Removing them anywhere collapsed "The Who"→"who" (collides with # WHO) and "Inc. Magazine"→"magazine". Identity words (university, institute, lab…) are # preserved; short forms/abbreviations need explicit alias rows. _LEGAL_SUFFIXES = {"inc", "llc", "ltd", "corp", "corporation", "plc", "gmbh", "co"} def normalize_entity(name: str) -> str: toks = re.sub(r"[^a-z0-9 ]", " ", (name or "").lower()).split() while toks and toks[-1] in _LEGAL_SUFFIXES: # trailing only toks.pop() return " ".join(toks) _HANDLE_RE = re.compile(r"^[A-Za-z0-9_]{1,15}$") # X: 1-15 chars, letters/digits/underscore def valid_handle(handle: str | None) -> str | None: """Canonical handle WITHOUT the @, or None. Tolerates one optional leading @; rejects empty, spaces, URLs, and punctuation — so '@', '@not a handle', '@https://x.com/NASA', '@NASA!' never get stored or suggested.""" h = (handle or "").strip() if h.startswith("@"): h = h[1:] return h if _HANDLE_RE.match(h) else None # --- verified handle resolution ------------------------------------------------- def resolve_handles(conn: sqlite3.Connection, entities: list[str], source_handle: str | None = None, platform: str = PLATFORM_X, cap: int = 2) -> list[dict]: """Verified handles ONLY: the source's own handle first, then LLM-named entities matched against the curated table. Deduped, capped. Unmatched entities are NOT guessed — the UI offers a 'Find on X' search for those instead.""" out: list[dict] = [] seen: set[str] = set() def add(handle: str | None, profile_url: str | None, via: str) -> None: canon = valid_handle(handle) # validate even verified/source handles before display if not canon: return key = canon.lower() if key in seen: return seen.add(key) out.append({"handle": "@" + canon, "profile_url": profile_url or f"https://x.com/{canon}", "via": via}) if source_handle: add(source_handle, None, "source") for name in entities or []: if len(out) >= cap: break norm = normalize_entity(name) if not norm: continue row = conn.execute( "SELECT handle, profile_url FROM entity_handles WHERE normalized_name=? AND platform=?", (norm, platform), ).fetchone() if row: add(row["handle"], row["profile_url"], "entity") return out[:cap] def add_entity_handle(conn: sqlite3.Connection, entity_name: str, handle: str, profile_url: str | None = None, platform: str = PLATFORM_X) -> bool: """Save a verified handle (e.g. after you confirm one via 'Find on X'), so it's automatic next time. Idempotent on (normalized_name, platform).""" norm = normalize_entity(entity_name) canon = valid_handle(handle) if not norm or not canon: # reject junk handles before they're ever stored return False conn.execute( """INSERT INTO entity_handles (entity_name, normalized_name, platform, handle, profile_url) VALUES (?, ?, ?, ?, ?) ON CONFLICT(normalized_name, platform) DO UPDATE SET handle=excluded.handle, profile_url=excluded.profile_url, entity_name=excluded.entity_name, verified_at=CURRENT_TIMESTAMP""", (entity_name.strip(), norm, platform, canon, profile_url or f"https://x.com/{canon}"), ) conn.commit() return True # --- candidate eligibility + ranking -------------------------------------------- def eligible_candidates(conn: sqlite3.Connection, platform: str = PLATFORM_X, limit: int = _LLM_POOL) -> list[dict]: """Hard filters (code disposes): accepted · visible · non-duplicate · timely · complete share page · not already queued/posted/skipped/snoozed. Readable (paywall) is checked in Python. Returns the deterministically pre-ranked top `limit` to hand to the comparative LLM call.""" rows = conn.execute( f""" SELECT a.id, a.title, a.canonical_url, a.image_url, a.published_at, a.discovered_at, a.source_id, src.name AS source_name, src.x_handle AS source_handle, src.default_category AS category, src.paywall_override, s.constructive_score, s.novelty_score, s.topic, m.summary, m.what_happened, m.why_matters, m.why_belongs FROM articles a JOIN article_scores s ON s.article_id = a.id JOIN sources src ON src.id = a.source_id JOIN article_summaries m ON m.article_id = a.id WHERE s.accepted = 1 AND a.duplicate_of IS NULL AND src.content_visible = 1 AND m.summary IS NOT NULL AND m.what_happened IS NOT NULL AND m.why_matters IS NOT NULL AND m.why_belongs IS NOT NULL AND COALESCE(a.published_at, a.discovered_at) >= datetime('now', ?) AND a.id NOT IN ( SELECT article_id FROM outbound_shares WHERE platform = ? AND ( status IN ('queued','drafting','opened','posted','skipped') OR (status = 'snoozed' AND (snooze_until IS NULL OR snooze_until > datetime('now'))) ) ) ORDER BY COALESCE(a.published_at, a.discovered_at) DESC LIMIT 200 """, (_RECENT, platform), ).fetchall() cands = [dict(r) for r in rows if not is_paywalled_for_source(r["canonical_url"], r["paywall_override"])] cands.sort(key=_det_score, reverse=True) return cands[:limit] def _det_score(c: dict) -> float: """Deterministic shareability score — the pre-rank and the LLM-failure fallback. 'Good article' and 'good post' differ, so this favors novelty + a usable image + freshness, not just the constructive score.""" score = 1.5 * (c.get("novelty_score") or 0) + 1.0 * (c.get("constructive_score") or 0) if c.get("image_url"): score += 2.0 return score def _diverse_pick(cands: list[dict], need: int, per_source: int = 1, per_topic: int = 2) -> list[dict]: """Pick `need` items spreading across sources/topics (cands already ranked).""" out, src_n, top_n = [], {}, {} for c in cands: if len(out) >= need: break sid, top = c.get("source_id"), c.get("topic") if src_n.get(sid, 0) >= per_source or (top and top_n.get(top, 0) >= per_topic): continue out.append(c) src_n[sid] = src_n.get(sid, 0) + 1 if top: top_n[top] = top_n.get(top, 0) + 1 # If diversity caps left us short (small pool), fill from the remainder in rank order. if len(out) < need: chosen = {c["id"] for c in out} out.extend(c for c in cands if c["id"] not in chosen) return out[:need] # --- queue build (background job) ----------------------------------------------- def _share_url(base_url: str, article_id: int, platform: str = PLATFORM_X) -> str: base = (base_url or "").rstrip("/") return f"{base}/a/{article_id}?utm_source={platform}&utm_medium=social&utm_campaign=publishing_desk" def build_queue(conn: sqlite3.Connection, base_url: str, client=None, platform: str = PLATFORM_X, target: int = QUEUE_TARGET) -> dict: """Top the active queue up to `target`. Comparative LLM ranks the eligible pool; deterministic fallback if the model is unavailable or returns junk. Never overwrites saved draft/final text on a re-queue.""" active = conn.execute( "SELECT COUNT(*) FROM outbound_shares WHERE platform=? AND status IN (?,?,?)", (platform, *_ACTIVE), ).fetchone()[0] need = target - active if need <= 0: return {"added": 0, "active": active, "ranked_by": "none"} cands = eligible_candidates(conn, platform=platform, limit=_LLM_POOL) if not cands: return {"added": 0, "active": active, "ranked_by": "none"} by_id = {c["id"]: c for c in cands} ranked_by = "deterministic" llm = None if client is not None: try: llm = client.rank_for_social( [{"id": c["id"], "title": c["title"], "summary": c.get("summary") or "", "topic": c.get("topic")} for c in cands] ) except Exception: # noqa: BLE001 — model down/slow/garbage → deterministic fallback llm = None if llm: # validate ids against the eligible pool AND dedupe (a model that repeats an id # must not inflate the chosen set); attach LLM fields; rank by social score. seen_ids, ordered = set(), [] for r in llm: rid = r.get("id") if rid in by_id and rid not in seen_ids: seen_ids.add(rid) by_id[rid]["_llm"] = r ordered.append(by_id[rid]) if ordered: ranked_by = "llm" ordered.sort(key=lambda c: c["_llm"].get("social_score", 0), reverse=True) rest = sorted((c for c in cands if "_llm" not in c), key=_det_score, reverse=True) cands = ordered + rest chosen = _diverse_pick(cands, need) before = conn.total_changes for c in chosen: m = c.get("_llm") if m: social, angle = m.get("social_score"), m.get("angle") rationale = m.get("why") or m.get("rationale") points = m.get("talking_points") if isinstance(m.get("talking_points"), list) else [] entities = m.get("entities") if isinstance(m.get("entities"), list) else [] else: # Deterministic fallback (model down): seed the writing aids from the # already-generated summary/explanation so the card is still useful. # interest score + angle stay None on purpose — they're LLM-only judgments # the UI hides when absent; we don't manufacture a fake angle/score. social, angle, entities = None, None, [] rationale = c.get("summary") points = [p for p in (c.get("what_happened"), c.get("why_matters"), c.get("why_belongs")) if p] handles = resolve_handles(conn, entities, c.get("source_handle"), platform=platform) # ON CONFLICT re-queues ONLY an (expired) snoozed row — eligibility already # excludes active/posted/skipped, and the WHERE guard makes that defense-in-depth # so a re-build can never clobber an active draft or a terminal status. draft_text # / final_text are never in the SET, so saved work survives a re-queue. conn.execute( """INSERT INTO outbound_shares (article_id, platform, status, social_score, rationale, talking_points, angle, entities, suggested_handles, share_url) VALUES (?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(article_id, platform) DO UPDATE SET status='queued', social_score=excluded.social_score, rationale=excluded.rationale, talking_points=excluded.talking_points, angle=excluded.angle, entities=excluded.entities, suggested_handles=excluded.suggested_handles, share_url=excluded.share_url, snooze_until=NULL, updated_at=CURRENT_TIMESTAMP WHERE outbound_shares.status = 'snoozed' AND outbound_shares.snooze_until IS NOT NULL AND outbound_shares.snooze_until <= datetime('now')""", (c["id"], platform, social, rationale, json.dumps(points), angle, json.dumps(entities), json.dumps(handles), _share_url(base_url, c["id"], platform)), ) conn.commit() # Counts come from ACTUAL persisted rows, not loop iterations (a skipped conflict # changes nothing, so it can't falsely report a fuller queue). added = conn.total_changes - before active_now = conn.execute( "SELECT COUNT(*) FROM outbound_shares WHERE platform=? AND status IN (?,?,?)", (platform, *_ACTIVE), ).fetchone()[0] return {"added": added, "active": active_now, "ranked_by": ranked_by} # --- queue read + status transitions -------------------------------------------- def _row_to_item(r: sqlite3.Row) -> dict: d = dict(r) for k in ("talking_points", "entities", "suggested_handles"): try: d[k] = json.loads(d[k]) if d.get(k) else [] except (ValueError, TypeError): d[k] = [] return d def list_queue(conn: sqlite3.Connection, platform: str = PLATFORM_X, include_archived: bool = False) -> list[dict]: """The working queue (queued/drafting/opened), newest-interest first. With include_archived, also returns skipped/snoozed (the recoverable tray). Posted is NEVER returned here — it's done, and including it would grow the payload forever (a dedicated paginated history can come later if wanted).""" statuses = list(_ACTIVE) + (["skipped", "snoozed"] if include_archived else []) qs = ",".join("?" for _ in statuses) rows = conn.execute( f""" SELECT o.id, o.article_id, o.platform, o.status, o.social_score, o.rationale, o.talking_points, o.angle, o.entities, o.suggested_handles, o.draft_text, o.final_text, o.share_url, o.post_url, o.snooze_until, o.opened_at, o.posted_at, a.title, a.canonical_url, a.image_url, src.name AS source_name FROM outbound_shares o JOIN articles a ON a.id = o.article_id JOIN sources src ON src.id = a.source_id WHERE o.platform = ? AND o.status IN ({qs}) ORDER BY CASE o.status WHEN 'opened' THEN 0 WHEN 'drafting' THEN 1 ELSE 2 END, o.social_score DESC, o.created_at DESC """, (platform, *statuses), ).fetchall() return [_row_to_item(r) for r in rows] _ACTIVE_SET = {"queued", "drafting", "opened"} _VALID_STATUS = {"queued", "drafting", "opened", "posted", "skipped", "snoozed"} def _is_future(ts: str | None) -> bool: if not ts: return False try: dt = datetime.fromisoformat(str(ts).strip().replace("Z", "").replace("T", " ")) except (ValueError, TypeError): return False if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt > datetime.now(timezone.utc) def set_status(conn: sqlite3.Connection, share_id: int, status: str, *, draft_text: str | None = None, final_text: str | None = None, post_url: str | None = None, snooze_until: str | None = None) -> bool: """Transition an ACTIVE share. Enforces the lifecycle: only queued/drafting/opened items transition here — `posted` is permanently terminal and skipped/snoozed recover via restore() (so dedup can't be undone and an item can't be reposted). `snoozed` requires a valid FUTURE timestamp (a null/past date would exclude it forever); leaving snooze otherwise clears snooze_until. opened/posted stamp their times.""" if status not in _VALID_STATUS: return False if status == "snoozed" and not _is_future(snooze_until): return False row = conn.execute("SELECT status FROM outbound_shares WHERE id = ?", (share_id,)).fetchone() if not row or row["status"] not in _ACTIVE_SET: # terminal/archived → use restore() return False # snooze_until is set only when snoozing; cleared on every other transition. sets = ["status = ?", "updated_at = CURRENT_TIMESTAMP", "snooze_until = ?"] params: list = [status, snooze_until if status == "snoozed" else None] if status == "opened": sets.append("opened_at = CURRENT_TIMESTAMP") if status == "posted": sets.append("posted_at = CURRENT_TIMESTAMP") if draft_text is not None: sets.append("draft_text = ?") params.append(draft_text) if final_text is not None: sets.append("final_text = ?") params.append(final_text) if post_url is not None: sets.append("post_url = ?") params.append(post_url) params.append(share_id) cur = conn.execute( f"UPDATE outbound_shares SET {', '.join(sets)} WHERE id = ? " "AND status IN ('queued','drafting','opened')", # atomic: don't transition a row that just changed params, ) conn.commit() return cur.rowcount > 0 def save_draft(conn: sqlite3.Connection, share_id: int, draft_text: str) -> bool: # Only ACTIVE rows accept a draft — a late debounced autosave that lands after # Posted/Skip/Snooze must be a no-op (never write to a terminal/archived row). cur = conn.execute( "UPDATE outbound_shares SET draft_text = ?, status = CASE status WHEN 'queued' THEN 'drafting' ELSE status END, " "updated_at = CURRENT_TIMESTAMP WHERE id = ? AND status IN ('queued','drafting','opened')", (draft_text, share_id), ) conn.commit() return cur.rowcount > 0 def restore(conn: sqlite3.Connection, share_id: int) -> bool: """Bring a skipped/snoozed item back to the working queue (mistaken-click safety).""" cur = conn.execute( "UPDATE outbound_shares SET status='queued', snooze_until=NULL, updated_at=CURRENT_TIMESTAMP " "WHERE id = ? AND status IN ('skipped','snoozed')", (share_id,), ) conn.commit() return cur.rowcount > 0