ddcfab3a11
New per-row "Articles" button on the Sources table expands a read-only inline
panel of the source's ACTUAL ingested articles — so the automated metrics
(paywall/image/acceptance/duplicate) can be verified against evidence instead of
trusted blind. Distinct from "Check" (which re-samples the LIVE feed for
would-pass quality); this shows what's already in the DB, which is what the table
metrics are computed from.
- Backend: GET /api/admin/sources/{id}/articles?filter=&limit=&offset= (admin,
read-only). queries.source_articles + source_articles_summary — per article:
title, url, date, accepted, reason (the "why"), topic/flavor, paywalled
(domain rule), has_image, duplicate. Summary = counts + source-level paywall
rule.
- Frontend: expandable panel with a summary header ("27 ingested · 18 accepted
· … · paywall rule: ON (domain)"), filter chips (All/Accepted/Rejected/No
image/Duplicates), compact rows with title→link + badges + reason, Load more.
So "100% paywall" or "0% images" becomes clickable evidence: open two articles
to tell a real paywall from a mis-flagged domain, or a true image gap from an
enrichment failure. Test: test_source_articles_inspector. 241 pytest + 11 vitest.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
521 lines
30 KiB
Python
521 lines
30 KiB
Python
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
|
|
def _make(tmp_path, monkeypatch, admin_email=""):
|
|
db = tmp_path / "t.sqlite3"
|
|
monkeypatch.setenv("GOODNEWS_DB", str(db))
|
|
monkeypatch.setenv("GOODNEWS_PUBLIC_BASE_URL", "http://testserver")
|
|
monkeypatch.setenv("GOODNEWS_ADMIN_EMAILS", admin_email)
|
|
import importlib
|
|
import goodnews.api as api
|
|
importlib.reload(api)
|
|
from goodnews.db import connect, init_db
|
|
c = connect(str(db)); init_db(c)
|
|
c.execute("INSERT INTO sources (id,name,feed_url,trust_score) VALUES (1,'S','http://s/f',5)")
|
|
c.execute("INSERT INTO articles (id,source_id,canonical_url,title,url_hash) VALUES (1,1,'http://s/1','t1','h1')")
|
|
c.execute("INSERT INTO article_scores (article_id,accepted,topic) VALUES (1,1,'science')")
|
|
c.execute("INSERT INTO article_tags (article_id,tag) VALUES (1,'science')")
|
|
c.commit(); c.close()
|
|
return api.create_app(), api
|
|
|
|
|
|
def _signin(app, api, email):
|
|
tc = TestClient(app)
|
|
sent = {}
|
|
import goodnews.email_send as es
|
|
orig = es.send_magic_link
|
|
es.send_magic_link = lambda to, link: sent.update(link=link)
|
|
try:
|
|
tc.post("/api/auth/email/start", json={"email": email})
|
|
tc.post("/api/auth/email/verify", json={"token": sent["link"].split("token=")[1]})
|
|
finally:
|
|
es.send_magic_link = orig
|
|
return tc
|
|
|
|
|
|
def test_admin_gating(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
assert TestClient(app).get("/api/admin/stats").status_code == 401 # anon
|
|
nonadmin = _signin(app, api, "rando@x.com")
|
|
assert nonadmin.get("/api/admin/stats").status_code == 403 # signed in, not admin
|
|
assert nonadmin.get("/api/auth/me").json()["is_admin"] is False
|
|
admin = _signin(app, api, "Boss@X.com") # case-insensitive match
|
|
assert admin.get("/api/auth/me").json()["is_admin"] is True
|
|
assert admin.get("/api/admin/stats").status_code == 200
|
|
|
|
|
|
def test_admin_stats_shape(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
admin = _signin(app, api, "boss@x.com")
|
|
# log a couple of events
|
|
admin.post("/api/events", json={"kind": "visit", "visitor": "v1"})
|
|
admin.post("/api/events", json={"kind": "open", "article_id": 1, "visitor": "v1"})
|
|
stats = admin.get("/api/admin/stats").json()
|
|
assert set(stats) >= {"visitors", "returning", "once", "top_articles", "top_groupings", "top_topics", "shares", "daily"}
|
|
assert stats["top_articles"][0]["id"] == 1 and stats["top_articles"][0]["opens"] == 1
|
|
assert any(g["tag"] == "science" for g in stats["top_groupings"])
|
|
|
|
|
|
def _src(tc, sid=1):
|
|
return next(s for s in tc.get("/api/admin/stats").json()["sources"] if s["id"] == sid)
|
|
|
|
|
|
def test_source_lifecycle_status_and_visibility(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
tc = _signin(app, api, "boss@x.com")
|
|
# pause → status paused + active mirror 0
|
|
assert tc.post("/api/admin/sources/1/status", json={"status": "paused"}).json()["active"] == 0
|
|
assert _src(tc)["status"] == "paused" and _src(tc)["active"] == 0
|
|
# retire → status retired, still active=0, articles stay visible
|
|
tc.post("/api/admin/sources/1/status", json={"status": "retired"})
|
|
assert _src(tc)["status"] == "retired" and _src(tc)["active"] == 0
|
|
assert _src(tc)["content_visible"] == 1 # retire does NOT hide content
|
|
# restore → active again, mirror 1
|
|
tc.post("/api/admin/sources/1/status", json={"status": "active"})
|
|
assert _src(tc)["status"] == "active" and _src(tc)["active"] == 1
|
|
# hide content → feed excludes it; show → back
|
|
tc.post("/api/admin/sources/1/visibility", json={"visible": False})
|
|
assert _src(tc)["content_visible"] == 0
|
|
from goodnews import queries
|
|
import sqlite3, os
|
|
c = sqlite3.connect(os.environ["GOODNEWS_DB"]); c.row_factory = sqlite3.Row
|
|
assert queries.feed(c) == [] # hidden source's article drops out of the feed
|
|
tc.post("/api/admin/sources/1/visibility", json={"visible": True})
|
|
c2 = sqlite3.connect(os.environ["GOODNEWS_DB"]); c2.row_factory = sqlite3.Row
|
|
assert len(queries.feed(c2)) == 1 # back in the feed
|
|
# validation + 404
|
|
assert tc.post("/api/admin/sources/1/status", json={"status": "bogus"}).status_code == 422
|
|
assert tc.post("/api/admin/sources/999/status", json={"status": "paused"}).status_code == 404
|
|
|
|
|
|
def test_source_flag_and_gating(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
tc = _signin(app, api, "boss@x.com")
|
|
tc.post("/api/admin/sources/1/review", json={"flag": True, "reason": "spammy lately"})
|
|
assert _src(tc)["review_flag"] == 1 and _src(tc)["review_reason"] == "spammy lately"
|
|
tc.post("/api/admin/sources/1/review", json={"flag": False})
|
|
assert _src(tc)["review_flag"] == 0 and _src(tc)["review_reason"] is None
|
|
anon = TestClient(app)
|
|
assert anon.post("/api/admin/sources/1/status", json={"status": "paused"}).status_code == 401
|
|
assert anon.post("/api/admin/sources/1/visibility", json={"visible": False}).status_code == 401
|
|
assert anon.post("/api/admin/sources/1/review", json={"flag": True}).status_code == 401
|
|
|
|
|
|
def test_source_health_includes_metrics(tmp_path, monkeypatch):
|
|
import sqlite3
|
|
from goodnews import queries
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
import os
|
|
c = sqlite3.connect(os.environ["GOODNEWS_DB"]); c.row_factory = sqlite3.Row
|
|
sh = queries.source_health(c)
|
|
s = sh[0]
|
|
for key in ("active", "served", "accepted_total", "total_articles", "duplicates",
|
|
"acceptance_rate", "duplicate_rate", "review_reason", "next_due_at"):
|
|
assert key in s, f"missing {key}"
|
|
assert s["served"] == 1 and s["acceptance_rate"] == 100
|
|
|
|
|
|
def test_admin_stats_days_param_clamped(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
tc = _signin(app, api, "boss@x.com")
|
|
assert tc.get("/api/admin/stats?days=7").json()["days"] == 7
|
|
assert tc.get("/api/admin/stats?days=90").json()["days"] == 90
|
|
assert tc.get("/api/admin/stats?days=999").json()["days"] == 30 # clamped
|
|
assert tc.get("/api/admin/stats").json()["days"] == 30 # default
|
|
|
|
|
|
def test_candidate_suggest_promote_paused(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
monkeypatch.setattr(api.feeds, "preview_feed",
|
|
lambda url, **k: {"url": url, "sampled": 5, "accepted": 4, "examples_accepted": ["A", "B"]})
|
|
tc = _signin(app, api, "boss@x.com")
|
|
cand = tc.post("/api/admin/candidates", json={"feed_url": "http://good/feed", "name": "Good Feed"}).json()
|
|
assert cand["status"] == "suggested" and cand["preview"]["accepted"] == 4
|
|
cid = cand["id"]
|
|
assert any(c["id"] == cid for c in tc.get("/api/admin/candidates").json())
|
|
# promote defaults to paused (active-on-approval off) — no mirror drift
|
|
res = tc.post(f"/api/admin/candidates/{cid}/promote", json={}).json()
|
|
assert res["source"]["status"] == "paused" and res["source"]["active"] == 0
|
|
assert res["candidate"]["status"] == "promoted"
|
|
assert any(s["name"] == "Good Feed" for s in tc.get("/api/admin/stats").json()["sources"])
|
|
|
|
|
|
def test_candidate_deep_preview_and_dedup(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
|
|
def fake_preview(url, **k):
|
|
# Echo back whether the LLM client was wired in + the sample size used.
|
|
return {"url": url, "sampled": k.get("sample"), "accepted": 4,
|
|
"classified": k.get("client") is not None}
|
|
monkeypatch.setattr(api.feeds, "preview_feed", fake_preview)
|
|
# Deep preview builds a model client; stub it so we never touch the real LAN model.
|
|
monkeypatch.setattr(api, "LocalModelClient", type("C", (), {"from_env": staticmethod(lambda: object())}))
|
|
tc = _signin(app, api, "boss@x.com")
|
|
|
|
cand = tc.post("/api/admin/candidates", json={"feed_url": "https://news.test/feed"}).json()
|
|
assert cand["preview"]["classified"] is False # add uses the fast heuristic
|
|
# Deep preview runs the real classifier on the smaller sample.
|
|
deep = tc.post(f"/api/admin/candidates/{cand['id']}/preview?deep=true").json()
|
|
assert deep["preview"]["classified"] is True and deep["preview"]["sampled"] == 8
|
|
# Dedup at ADD: exact + trivial variants (scheme / www / trailing slash / host case).
|
|
assert tc.post("/api/admin/candidates", json={"feed_url": "https://news.test/feed"}).status_code == 409
|
|
assert tc.post("/api/admin/candidates", json={"feed_url": "http://www.news.test/feed/"}).status_code == 409
|
|
# Promote succeeds the first time and creates the live source.
|
|
assert tc.post(f"/api/admin/candidates/{cand['id']}/promote", json={}).status_code == 200
|
|
assert tc.post("/api/admin/candidates", json={"feed_url": "https://NEWS.test/feed"}).status_code == 409
|
|
# Dedup at PROMOTE too: a stale/duplicate candidate (here, re-promoting the
|
|
# same one) can't bypass add and overwrite the live source's settings.
|
|
assert tc.post(f"/api/admin/candidates/{cand['id']}/promote", json={}).status_code == 409
|
|
|
|
|
|
def test_candidate_rename(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
monkeypatch.setattr(api.feeds, "preview_feed", lambda url, **k: {"url": url, "sampled": 3, "accepted": 2})
|
|
tc = _signin(app, api, "boss@x.com")
|
|
cand = tc.post("/api/admin/candidates", json={"feed_url": "http://site/feed", "name": "Wrong Naem"}).json()
|
|
cid = cand["id"]
|
|
# Fix the typo in place — no reject/re-add, preview is preserved.
|
|
fixed = tc.post(f"/api/admin/candidates/{cid}/rename", json={"name": "Right Name"}).json()
|
|
assert fixed["name"] == "Right Name" and fixed["preview"]["accepted"] == 2
|
|
assert any(c["id"] == cid and c["name"] == "Right Name" for c in tc.get("/api/admin/candidates").json())
|
|
# Length is capped so an accidental pasted paragraph can't wreck the queue.
|
|
capped = tc.post(f"/api/admin/candidates/{cid}/rename", json={"name": "z" * 300}).json()
|
|
assert len(capped["name"]) == 160
|
|
tc.post(f"/api/admin/candidates/{cid}/rename", json={"name": "Right Name"}) # restore
|
|
# The fixed name carries through to promotion.
|
|
res = tc.post(f"/api/admin/candidates/{cid}/promote", json={}).json()
|
|
assert res["source"]["name"] == "Right Name"
|
|
# Settled candidates (promoted/rejected) can't be renamed server-side, not just hidden in the UI.
|
|
assert tc.post(f"/api/admin/candidates/{cid}/rename", json={"name": "Nope"}).status_code == 409
|
|
# Gated; unknown id → 404.
|
|
assert TestClient(app).post(f"/api/admin/candidates/{cid}/rename", json={"name": "x"}).status_code == 401
|
|
assert tc.post("/api/admin/candidates/99999/rename", json={"name": "x"}).status_code == 404
|
|
|
|
|
|
def test_candidate_reject_and_gating(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
monkeypatch.setattr(api.feeds, "preview_feed", lambda url, **k: {"url": url, "sampled": 1, "accepted": 0})
|
|
tc = _signin(app, api, "boss@x.com")
|
|
cand = tc.post("/api/admin/candidates", json={"feed_url": "http://meh/feed"}).json()
|
|
assert tc.post(f"/api/admin/candidates/{cand['id']}/reject").json()["status"] == "rejected"
|
|
anon = TestClient(app)
|
|
assert anon.get("/api/admin/candidates").status_code == 401
|
|
assert anon.post("/api/admin/candidates", json={"feed_url": "http://x/f"}).status_code == 401
|
|
assert anon.post("/api/admin/candidates/1/promote", json={}).status_code == 401
|
|
|
|
|
|
def test_safe_fetch_feed_blocks_ssrf():
|
|
import pytest
|
|
from goodnews.feeds import safe_fetch_feed
|
|
for bad in ("http://127.0.0.1/x", "http://localhost/x", "file:///etc/passwd",
|
|
"http://169.254.169.254/latest", "ftp://x/y"):
|
|
with pytest.raises(RuntimeError):
|
|
safe_fetch_feed(bad, timeout=2)
|
|
|
|
|
|
def test_export_sources_csv(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
tc = _signin(app, api, "boss@x.com")
|
|
r = tc.get("/api/admin/export/sources.csv")
|
|
assert r.status_code == 200 and r.headers["content-type"].startswith("text/csv")
|
|
assert 'attachment; filename="sources.csv"' in r.headers["content-disposition"]
|
|
lines = r.text.splitlines()
|
|
assert lines[0].startswith("name,feed_url,homepage,status,visible,served")
|
|
assert any("http://s/f" in ln for ln in lines[1:]) # the seeded source row
|
|
assert TestClient(app).get("/api/admin/export/sources.csv").status_code == 401 # gated
|
|
|
|
|
|
def test_export_audience_csv(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
tc = _signin(app, api, "boss@x.com")
|
|
tc.post("/api/events", json={"kind": "visit", "visitor": "v1"})
|
|
r = tc.get("/api/admin/export/audience.csv?days=7")
|
|
assert r.status_code == 200 and r.headers["content-type"].startswith("text/csv")
|
|
body = r.text
|
|
assert "metric,value" in body and "window_days,7" in body
|
|
assert "date,visitors,opens" in body # daily time-series section
|
|
assert TestClient(app).get("/api/admin/export/audience.csv").status_code == 401 # gated
|
|
|
|
|
|
def test_export_sources_csv_escapes_formula_injection(tmp_path, monkeypatch):
|
|
import os, sqlite3
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
c = sqlite3.connect(os.environ["GOODNEWS_DB"])
|
|
c.execute("UPDATE sources SET name = ?, review_flag = 1, review_reason = ? WHERE id = 1",
|
|
('=HYPERLINK("http://bad")', '+danger'))
|
|
c.commit(); c.close()
|
|
tc = _signin(app, api, "boss@x.com")
|
|
body = tc.get("/api/admin/export/sources.csv").text
|
|
assert "'=HYPERLINK" in body # leading apostrophe defuses the formula (CSV may quote the cell)
|
|
assert "'+danger" in body
|
|
assert ",=HYPERLINK" not in body # never written as a bare, evaluable formula cell
|
|
|
|
|
|
def test_source_check_preview_is_readonly(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
monkeypatch.setattr(api.feeds, "preview_feed", lambda url, **k: {"url": url, "sampled": 8, "accepted": 6})
|
|
tc = _signin(app, api, "boss@x.com")
|
|
before = _src(tc)
|
|
r = tc.post("/api/admin/sources/1/preview").json()
|
|
assert r["sampled"] == 8 and r["accepted"] == 6
|
|
after = _src(tc)
|
|
# read-only: no state/health change, no poll attempt recorded
|
|
assert after["status"] == before["status"] and after["served"] == before["served"]
|
|
assert after["last_success_at"] == before["last_success_at"] and after["next_due_at"] == before["next_due_at"]
|
|
assert TestClient(app).post("/api/admin/sources/1/preview").status_code == 401 # gated
|
|
assert tc.post("/api/admin/sources/999/preview").status_code == 404
|
|
|
|
|
|
def test_digest_toggle_and_unsubscribe(tmp_path, monkeypatch):
|
|
import os, sqlite3
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
tc = _signin(app, api, "reader@x.com")
|
|
assert tc.get("/api/auth/me").json()["digest_enabled"] is False
|
|
assert tc.post("/api/account/digest", json={"enabled": True}).json()["digest_enabled"] is True
|
|
assert tc.get("/api/auth/me").json()["digest_enabled"] is True
|
|
c = sqlite3.connect(os.environ["GOODNEWS_DB"])
|
|
uid, tok = c.execute("SELECT id, digest_unsub_token FROM users WHERE email='reader@x.com'").fetchone()
|
|
c.close()
|
|
assert tok # token generated on opt-in
|
|
# one-click unsubscribe: wrong token is rejected, right token disables
|
|
assert "invalid" in TestClient(app).get(f"/api/digest/unsubscribe?u={uid}&t=nope").text.lower()
|
|
assert "unsubscribed" in TestClient(app).get(f"/api/digest/unsubscribe?u={uid}&t={tok}").text.lower()
|
|
assert tc.get("/api/auth/me").json()["digest_enabled"] is False
|
|
# RFC 8058 one-click POST also disables
|
|
tc.post("/api/account/digest", json={"enabled": True})
|
|
assert TestClient(app).post(f"/api/digest/unsubscribe?u={uid}&t={tok}").json() == {"ok": True}
|
|
assert tc.get("/api/auth/me").json()["digest_enabled"] is False
|
|
assert TestClient(app).post("/api/account/digest", json={"enabled": True}).status_code == 401 # gated
|
|
|
|
|
|
def test_follows_and_following_feed(tmp_path, monkeypatch):
|
|
import os, sqlite3
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
c = sqlite3.connect(os.environ["GOODNEWS_DB"])
|
|
c.execute("INSERT INTO sources (id,name,feed_url) VALUES (2,'Other','http://o/f')")
|
|
c.execute("INSERT INTO articles (id,source_id,canonical_url,title,url_hash) VALUES (2,2,'http://o/2','t2','h2')")
|
|
c.execute("INSERT INTO article_scores (article_id,accepted,topic) VALUES (2,1,'tech')")
|
|
c.commit(); c.close()
|
|
tc = _signin(app, api, "reader@x.com")
|
|
# no follows yet → empty following feed (not an error)
|
|
assert tc.get("/api/feed?following=true").json()["count"] == 0
|
|
# follow source 1 → only its article
|
|
assert tc.post("/api/follows", json={"kind": "source", "value": "1"}).json()["ok"] is True
|
|
assert any(x["value"] == "1" and x["name"] == "S" for x in tc.get("/api/follows").json())
|
|
assert [i["id"] for i in tc.get("/api/feed?following=true").json()["items"]] == [1]
|
|
# follow source 2 too → both
|
|
tc.post("/api/follows", json={"kind": "source", "value": "2"})
|
|
assert {i["id"] for i in tc.get("/api/feed?following=true").json()["items"]} == {1, 2}
|
|
# follow tag works too (article 1 carries 'science')
|
|
tc.post("/api/follows", json={"kind": "tag", "value": "Science"}) # normalized lower
|
|
assert any(x["kind"] == "tag" and x["value"] == "science" for x in tc.get("/api/follows").json())
|
|
# unfollow source 2 (DELETE via query) → back to {1}
|
|
tc.delete("/api/follows?kind=source&value=2")
|
|
assert {i["id"] for i in tc.get("/api/feed?following=true").json()["items"]} == {1}
|
|
# anon: following feed empty, follows API gated, bad source 404
|
|
assert TestClient(app).get("/api/feed?following=true").json()["count"] == 0
|
|
assert TestClient(app).get("/api/follows").status_code == 401
|
|
assert tc.post("/api/follows", json={"kind": "source", "value": "999"}).status_code == 404
|
|
|
|
|
|
def test_since_endpoint(tmp_path, monkeypatch):
|
|
import os, sqlite3
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
c = sqlite3.connect(os.environ["GOODNEWS_DB"])
|
|
for aid, when in [(2, "2020-01-01 00:00:00"), (3, "2030-01-01 00:00:00")]:
|
|
c.execute("INSERT INTO articles (id,source_id,canonical_url,title,url_hash,discovered_at) VALUES (?,1,?,?,?,?)",
|
|
(aid, f"http://s/{aid}", f"t{aid}", f"h{aid}", when))
|
|
c.execute("INSERT INTO article_scores (article_id,accepted) VALUES (?,1)", (aid,))
|
|
c.commit(); c.close()
|
|
tc = TestClient(app)
|
|
r = tc.get("/api/since?ts=2027-01-01T00:00:00Z").json()
|
|
assert r["count"] == 1 and [i["id"] for i in r["items"]] == [3] # only the post-2027 article
|
|
assert tc.get("/api/since?ts=2099-01-01T00:00:00Z").json()["count"] == 0 # nothing newer
|
|
assert tc.get("/api/since?ts=not-a-date").json()["count"] == 0 # invalid ts → quiet 0
|
|
|
|
|
|
def test_puzzle_endpoint(tmp_path, monkeypatch):
|
|
import os
|
|
import sqlite3
|
|
from goodnews import games
|
|
from goodnews.localtime import local_today
|
|
app, api = _make(tmp_path, monkeypatch)
|
|
tc = TestClient(app)
|
|
r = tc.get("/api/puzzle/word?variant=5").json()
|
|
assert r["game"] == "word" and r["variant"] == "5" and r["length"] == 5 and r["guesses"] == 6
|
|
assert "answer" not in r # the public puzzle response never carries the answer
|
|
assert tc.get("/api/puzzle/word?variant=6").json()["guesses"] == 7
|
|
assert tc.get("/api/puzzle/word?variant=9").status_code == 404
|
|
assert tc.get("/api/puzzle/nonsense").status_code == 404
|
|
|
|
# server-adjudicated guessing (answer revealed only on solve / exhaustion)
|
|
c = sqlite3.connect(os.environ["GOODNEWS_DB"]); c.row_factory = sqlite3.Row
|
|
ans = games.generate_word_puzzle(c, local_today(), "5")["answer"]
|
|
mid = tc.post("/api/puzzle/word/guess", json={"variant": "5", "guess": "xxxxx", "n": 1}).json()
|
|
assert len(mid["colors"]) == 5 and mid["solved"] is False and mid["answer"] is None
|
|
win = tc.post("/api/puzzle/word/guess", json={"variant": "5", "guess": ans, "n": 2}).json()
|
|
assert win["solved"] is True and win["answer"] == ans and all(x == "correct" for x in win["colors"])
|
|
last = tc.post("/api/puzzle/word/guess", json={"variant": "5", "guess": "xxxxx", "n": 6}).json()
|
|
assert last["answer"] == ans # exhausting guesses reveals it even when wrong
|
|
# wrong length → 400
|
|
assert tc.post("/api/puzzle/word/guess", json={"variant": "5", "guess": "toolong", "n": 1}).status_code == 400
|
|
|
|
|
|
def test_wordsearch_endpoint(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch)
|
|
tc = TestClient(app)
|
|
dirs = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]
|
|
|
|
def findable(grid, n, w):
|
|
for r0 in range(n):
|
|
for c0 in range(n):
|
|
for dr, dc in dirs:
|
|
cells = [(r0 + dr * i, c0 + dc * i) for i in range(len(w))]
|
|
if all(0 <= rr < n and 0 <= cc < n for rr, cc in cells) and \
|
|
"".join(grid[rr][cc] for rr, cc in cells) == w:
|
|
return True
|
|
return False
|
|
|
|
themes, sizes, per_size = set(), {"small": 8, "med": 11, "large": 14}, {}
|
|
for tier, dim in sizes.items():
|
|
r = tc.get(f"/api/puzzle/wordsearch?variant={tier}").json()
|
|
assert r["game"] == "wordsearch" and r["theme"] and r["size"] == tier
|
|
assert len(r["grid"]) == dim and all(len(row) == dim for row in r["grid"]) # bigger tier → bigger grid
|
|
assert "placements" not in r # solution cells never sent
|
|
assert all(findable(r["grid"], dim, w) for w in r["words"]) # every word placed → solvable
|
|
themes.add(r["theme"]); per_size[tier] = set(r["words"])
|
|
assert len(themes) == 1 # all sizes share the day's one theme
|
|
# frozen promise: exact per-size counts ...
|
|
assert len(per_size["small"]) == 6 and len(per_size["med"]) == 9 and len(per_size["large"]) == 13
|
|
# ... and the three sizes are DISJOINT — each day is three distinct puzzles
|
|
assert per_size["small"] & per_size["med"] == set()
|
|
assert per_size["small"] & per_size["large"] == set()
|
|
assert per_size["med"] & per_size["large"] == set()
|
|
# an unknown size falls back to med
|
|
assert tc.get("/api/puzzle/wordsearch?variant=nope").json()["size"] == "med"
|
|
|
|
|
|
def test_wordsearch_thin_llm_falls_back(tmp_path, monkeypatch):
|
|
from goodnews.db import connect, init_db
|
|
from goodnews import games
|
|
|
|
class FakeClient:
|
|
def __init__(self, text):
|
|
self.text = text
|
|
|
|
def chat_text(self, msg):
|
|
return self.text
|
|
|
|
c = connect(":memory:"); init_db(c)
|
|
# Thin proposal (only ~6 valid words) must be REJECTED so Large can't underfill.
|
|
thin = FakeClient("THEME: Thin Theme\nWORDS: ONE, TWO, FOUR, FIVE, SEVEN, EIGHT, THREE")
|
|
p = games.generate_wordsearch_puzzle(c, "2026-07-01", client=thin)
|
|
assert p["theme"] != "Thin Theme" # fell back to a curated theme
|
|
large = games.wordsearch_response(c, "2026-07-01", "large")
|
|
assert len(large["words"]) == games.WS_TIERS["large"]["count"] # still full
|
|
|
|
# A rich proposal (>= WS_MIN_ACCEPT valid words) is accepted.
|
|
rich_words = [chr(65 + i) * 5 for i in range(26)] + ["ABCDE", "FGHIJ", "KLMNO", "PQRST"] # 30 distinct
|
|
rich = FakeClient("THEME: Rich Theme\nWORDS: " + ", ".join(rich_words))
|
|
p2 = games.generate_wordsearch_puzzle(c, "2026-07-02", client=rich)
|
|
assert p2["theme"] == "Rich Theme" and len(p2["words"]) >= games.WS_MIN_ACCEPT
|
|
|
|
|
|
def test_word_pool_admin(tmp_path, monkeypatch):
|
|
from goodnews import games
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
assert TestClient(app).get("/api/admin/word/pool").status_code == 401 # gated
|
|
tc = _signin(app, api, "boss@x.com")
|
|
# lookup: valid dict word vs non-word vs wrong length
|
|
assert tc.get("/api/admin/word/lookup?word=thrive").json() == {
|
|
"word": "thrive", "length": 6, "alpha": True, "variant": "6",
|
|
"in_dictionary": True, "in_pool": True, "removed": False}
|
|
assert tc.get("/api/admin/word/lookup?word=zzzzz").json()["in_dictionary"] is False
|
|
assert tc.get("/api/admin/word/lookup?word=cat").json()["variant"] is None
|
|
# add a valid word, then it shows in the pool + lookup
|
|
res = tc.post("/api/admin/word/pool", json={"word": "PLUMB"}).json()
|
|
assert res["added"] is True and "plumb" in res["pool"]["5"]["added"]
|
|
assert tc.get("/api/admin/word/lookup?word=plumb").json()["in_pool"] is True
|
|
# rejections: non-dictionary word + wrong length + duplicate
|
|
assert tc.post("/api/admin/word/pool", json={"word": "qwxzv"}).status_code == 400
|
|
assert tc.post("/api/admin/word/pool", json={"word": "cat"}).status_code == 400
|
|
assert tc.post("/api/admin/word/pool", json={"word": "plumb"}).status_code == 400
|
|
# remove the admin-added word
|
|
tc.delete("/api/admin/word/pool/plumb")
|
|
assert "plumb" not in tc.get("/api/admin/word/pool").json()["5"]["added"]
|
|
# remove a CURATED word (only a tombstone can pull it) → restore brings it back
|
|
tc.delete("/api/admin/word/pool/thrive")
|
|
look = tc.get("/api/admin/word/lookup?word=thrive").json()
|
|
assert look["in_pool"] is False and look["removed"] is True
|
|
assert "thrive" in tc.get("/api/admin/word/pool").json()["6"]["removed"]
|
|
tc.post("/api/admin/word/pool/restore", json={"word": "thrive"})
|
|
assert tc.get("/api/admin/word/lookup?word=thrive").json()["in_pool"] is True
|
|
# bulk import: validates, dedupes, reports rejects
|
|
fresh5 = next(w for w in sorted(games._DICT["5"]) if w not in set(games._POOL["5"]))
|
|
imp = tc.post("/api/admin/word/pool/import",
|
|
json={"text": f"{fresh5.upper()}, {fresh5}, thrive, qwxzv, hi"}).json()
|
|
assert fresh5 in imp["added"] and imp["counts"]["added"] == 1
|
|
assert imp["counts"]["duplicates"] >= 1 # thrive already present
|
|
assert {r["word"] for r in imp["rejected"]} == {"qwxzv", "hi"}
|
|
assert fresh5 in tc.get("/api/admin/word/pool").json()["5"]["added"]
|
|
|
|
|
|
def test_client_error_telemetry(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
anon = TestClient(app)
|
|
assert anon.post("/api/client-error",
|
|
json={"reason": "boot-timeout", "path": "/play", "version": "start.Bzfu1yPF.js"}).json()["ok"] is True
|
|
assert anon.get("/api/admin/client-errors").status_code == 401 # gated
|
|
tc = _signin(app, api, "boss@x.com")
|
|
rows = tc.get("/api/admin/client-errors").json()
|
|
assert len(rows) == 1 and rows[0]["reason"] == "boot-timeout" and rows[0]["path"] == "/play"
|
|
assert rows[0]["user_agent"] # captured from the request header
|
|
assert rows[0]["app_version"] == "start.Bzfu1yPF.js" # build correlation for deploy-related errors
|
|
assert rows[0]["bot"] is False
|
|
assert tc.get("/api/admin/stats").json()["client_errors"]["today"] == 1
|
|
# A throttled crawler tripping the beacon must NOT inflate the headline count,
|
|
# but stays visible (tagged) in the list.
|
|
anon.post("/api/client-error", json={"reason": "boot-timeout", "path": "/"},
|
|
headers={"user-agent": "Mozilla/5.0 (X11; Linux x86_64) HeadlessChrome/138.0 Safari/537.36"})
|
|
rows = tc.get("/api/admin/client-errors").json()
|
|
assert len(rows) == 2 and rows[0]["bot"] is True
|
|
stats = tc.get("/api/admin/stats").json()["client_errors"]
|
|
assert stats["today"] == 1 and stats["window"] == 1 # bot excluded from both
|
|
|
|
|
|
def test_wordsearch_theme_admin(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
assert TestClient(app).get("/api/admin/wordsearch/themes").status_code == 401 # gated
|
|
tc = _signin(app, api, "boss@x.com")
|
|
w28 = ["table", "chair", "clock", "shelf", "couch", "pillow", "window", "carpet", "mirror", "candle",
|
|
"kettle", "drawer", "closet", "curtain", "cushion", "basket", "bottle", "towel", "broom", "ladder",
|
|
"stairs", "pantry", "blanket", "vase", "hallway", "doorway", "mantel", "hamper"]
|
|
# too few valid words → 400
|
|
assert tc.post("/api/admin/wordsearch/themes", json={"theme": "X", "words": ["cat", "dog"]}).status_code == 400
|
|
# save ok (>= 28); listed with the right count
|
|
res = tc.post("/api/admin/wordsearch/themes", json={"theme": "My House", "words": w28}).json()
|
|
assert res["saved"] and any(t["theme"] == "My House" and t["count"] == 28 for t in res["themes"])
|
|
tid = next(t["id"] for t in res["themes"] if t["theme"] == "My House")
|
|
# edit/update keeps the same id
|
|
upd = tc.post("/api/admin/wordsearch/themes", json={"theme": "House Stuff", "words": w28, "id": tid}).json()
|
|
assert any(t["id"] == tid and t["theme"] == "House Stuff" for t in upd["themes"])
|
|
# remove
|
|
left = tc.delete(f"/api/admin/wordsearch/themes/{tid}").json()
|
|
assert not any(t["id"] == tid for t in left)
|
|
|
|
|
|
def test_source_articles_inspector(tmp_path, monkeypatch):
|
|
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
|
assert TestClient(app).get("/api/admin/sources/1/articles").status_code == 401 # gated
|
|
tc = _signin(app, api, "boss@x.com")
|
|
r = tc.get("/api/admin/sources/1/articles").json()
|
|
assert r["summary"]["total"] == 1 and r["summary"]["accepted"] == 1 and r["summary"]["no_image"] == 1
|
|
assert len(r["articles"]) == 1
|
|
a = r["articles"][0]
|
|
assert a["title"] == "t1" and a["accepted"] == 1 and a["has_image"] is False and a["paywalled"] is False
|
|
# filters resolve in SQL; rejected → none (the seeded article is accepted)
|
|
assert tc.get("/api/admin/sources/1/articles?filter=rejected").json()["articles"] == []
|
|
assert len(tc.get("/api/admin/sources/1/articles?filter=no_image").json()["articles"]) == 1
|
|
assert tc.get("/api/admin/sources/999/articles").status_code == 404 # unknown source
|