"""Publishing Desk Phase 1 — queue logic, top-up/dedup semantics, comparative LLM ranking with deterministic fallback, verified handle resolution, status transitions.""" from datetime import datetime, timedelta, timezone import pytest from goodnews import publishing from goodnews.db import connect, init_db BASE = "https://ub.test" def _ts(hours_ago: float) -> str: return (datetime.now(timezone.utc) - timedelta(hours=hours_ago)).strftime("%Y-%m-%d %H:%M:%S") @pytest.fixture def conn(): c = connect(":memory:"); init_db(c) yield c c.close() def _src(c, sid, x_handle=None, paywall_override=None, content_visible=1): c.execute( "INSERT INTO sources (id,name,feed_url,trust_score,content_visible,x_handle,paywall_override) " "VALUES (?,?,?,?,?,?,?)", (sid, f"Source {sid}", f"http://s{sid}/feed", 5, content_visible, x_handle, paywall_override), ) def _article(c, aid, sid, *, accepted=1, dup=None, novelty=5, constructive=5, topic="science", url=None, image="http://img/x.jpg", hours_ago=1.0, complete=True): c.execute( "INSERT INTO articles (id,source_id,canonical_url,title,url_hash,image_url,published_at) " "VALUES (?,?,?,?,?,?,?)", (aid, sid, url or f"https://ex{aid}.com/a", f"Title {aid}", f"h{aid}", image, _ts(hours_ago)), ) if dup is not None: c.execute("UPDATE articles SET duplicate_of=? WHERE id=?", (dup, aid)) c.execute( "INSERT INTO article_scores (article_id,accepted,novelty_score,constructive_score,topic,reason_code) " "VALUES (?,?,?,?,?, 'ok')", (aid, accepted, novelty, constructive, topic), ) if complete: c.execute( "INSERT INTO article_summaries (article_id,summary,what_happened,why_matters,why_belongs) " "VALUES (?,?,?,?,?)", (aid, f"Summary {aid}", "wh", "wm", "wb"), ) def _seed_n(c, n): """n eligible articles, each from its own source (so diversity caps don't interfere).""" for i in range(1, n + 1): _src(c, i) _article(c, i, i, novelty=10 - i, topic=f"t{i}") c.commit() class FakeClient: def __init__(self, ranked): self._ranked = ranked def rank_for_social(self, candidates): return self._ranked class BoomClient: def rank_for_social(self, candidates): raise RuntimeError("model down") # --- handle resolution ---------------------------------------------------------- def test_handles_source_first_then_entities_deduped_capped(conn): publishing.add_entity_handle(conn, "Anthropic", "AnthropicAI", "https://x.com/AnthropicAI") publishing.add_entity_handle(conn, "NASA", "NASA") out = publishing.resolve_handles(conn, ["Anthropic", "NASA", "Unknown Org"], source_handle="Phys_org") assert out[0]["via"] == "source" and out[0]["handle"] == "@Phys_org" assert len(out) == 2 # capped at 2 assert out[1]["handle"] == "@AnthropicAI" # first matched entity; NASA dropped by cap assert all(h["handle"].startswith("@") for h in out) def test_handles_aliases_resolve_consistently(conn): publishing.add_entity_handle(conn, "Johns Hopkins University", "HopkinsMedicine") publishing.add_entity_handle(conn, "Johns Hopkins", "HopkinsMedicine") # alias row, same handle a = publishing.resolve_handles(conn, ["Johns Hopkins University"]) b = publishing.resolve_handles(conn, ["johns hopkins"]) assert a and b and a[0]["handle"] == b[0]["handle"] == "@HopkinsMedicine" def test_handles_unknown_entity_is_not_guessed(conn): assert publishing.resolve_handles(conn, ["Some Random Startup"]) == [] def test_normalization_does_not_collide_identity_words(conn): # a handle stored for the SCHOOL must not get suggested for the STATE publishing.add_entity_handle(conn, "University of California", "UCBerkeley") assert publishing.resolve_handles(conn, ["California"]) == [] # no false match got = publishing.resolve_handles(conn, ["University of California"]) assert got and got[0]["handle"] == "@UCBerkeley" # exact still resolves def test_normalization_preserves_the_and_strips_only_trailing_legal(conn): # "the" is never dropped, and legal suffixes only strip from the END assert publishing.normalize_entity("The Who") == "the who" # not "who" assert publishing.normalize_entity("Inc. Magazine") == "inc magazine" # leading legal kept assert publishing.normalize_entity("Apple Inc") == "apple" # trailing legal stripped # so "The Who" and "WHO" resolve to their OWN handles, no cross-match publishing.add_entity_handle(conn, "The Who", "TheWho") publishing.add_entity_handle(conn, "WHO", "WHO") assert publishing.resolve_handles(conn, ["The Who"])[0]["handle"] == "@TheWho" assert publishing.resolve_handles(conn, ["WHO"])[0]["handle"] == "@WHO" def test_invalid_handles_are_rejected_not_stored(conn): for bad in ("", "@", "not a handle", "https://x.com/NASA", "NASA!", "way_too_long_handle_x"): assert publishing.valid_handle(bad) is None assert publishing.add_entity_handle(conn, "Some Org", bad) is False # good ones: tolerate one leading @, store canonical assert publishing.valid_handle("@NASA") == "NASA" assert publishing.add_entity_handle(conn, "NASA", "@NASA") is True assert publishing.resolve_handles(conn, ["NASA"])[0]["handle"] == "@NASA" # a junk source handle is never suggested either assert publishing.resolve_handles(conn, [], source_handle="@bad handle!") == [] # --- eligibility ---------------------------------------------------------------- def test_eligibility_excludes_the_unfit(conn): _src(c=conn, sid=1) _article(conn, 1, 1) # eligible _article(conn, 2, 1, accepted=0) # rejected _article(conn, 3, 1, dup=1) # duplicate _article(conn, 4, 1, complete=False) # no complete summary _article(conn, 5, 1, hours_ago=24 * 10) # too old _src(conn, 2, content_visible=0) _article(conn, 6, 2) # source hidden _src(conn, 3, paywall_override="paywalled") _article(conn, 7, 3) # paywalled conn.commit() ids = {c["id"] for c in publishing.eligible_candidates(conn)} assert ids == {1} # --- build: deterministic fallback + top-up/dedup ------------------------------- def test_build_tops_up_to_target_and_dedups(conn): _seed_n(conn, 6) r1 = publishing.build_queue(conn, BASE, client=None, target=3) assert r1["added"] == 3 and r1["ranked_by"] == "deterministic" q = publishing.list_queue(conn) assert len(q) == 3 and all(i["share_url"].startswith(BASE + "/a/") for i in q) assert "utm_source=x" in q[0]["share_url"] # rebuild at same target → already full → adds nothing (no duplicates) assert publishing.build_queue(conn, BASE, client=None, target=3)["added"] == 0 # post one → a slot frees → next rebuild tops up with a NEW article, never the posted one posted_id = q[0]["id"]; posted_article = q[0]["article_id"] publishing.set_status(conn, posted_id, "posted") r3 = publishing.build_queue(conn, BASE, client=None, target=3) assert r3["added"] == 1 active_articles = {i["article_id"] for i in publishing.list_queue(conn)} assert posted_article not in active_articles # posted never re-queued def test_build_preserves_saved_draft_on_requeue(conn): # a snoozed item that becomes eligible again must keep its draft text _seed_n(conn, 1) publishing.build_queue(conn, BASE, client=None, target=1) sid = publishing.list_queue(conn)[0]["id"] publishing.save_draft(conn, sid, "my carefully written blurb") # force an EXPIRED snooze directly (set_status rightly refuses a past date) conn.execute("UPDATE outbound_shares SET status='snoozed', snooze_until=? WHERE id=?", (_ts(1), sid)) conn.commit() publishing.build_queue(conn, BASE, client=None, target=1) # re-queues it row = conn.execute("SELECT status, draft_text FROM outbound_shares WHERE id=?", (sid,)).fetchone() assert row["status"] == "queued" and row["draft_text"] == "my carefully written blurb" # --- build: comparative LLM ranking + fallback ---------------------------------- def test_build_uses_llm_ranking_and_attaches_fields(conn): _seed_n(conn, 3) publishing.add_entity_handle(conn, "NASA", "NASA") ranked = [ {"id": 3, "social_score": 9, "why": "wow", "talking_points": ["a", "b", "c"], "angle": "ask a question", "entities": ["NASA"]}, {"id": 1, "social_score": 4, "why": "ok", "talking_points": [], "angle": "", "entities": []}, ] r = publishing.build_queue(conn, BASE, client=FakeClient(ranked), target=2) assert r["ranked_by"] == "llm" and r["added"] == 2 q = publishing.list_queue(conn) top = q[0] assert top["article_id"] == 3 and top["social_score"] == 9 # LLM order wins assert top["talking_points"] == ["a", "b", "c"] and top["angle"] == "ask a question" assert any(h["handle"] == "@NASA" for h in top["suggested_handles"]) def test_build_falls_back_when_llm_errors(conn): _seed_n(conn, 3) r = publishing.build_queue(conn, BASE, client=BoomClient(), target=2) assert r["ranked_by"] == "deterministic" and r["added"] == 2 # model down ≠ broken Desk def test_deterministic_fallback_seeds_aids_but_leaves_score_and_angle_empty(conn): # Codex Fix-1: with no LLM, the card still carries writing aids (rationale + # talking points from the already-generated summary), but interest score and # angle stay None on purpose — those are LLM-only judgments, never manufactured. _seed_n(conn, 1) publishing.build_queue(conn, BASE, client=None, target=1) item = publishing.list_queue(conn)[0] assert item["rationale"] == "Summary 1" # seeded from the summary assert item["talking_points"] == ["wh", "wm", "wb"] # seeded from the explanation assert item["social_score"] is None and item["angle"] is None # LLM-only, left empty # --- adversarial: malformed LLM output ------------------------------------------ def test_duplicate_llm_ids_do_not_inflate_the_queue(conn): # the model repeats id 1; only 2 real articles exist. added/active must reflect # ACTUAL unique rows, never the inflated loop count Codex saw. _seed_n(conn, 2) ranked = [{"id": 1, "social_score": 9}, {"id": 1, "social_score": 9}, {"id": 1, "social_score": 9}, {"id": 2, "social_score": 5}] r = publishing.build_queue(conn, BASE, client=FakeClient(ranked), target=5) q = publishing.list_queue(conn) assert r["added"] == len(q) == 2 # not 5, not 3 assert len({i["article_id"] for i in q}) == 2 # unique articles def test_string_fields_do_not_become_char_arrays(conn): # model returns strings where lists are expected; build must store [], not ['f','a'..] _seed_n(conn, 1) ranked = [{"id": 1, "social_score": 7, "talking_points": "fact", "entities": "NASA"}] publishing.build_queue(conn, BASE, client=FakeClient(ranked), target=1) item = publishing.list_queue(conn)[0] assert item["talking_points"] == [] and item["entities"] == [] # --- lifecycle enforcement ------------------------------------------------------ def test_posted_is_terminal_and_cannot_be_requeued(conn): _seed_n(conn, 1) publishing.build_queue(conn, BASE, client=None, target=1) sid = publishing.list_queue(conn)[0]["id"] assert publishing.set_status(conn, sid, "posted") is True assert publishing.set_status(conn, sid, "queued") is False # no resurrection assert publishing.restore(conn, sid) is False # restore won't revive posted assert conn.execute("SELECT status FROM outbound_shares WHERE id=?", (sid,)).fetchone()["status"] == "posted" def test_late_autosave_is_rejected_after_terminal(conn): # Codex Fix-2: a debounced autosave that lands AFTER the item is posted must # not write to the terminal row (no clobbering what was actually published). _seed_n(conn, 1) publishing.build_queue(conn, BASE, client=None, target=1) sid = publishing.list_queue(conn)[0]["id"] assert publishing.save_draft(conn, sid, "draft while active") is True publishing.set_status(conn, sid, "posted") assert publishing.save_draft(conn, sid, "late autosave") is False # no-op on terminal row = conn.execute("SELECT draft_text FROM outbound_shares WHERE id=?", (sid,)).fetchone() assert row["draft_text"] == "draft while active" # the late write was ignored def test_posted_rows_never_appear_in_queue_or_archived_tray(conn): # Codex Fix-4: posted history is terminal and excluded everywhere the UI lists # rows — neither the working queue nor the archived tray ever grows with it. _seed_n(conn, 1) publishing.build_queue(conn, BASE, client=None, target=1) sid = publishing.list_queue(conn)[0]["id"] publishing.set_status(conn, sid, "posted") assert publishing.list_queue(conn) == [] # not in working queue assert publishing.list_queue(conn, include_archived=True) == [] # not in archived tray def test_snooze_requires_a_future_date(conn): _seed_n(conn, 1) publishing.build_queue(conn, BASE, client=None, target=1) sid = publishing.list_queue(conn)[0]["id"] assert publishing.set_status(conn, sid, "snoozed", snooze_until=None) is False # null assert publishing.set_status(conn, sid, "snoozed", snooze_until=_ts(1)) is False # past assert publishing.set_status(conn, sid, "snoozed", snooze_until=_ts(-48)) is True # future # leaving snooze later (via restore) clears the date publishing.restore(conn, sid) assert conn.execute("SELECT snooze_until FROM outbound_shares WHERE id=?", (sid,)).fetchone()["snooze_until"] is None # --- status transitions + restore + snooze -------------------------------------- def test_skip_is_reversible_and_snooze_is_separate(conn): _seed_n(conn, 2) publishing.build_queue(conn, BASE, client=None, target=2) q = publishing.list_queue(conn) a, b = q[0]["id"], q[1]["id"] publishing.set_status(conn, a, "skipped") assert a not in {i["id"] for i in publishing.list_queue(conn)} # gone from working queue assert a in {i["id"] for i in publishing.list_queue(conn, include_archived=True)} # but in the tray assert publishing.restore(conn, a) is True assert a in {i["id"] for i in publishing.list_queue(conn)} # restored # snooze: not in working queue, holds a snooze_until, restorable publishing.set_status(conn, b, "snoozed", snooze_until=_ts(-24)) # 24h in the future row = conn.execute("SELECT status, snooze_until FROM outbound_shares WHERE id=?", (b,)).fetchone() assert row["status"] == "snoozed" and row["snooze_until"] assert b not in {i["id"] for i in publishing.list_queue(conn)} def test_inflight_build_does_not_clobber_a_freshly_extended_snooze(conn): # Build snapshots eligibility, then the model ranks. If the user RE-SNOOZES to the # future mid-rank, the finished build must NOT revive it (only EXPIRED snoozes revive). _seed_n(conn, 1) publishing.build_queue(conn, BASE, client=None, target=1) sid = publishing.list_queue(conn)[0]["id"] conn.execute("UPDATE outbound_shares SET status='snoozed', snooze_until=? WHERE id=?", (_ts(1), sid)) # expired → eligible conn.commit() future = _ts(-48) # 48h ahead class RaceClient: def rank_for_social(self, candidates): # mid-build interleave: user extends the snooze into the future conn.execute("UPDATE outbound_shares SET snooze_until=? WHERE id=?", (future, sid)) conn.commit() return [{"id": 1, "social_score": 9}] publishing.build_queue(conn, BASE, client=RaceClient(), target=1) row = conn.execute("SELECT status, snooze_until FROM outbound_shares WHERE id=?", (sid,)).fetchone() assert row["status"] == "snoozed" and row["snooze_until"] == future # left alone, not re-queued