89c0fbe1f6
The deploy pipeline runs from the working tree, so a wave of shipped features
had never been committed. This snapshots git to what's actually running.
SEO impression recovery (live + verified):
- Duplicate /a/{id} now 301-redirect to their canonical twin instead of 404
(a hard 404 silently dropped already-indexed URLs and tanked impressions).
- Dedup representative selection reworked: accepted/serveable -> established
rep (URL stability) -> quality score, so an accepted page never retires to a
rejected rep and an indexed canonical doesn't churn when a newer twin arrives.
- HEAD /a/{id} returns the same status as GET (api_route GET+HEAD) instead of
falling through to the static mount and 404ing.
- `dedup --force-recluster`: cycle-locked, model-free re-cluster to re-apply the
policy to the existing corpus (shared cycle_lock context manager).
- CLI honors GOODNEWS_DB for its default --db (was silently ignored).
Publishing Desk (admin tool to post highlights to X via Web Intents):
- publishing.py queue/rank/handle-resolution; admin UI; full searchable emoji
picker (bundled data, no CDN) for the blurb editor.
Play games + site:
- Bloom (word-wheel), Memory Match, daily ritual set, Zen Den (dev-gated).
- English-only language gate; source prospecting; paywall + dedup hardening.
Tests: full suite green (349). Ignores tightened (node_modules, data/*.db).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
961 lines
43 KiB
Python
961 lines
43 KiB
Python
"""Daily puzzles for the calm Play hub.
|
||
|
||
Principle: **the LLM proposes, code disposes.** The LLM only contributes
|
||
creative flavor (a one-line "why today's word matters"); the daily answer is
|
||
picked deterministically by code from a pre-validated hopeful pool (every word
|
||
is guaranteed to be in the guess dictionary, so it's always typeable). Puzzles
|
||
are stored per (date, game, variant) so everyone gets the same one and shares
|
||
are comparable. Generation never blocks on or trusts the LLM for correctness.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import hashlib
|
||
import json
|
||
import random
|
||
import re
|
||
import sqlite3
|
||
from pathlib import Path
|
||
|
||
from . import bloom
|
||
|
||
_DATA = Path(__file__).parent / "data"
|
||
_POOL = json.loads((_DATA / "wordpool.json").read_text()) # curated static answer pool
|
||
# Guess dictionaries (same lists the client validates against) — used server-side to
|
||
# guarantee any admin-added answer is actually guessable.
|
||
_DICT = {v: set(json.loads((_DATA / f"words-{v}.json").read_text())) for v in ("5", "6")}
|
||
|
||
# Daily Word: 5 letters / 6 guesses · Long Word: 6 letters / 7 guesses.
|
||
WORD_VARIANTS = {"5": {"length": 5, "guesses": 6}, "6": {"length": 6, "guesses": 7}}
|
||
|
||
# Memory Match daily sync variants = "<tier>-<format>" (free play stays local).
|
||
MATCH_VARIANTS = {f"{t}-{f}" for t in ("gentle", "standard", "expert") for f in ("icons", "colors")}
|
||
|
||
|
||
def _seed(*parts: str) -> int:
|
||
return int(hashlib.sha256(":".join(parts).encode()).hexdigest(), 16)
|
||
|
||
|
||
def _db_pool(conn: sqlite3.Connection, variant: str) -> list[str]:
|
||
rows = conn.execute("SELECT word FROM word_pool WHERE variant=? ORDER BY word", (variant,)).fetchall()
|
||
return [r["word"] for r in rows]
|
||
|
||
|
||
def _db_removed(conn: sqlite3.Connection, variant: str) -> list[str]:
|
||
rows = conn.execute(
|
||
"SELECT word FROM word_pool_removed WHERE variant=? ORDER BY word", (variant,)
|
||
).fetchall()
|
||
return [r["word"] for r in rows]
|
||
|
||
|
||
def answer_pool(conn: sqlite3.Connection, variant: str) -> list[str]:
|
||
"""The day's answer pool = (curated static ∪ admin-added) − admin-removed.
|
||
|
||
Removals are tombstones, so even a baked-in curated word can be pulled
|
||
(e.g. on negative feedback) without editing the JSON or redeploying — and
|
||
a removal is reversible by lifting the tombstone (restore)."""
|
||
pool = (set(_POOL.get(variant, [])) | set(_db_pool(conn, variant))) - set(_db_removed(conn, variant))
|
||
return sorted(pool)
|
||
|
||
|
||
# --- Admin: Daily Word pool curation -------------------------------------------
|
||
|
||
def lookup_word(word: str) -> dict:
|
||
"""Is this word a valid, guessable answer candidate?"""
|
||
w = (word or "").strip().lower()
|
||
variant = str(len(w))
|
||
return {
|
||
"word": w,
|
||
"length": len(w),
|
||
"alpha": bool(w) and w.isalpha(),
|
||
"variant": variant if variant in WORD_VARIANTS else None,
|
||
"in_dictionary": w.isalpha() and w in _DICT.get(variant, set()),
|
||
}
|
||
|
||
|
||
def _validate_word(conn: sqlite3.Connection, w: str) -> tuple[str | None, str | None]:
|
||
"""Return (variant, error). variant is set only when w is a valid candidate."""
|
||
if not w.isalpha() or str(len(w)) not in WORD_VARIANTS:
|
||
return None, "not a 5- or 6-letter word"
|
||
variant = str(len(w))
|
||
if w not in _DICT[variant]:
|
||
return None, "not in the dictionary — it wouldn't be guessable"
|
||
return variant, None
|
||
|
||
|
||
def _put_word(conn: sqlite3.Connection, w: str, variant: str) -> None:
|
||
"""Make w a live answer: lift any tombstone, and record it as an addition
|
||
only if it isn't already part of the curated static list."""
|
||
if w not in set(_POOL.get(variant, [])):
|
||
conn.execute("INSERT OR IGNORE INTO word_pool (word, variant) VALUES (?, ?)", (w, variant))
|
||
conn.execute("DELETE FROM word_pool_removed WHERE variant=? AND word=?", (variant, w))
|
||
|
||
|
||
def add_pool_word(conn: sqlite3.Connection, word: str) -> dict:
|
||
"""Add a word to the answer pool. Enforces the invariant (alpha, 5/6 letters,
|
||
present in the guess dictionary) so the daily answer is always guessable."""
|
||
w = (word or "").strip().lower()
|
||
variant, err = _validate_word(conn, w)
|
||
if err:
|
||
return {"error": err}
|
||
live = (set(_POOL.get(variant, [])) | set(_db_pool(conn, variant))) - set(_db_removed(conn, variant))
|
||
if w in live:
|
||
return {"error": "already in the pool"}
|
||
_put_word(conn, w, variant)
|
||
conn.commit()
|
||
return {"word": w, "variant": variant, "added": True}
|
||
|
||
|
||
def remove_pool_word(conn: sqlite3.Connection, word: str) -> dict:
|
||
"""Remove ANY pool word — curated or admin-added — by laying down a tombstone.
|
||
Reversible via restore_pool_word. The daily picker subtracts tombstones."""
|
||
w = (word or "").strip().lower()
|
||
variant = str(len(w))
|
||
if variant not in WORD_VARIANTS:
|
||
return {"word": w, "removed": False}
|
||
conn.execute("INSERT OR IGNORE INTO word_pool_removed (word, variant) VALUES (?, ?)", (w, variant))
|
||
conn.commit()
|
||
return {"word": w, "variant": variant, "removed": True}
|
||
|
||
|
||
def restore_pool_word(conn: sqlite3.Connection, word: str) -> dict:
|
||
"""Undo a removal: lift the tombstone so the word rejoins the pool."""
|
||
w = (word or "").strip().lower()
|
||
variant = str(len(w))
|
||
cur = conn.execute("DELETE FROM word_pool_removed WHERE variant=? AND word=?", (variant, w))
|
||
conn.commit()
|
||
return {"word": w, "variant": variant, "restored": cur.rowcount > 0}
|
||
|
||
|
||
def import_pool_words(conn: sqlite3.Connection, words: list[str]) -> dict:
|
||
"""Bulk-add a vetted list. Validates each word (alpha · 5–6 · in dictionary),
|
||
ignores duplicates (already-live words), and reports rejects with reasons.
|
||
A word currently tombstoned is treated as importable (re-adding lifts it)."""
|
||
static = {v: set(_POOL.get(v, [])) for v in WORD_VARIANTS}
|
||
added = {v: set(_db_pool(conn, v)) for v in WORD_VARIANTS}
|
||
removed = {v: set(_db_removed(conn, v)) for v in WORD_VARIANTS}
|
||
accepted: list[str] = []
|
||
duplicates: list[str] = []
|
||
rejected: list[dict] = []
|
||
seen: set[str] = set()
|
||
for raw in words:
|
||
w = (raw or "").strip().lower()
|
||
if not w or w in seen:
|
||
continue
|
||
seen.add(w)
|
||
variant, err = _validate_word(conn, w)
|
||
if err:
|
||
rejected.append({"word": w, "reason": err})
|
||
continue
|
||
live = (static[variant] | added[variant]) - removed[variant]
|
||
if w in live:
|
||
duplicates.append(w)
|
||
continue
|
||
_put_word(conn, w, variant)
|
||
added[variant].add(w)
|
||
removed[variant].discard(w)
|
||
accepted.append(w)
|
||
conn.commit()
|
||
return {
|
||
"added": accepted,
|
||
"duplicates": duplicates,
|
||
"rejected": rejected,
|
||
"counts": {"added": len(accepted), "duplicates": len(duplicates), "rejected": len(rejected)},
|
||
}
|
||
|
||
|
||
def pool_summary(conn: sqlite3.Connection) -> dict:
|
||
"""Pool counts + the admin-added words and the removed (tombstoned) words, per variant."""
|
||
out = {}
|
||
for v in WORD_VARIANTS:
|
||
static = set(_POOL.get(v, []))
|
||
removed = set(_db_removed(conn, v))
|
||
added = [w for w in _db_pool(conn, v) if w not in removed]
|
||
total = len((static | set(added)) - removed)
|
||
out[v] = {
|
||
"curated": len(static),
|
||
"added": sorted(added),
|
||
"removed": sorted(removed),
|
||
"total": total,
|
||
}
|
||
return out
|
||
|
||
|
||
def _recent_answers(conn: sqlite3.Connection, variant: str, limit: int) -> set[str]:
|
||
rows = conn.execute(
|
||
"SELECT payload_json FROM daily_puzzles WHERE game='word' AND variant=? "
|
||
"ORDER BY puzzle_date DESC LIMIT ?",
|
||
(variant, limit),
|
||
).fetchall()
|
||
out = set()
|
||
for r in rows:
|
||
try:
|
||
out.add(json.loads(r["payload_json"])["answer"])
|
||
except (ValueError, KeyError, TypeError):
|
||
pass
|
||
return out
|
||
|
||
|
||
def _pick_answer(conn: sqlite3.Connection, date: str, variant: str) -> str:
|
||
pool = answer_pool(conn, variant)
|
||
if not pool: # safety net: removals must never empty the pool — fall back to curated
|
||
pool = sorted(_POOL.get(variant, []))
|
||
recent = _recent_answers(conn, variant, max(1, len(pool) // 2))
|
||
start = _seed(date, "word", variant) % len(pool)
|
||
for i in range(len(pool)):
|
||
cand = pool[(start + i) % len(pool)]
|
||
if cand not in recent:
|
||
return cand
|
||
return pool[start] # pool fully cycled — allow a repeat rather than fail
|
||
|
||
|
||
def _why(client, word: str) -> str | None:
|
||
if client is None:
|
||
return None
|
||
try:
|
||
msg = [
|
||
{"role": "system", "content": "You write one short, warm, plain sentence (no preamble, no quotes) — "
|
||
"a calm or gently interesting little note about the given word: what it "
|
||
"evokes, where we meet it in everyday life, or why it's pleasant to sit with."},
|
||
{"role": "user", "content": f"Word: {word}"},
|
||
]
|
||
text = (client.chat_text(msg) or "").strip().strip('"').replace("\n", " ")
|
||
return text[:200] or None
|
||
except Exception: # noqa: BLE001 — flavor only; never block puzzle creation
|
||
return None
|
||
|
||
|
||
def generate_word_puzzle(conn: sqlite3.Connection, date: str, variant: str, client=None) -> dict:
|
||
"""Ensure a Daily/Long Word puzzle exists for (date, variant). Idempotent.
|
||
Code picks the answer; the LLM only adds the optional 'why' (with fallback)."""
|
||
if variant not in WORD_VARIANTS:
|
||
variant = "5"
|
||
existing = conn.execute(
|
||
"SELECT payload_json FROM daily_puzzles WHERE puzzle_date=? AND game='word' AND variant=?",
|
||
(date, variant),
|
||
).fetchone()
|
||
if existing:
|
||
return json.loads(existing["payload_json"])
|
||
answer = _pick_answer(conn, date, variant)
|
||
payload = {
|
||
"answer": answer,
|
||
"why": _why(client, answer),
|
||
"length": WORD_VARIANTS[variant]["length"],
|
||
"guesses": WORD_VARIANTS[variant]["guesses"],
|
||
}
|
||
conn.execute(
|
||
"INSERT OR IGNORE INTO daily_puzzles (puzzle_date, game, variant, payload_json) VALUES (?, 'word', ?, ?)",
|
||
(date, variant, json.dumps(payload)),
|
||
)
|
||
conn.commit()
|
||
row = conn.execute(
|
||
"SELECT payload_json FROM daily_puzzles WHERE puzzle_date=? AND game='word' AND variant=?",
|
||
(date, variant),
|
||
).fetchone()
|
||
return json.loads(row["payload_json"])
|
||
|
||
|
||
def word_puzzle_response(conn: sqlite3.Connection, date: str, variant: str) -> dict:
|
||
"""Public puzzle shape — deliberately holds NO answer. Guesses are adjudicated
|
||
server-side (see adjudicate_word_guess), so the day's word never sits in the
|
||
network response for a curious user to read."""
|
||
p = generate_word_puzzle(conn, date, variant) # create on demand (no LLM) if missing
|
||
return {
|
||
"game": "word",
|
||
"variant": variant,
|
||
"date": date,
|
||
"length": p["length"],
|
||
"guesses": p["guesses"],
|
||
}
|
||
|
||
|
||
def _color(guess: str, answer: str) -> list[str]:
|
||
"""Two-pass Wordle colouring: greens first, then presents limited by counts."""
|
||
res = ["absent"] * len(answer)
|
||
counts: dict[str, int] = {}
|
||
for ch in answer:
|
||
counts[ch] = counts.get(ch, 0) + 1
|
||
for i, ch in enumerate(guess):
|
||
if i < len(answer) and ch == answer[i]:
|
||
res[i] = "correct"; counts[ch] -= 1
|
||
for i, ch in enumerate(guess):
|
||
if res[i] == "correct":
|
||
continue
|
||
if counts.get(ch, 0) > 0:
|
||
res[i] = "present"; counts[ch] -= 1
|
||
return res
|
||
|
||
|
||
def adjudicate_word_guess(conn: sqlite3.Connection, date: str, variant: str, guess: str, n: int) -> dict:
|
||
"""Colour a guess against the day's answer server-side. The answer (and 'why')
|
||
are revealed ONLY once solved or the guesses are spent — never up front."""
|
||
if variant not in WORD_VARIANTS:
|
||
variant = "5"
|
||
p = generate_word_puzzle(conn, date, variant)
|
||
length, maxg, answer = p["length"], p["guesses"], p["answer"]
|
||
guess = (guess or "").strip().lower()
|
||
if len(guess) != length or not guess.isalpha():
|
||
return {"error": "bad guess"}
|
||
solved = guess == answer
|
||
reveal = solved or n >= maxg
|
||
return {
|
||
"colors": _color(guess, answer),
|
||
"solved": solved,
|
||
"answer": answer if reveal else None,
|
||
"why": p.get("why") if reveal else None,
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Word Search — LLM proposes a theme + words, code validates and PLACES them in
|
||
# the grid (so it's always solvable). No answer to hide: the grid and word list
|
||
# are inherently visible; the play is finding them.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_DIRS = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]
|
||
|
||
|
||
def _neighbour_fill(grid, cells, size: int) -> int:
|
||
"""Filled cells in a candidate's footprint+border that AREN'T its own cells —
|
||
a crowding measure, so placement can spread words out instead of clumping."""
|
||
own = set(cells)
|
||
rs = [r for r, _ in cells]
|
||
cs = [c for _, c in cells]
|
||
cnt = 0
|
||
for r in range(max(0, min(rs) - 1), min(size, max(rs) + 2)):
|
||
for c in range(max(0, min(cs) - 1), min(size, max(cs) + 2)):
|
||
if (r, c) not in own and grid[r][c] is not None:
|
||
cnt += 1
|
||
return cnt
|
||
|
||
# Size tiers. The three sizes draw DISJOINT word slices from the day's pool, so
|
||
# each is its own fresh puzzle (no repeats across sizes). small+med+large counts
|
||
# sum to WS_NEEDED, the minimum unique words a theme must supply.
|
||
WS_TIERS = {"small": {"grid": 8, "count": 6}, "med": {"grid": 11, "count": 9}, "large": {"grid": 14, "count": 13}}
|
||
_WS_ORDER = ["small", "med", "large"]
|
||
WS_NEEDED = sum(t["count"] for t in WS_TIERS.values()) # 28 unique words across the three
|
||
WS_TARGET = 32 # words to ask the LLM for
|
||
# Accept an LLM proposal only if it supplies enough UNIQUE words for all three
|
||
# disjoint puzzles; otherwise fall back to a curated theme (which always has enough).
|
||
WS_MIN_ACCEPT = WS_NEEDED
|
||
|
||
# Curated fallbacks — calm, neutral everyday scenes (4–8 letters, uppercase). Each
|
||
# has >= WS_NEEDED words so the three sizes get fully distinct sets.
|
||
_WS_FALLBACKS = [
|
||
("Around the house", ["TABLE", "CHAIR", "CLOCK", "SHELF", "COUCH", "PILLOW", "WINDOW", "CARPET", "MIRROR",
|
||
"CANDLE", "KETTLE", "DRAWER", "CLOSET", "CURTAIN", "CUSHION", "BASKET", "BOTTLE",
|
||
"TOWEL", "BROOM", "LADDER", "STAIRS", "PANTRY", "BLANKET", "VASE", "HALLWAY",
|
||
"DOORWAY", "MANTEL", "HAMPER", "GARAGE", "ATTIC"]),
|
||
("At the beach", ["WAVES", "SHELL", "SANDY", "TIDE", "SHORE", "TOWEL", "BREEZE", "SUNSET", "PEBBLE", "CORAL",
|
||
"OCEAN", "SAILS", "SURF", "SEAGULL", "BUCKET", "SPADE", "DUNES", "LAGOON", "DRIFT", "SALTY",
|
||
"SUNNY", "HORIZON", "COVE", "PIER", "SEAWEED", "FLIPPER", "PADDLE", "MARINA", "BREAKER",
|
||
"SANDAL"]),
|
||
("In the kitchen", ["BREAD", "SPOON", "PLATE", "KETTLE", "FLOUR", "APRON", "WHISK", "SUGAR", "BUTTER", "RECIPE",
|
||
"SIMMER", "PANTRY", "TEAPOT", "SAUCER", "LADLE", "KNIFE", "BOWL", "GRATER", "SKILLET",
|
||
"PEPPER", "GARLIC", "HONEY", "TOAST", "BATTER", "SPATULA", "COLANDER", "MIXER", "GRIDDLE",
|
||
"PITCHER", "NAPKIN"]),
|
||
("In the garden", ["BLOOM", "PETAL", "ROOTS", "LEAF", "GARDEN", "FLOWER", "SUNNY", "SEEDS", "MEADOW", "SPROUT",
|
||
"HEDGE", "TROWEL", "VINES", "SOIL", "SHRUB", "BUDS", "STALK", "DAISY", "TULIP", "FERNS",
|
||
"SHOVEL", "BRANCH", "BREEZE", "PEONY", "MOSS", "COMPOST", "BLOSSOM", "TENDRIL", "NECTAR",
|
||
"WATER"]),
|
||
("A walk outdoors", ["TRAIL", "MEADOW", "BROOK", "BIRDS", "BREEZE", "PEBBLE", "FOREST", "MAPLE", "ACORN",
|
||
"STREAM", "BRANCH", "VALLEY", "PATH", "HILLS", "RIVER", "FIELD", "CLOUDS", "LEAVES",
|
||
"MOSSY", "TWIGS", "FENCE", "BENCH", "RAMBLE", "BOULDER", "THICKET", "CLEARING",
|
||
"PASTURE", "ORCHARD", "HOLLOW", "SUNSET"]),
|
||
("Making music", ["PIANO", "DRUMS", "CHOIR", "MELODY", "GUITAR", "VIOLIN", "SINGER", "BALLAD", "RHYTHM",
|
||
"ENCORE", "TEMPO", "NOTES", "SONG", "FLUTE", "CELLO", "BRASS", "CHORD", "STRUM", "HARMONY",
|
||
"LYRICS", "BANJO", "ANTHEM", "TUNES", "TRUMPET", "ORGAN", "OCTAVE", "CONCERT", "SONATA",
|
||
"TREBLE", "MELODIC"]),
|
||
("Quiet calm", ["PEACE", "QUIET", "STILL", "SERENE", "REST", "SOOTHE", "GENTLE", "BREATHE", "CALM", "HUSH",
|
||
"DRIFT", "EASE", "DREAM", "RELAX", "MELLOW", "STEADY", "SETTLE", "LINGER", "PAUSE", "SLOW",
|
||
"SOFT", "WARM", "SILENCE", "REPOSE", "PLACID", "TRANQUIL", "QUIETLY", "UNWIND", "COZY",
|
||
"DROWSY"]),
|
||
("Small joys", ["SMILE", "LAUGH", "CHEER", "HAPPY", "MERRY", "DANCE", "DELIGHT", "GLOW", "PLAY", "GRIN",
|
||
"BEAM", "GLEE", "WARM", "GIGGLE", "SHARE", "TREAT", "SUNNY", "SWEET", "LUCKY", "CHARM",
|
||
"SPARK", "BLISS", "WONDER", "FROLIC", "CHUCKLE", "CHEERS", "JOYFUL", "SPARKLE", "TWINKLE",
|
||
"HUGS"]),
|
||
]
|
||
|
||
|
||
WS_WORD_MIN, WS_WORD_MAX = 4, 8
|
||
|
||
|
||
def _ws_clean_words(words: list[str]) -> list[str]:
|
||
"""Validate/clean a word-search list: alpha, 4–8 letters, uppercase, deduped."""
|
||
out, seen = [], set()
|
||
for w in words or []:
|
||
w = (w or "").strip().upper()
|
||
if w.isalpha() and WS_WORD_MIN <= len(w) <= WS_WORD_MAX and w not in seen:
|
||
seen.add(w)
|
||
out.append(w)
|
||
return out
|
||
|
||
|
||
def _ws_theme_bank(conn: sqlite3.Connection) -> list[tuple[str, list[str]]]:
|
||
"""Admin-authored themes (join the daily fallback rotation alongside curated)."""
|
||
out = []
|
||
for r in conn.execute("SELECT theme, words_json FROM wordsearch_themes ORDER BY id").fetchall():
|
||
try:
|
||
out.append((r["theme"], json.loads(r["words_json"])))
|
||
except (ValueError, TypeError):
|
||
pass
|
||
return out
|
||
|
||
|
||
def list_wordsearch_themes(conn: sqlite3.Connection) -> list[dict]:
|
||
rows = conn.execute(
|
||
"SELECT id, theme, words_json, created_at FROM wordsearch_themes ORDER BY id DESC"
|
||
).fetchall()
|
||
res = []
|
||
for r in rows:
|
||
try:
|
||
w = json.loads(r["words_json"])
|
||
except (ValueError, TypeError):
|
||
w = []
|
||
res.append({"id": r["id"], "theme": r["theme"], "words": w, "count": len(w), "created_at": r["created_at"]})
|
||
return res
|
||
|
||
|
||
def save_wordsearch_theme(conn: sqlite3.Connection, theme: str, words: list[str], tid: int | None = None) -> dict:
|
||
"""Add or update an admin theme. Requires a name and >= WS_NEEDED valid words
|
||
(enough for the three disjoint puzzles)."""
|
||
theme = (theme or "").strip()[:40]
|
||
clean = _ws_clean_words(words)
|
||
if not theme:
|
||
return {"error": "Give the theme a name."}
|
||
if len(clean) < WS_NEEDED:
|
||
return {"error": f"Need at least {WS_NEEDED} valid words (4–8 letters); you have {len(clean)}."}
|
||
if tid:
|
||
conn.execute("UPDATE wordsearch_themes SET theme=?, words_json=? WHERE id=?",
|
||
(theme, json.dumps(clean), tid))
|
||
else:
|
||
conn.execute("INSERT INTO wordsearch_themes (theme, words_json) VALUES (?, ?)",
|
||
(theme, json.dumps(clean)))
|
||
conn.commit()
|
||
return {"saved": True, "count": len(clean)}
|
||
|
||
|
||
def remove_wordsearch_theme(conn: sqlite3.Connection, tid: int) -> dict:
|
||
conn.execute("DELETE FROM wordsearch_themes WHERE id=?", (tid,))
|
||
conn.commit()
|
||
return {"removed": True}
|
||
|
||
|
||
def suggest_wordsearch_word(client, theme: str, existing: list[str]) -> dict:
|
||
"""AI assist: propose ONE fresh, valid word that fits the theme."""
|
||
if client is None:
|
||
return {"error": "AI assist isn't available right now."}
|
||
have = {(w or "").strip().upper() for w in (existing or [])}
|
||
try:
|
||
msg = [
|
||
{"role": "system", "content": "Reply with ONE single real English word, 4-8 letters, lowercase, "
|
||
"no punctuation or explanation."},
|
||
{"role": "user", "content": f"Give one word (4-8 letters) that fits a word-search puzzle themed "
|
||
f"'{theme}'. Avoid: {', '.join(sorted(have)[:80]).lower()}."},
|
||
]
|
||
for _ in range(4):
|
||
words = re.findall(r"[A-Za-z]+", client.chat_text(msg) or "")
|
||
hit = next((w.upper() for w in words
|
||
if w.isalpha() and WS_WORD_MIN <= len(w) <= WS_WORD_MAX and w.upper() not in have), None)
|
||
if hit:
|
||
return {"word": hit}
|
||
except Exception: # noqa: BLE001
|
||
pass
|
||
return {"error": "Couldn't find a fresh one — try adding it yourself."}
|
||
|
||
|
||
def _ws_propose(client) -> tuple[str, list[str]] | None:
|
||
"""LLM proposes a theme + words; code disposes (alpha / length / dedup)."""
|
||
if client is None:
|
||
return None
|
||
try:
|
||
msg = [
|
||
{"role": "system", "content": "You set up a calm word search. The theme can be uplifting OR just a "
|
||
"pleasant everyday scene (e.g. 'Around the house', 'At the beach', "
|
||
"'In the kitchen'). Reply exactly as two lines:\n"
|
||
"THEME: <2-4 word theme>\nWORDS: W1, W2, ... W28\n"
|
||
f"Give {WS_TARGET} single real words, 4-8 letters, UPPERCASE, related to the "
|
||
"theme, a good mix of lengths, nothing negative or unpleasant, no phrases."},
|
||
{"role": "user", "content": "Give me one calm theme."},
|
||
]
|
||
text = client.chat_text(msg) or ""
|
||
theme, words = None, []
|
||
for line in text.splitlines():
|
||
s = line.strip()
|
||
if s.upper().startswith("THEME:"):
|
||
theme = s.split(":", 1)[1].strip()[:40]
|
||
elif s.upper().startswith("WORDS:"):
|
||
words = [w.strip().upper() for w in re.split(r"[,\s]+", s.split(":", 1)[1]) if w.strip()]
|
||
words = [w for w in dict.fromkeys(words) if w.isalpha() and 4 <= len(w) <= 8]
|
||
if theme and len(words) >= WS_MIN_ACCEPT: # enough to fill Large + spare
|
||
return theme, words
|
||
except Exception: # noqa: BLE001 — fall back to a curated theme
|
||
pass
|
||
return None
|
||
|
||
|
||
def generate_wordsearch_puzzle(conn: sqlite3.Connection, date: str, client=None) -> dict:
|
||
"""Ensure today's theme + word list exists (idempotent). The per-size grid is
|
||
built at request time, so one LLM call serves all three sizes."""
|
||
existing = conn.execute(
|
||
"SELECT payload_json FROM daily_puzzles WHERE puzzle_date=? AND game='wordsearch' AND variant=''", (date,)
|
||
).fetchone()
|
||
if existing:
|
||
return json.loads(existing["payload_json"])
|
||
rng = random.Random(_seed(date, "wordsearch"))
|
||
proposed = _ws_propose(client)
|
||
if proposed:
|
||
theme, words = proposed
|
||
else:
|
||
# Fallback rotation = curated themes + admin-authored bank.
|
||
bank = _WS_FALLBACKS + _ws_theme_bank(conn)
|
||
theme, words = bank[rng.randrange(len(bank))]
|
||
words = [w.upper() for w in dict.fromkeys(words) if w.isalpha() and 4 <= len(w) <= 8]
|
||
payload = {"theme": theme, "words": words}
|
||
conn.execute(
|
||
"INSERT OR IGNORE INTO daily_puzzles (puzzle_date, game, variant, payload_json) VALUES (?, 'wordsearch', '', ?)",
|
||
(date, json.dumps(payload)),
|
||
)
|
||
conn.commit()
|
||
row = conn.execute(
|
||
"SELECT payload_json FROM daily_puzzles WHERE puzzle_date=? AND game='wordsearch' AND variant=''", (date,)
|
||
).fetchone()
|
||
return json.loads(row["payload_json"])
|
||
|
||
|
||
_WS_CROSS_TARGET = 0.5 # aim: about half the placements cross an existing word
|
||
|
||
|
||
def _zone(r: int, c: int, size: int) -> tuple[int, int]:
|
||
"""Which quadrant a cell falls in — coarse occupancy used to spread words."""
|
||
return (r * 2 // size, c * 2 // size)
|
||
|
||
|
||
def _place_words(words: list[str], size: int, seed: int) -> tuple[list[list[str | None]], list[tuple[str, list[tuple[int, int]]]]]:
|
||
"""Core placement (date-seeded, deterministic). Returns the letter grid (None
|
||
where unfilled) and [(word, cells)] for every word genuinely placed.
|
||
|
||
Interlock is a TARGET, not a side effect: each word either (a) must cross an
|
||
already-placed word — when crossings are running below ~half of placements —
|
||
or (b) anchors in open ground. Both modes steer toward the least crowded /
|
||
least developed quadrant, so crossings attach to lonely words at the edges of
|
||
structure rather than thickening one knot, and anchors spread across the
|
||
board. All valid spots are enumerated (the grid is tiny) — earlier random
|
||
sampling kept missing the rare crossing spots, which is why grids came out
|
||
as disconnected "clean" words."""
|
||
rng = random.Random(seed)
|
||
grid: list[list[str | None]] = [[None] * size for _ in range(size)]
|
||
zone_fill = {(zr, zc): 0 for zr in (0, 1) for zc in (0, 1)}
|
||
placements: list[tuple[str, list[tuple[int, int]]]] = []
|
||
crossed = 0
|
||
for word in sorted(words, key=len, reverse=True):
|
||
n = len(word)
|
||
if n > size:
|
||
continue
|
||
cands = [] # (overlap, cells) over every legal placement
|
||
for dr, dc in _DIRS:
|
||
for r0 in range(size):
|
||
for c0 in range(size):
|
||
if not (0 <= r0 + dr * (n - 1) < size and 0 <= c0 + dc * (n - 1) < size):
|
||
continue
|
||
cells = [(r0 + dr * i, c0 + dc * i) for i in range(n)]
|
||
if not all(grid[r][c] in (None, word[i]) for i, (r, c) in enumerate(cells)):
|
||
continue
|
||
cands.append((sum(1 for i, (r, c) in enumerate(cells) if grid[r][c] == word[i]), cells))
|
||
if not cands:
|
||
continue
|
||
crossing = [t for t in cands if t[0] > 0]
|
||
want_cross = bool(crossing) and crossed < _WS_CROSS_TARGET * len(placements)
|
||
scored = [] # (score, overlap, cells)
|
||
for overlap, cells in crossing if want_cross else cands:
|
||
crowd = _neighbour_fill(grid, cells, size)
|
||
zload = sum(zone_fill[_zone(r, c, size)] for r, c in cells) // n
|
||
# Crossing mode rewards extra overlaps; anchor mode is overlap-neutral
|
||
# (crowding already steers it to open ground).
|
||
scored.append(((overlap * 4 if want_cross else 0) - 2 * crowd - zload, overlap, cells))
|
||
scored.sort(key=lambda t: t[0], reverse=True)
|
||
top = [t for t in scored if t[0] >= scored[0][0] - 1] # near-best: variety without losing intent
|
||
_, overlap, cells = rng.choice(top)
|
||
for i, (r, c) in enumerate(cells):
|
||
if grid[r][c] is None:
|
||
grid[r][c] = word[i]
|
||
zone_fill[_zone(r, c, size)] += 1
|
||
placements.append((word, cells))
|
||
if overlap:
|
||
crossed += 1
|
||
return grid, placements
|
||
|
||
|
||
def _build_grid(words: list[str], size: int, seed: int) -> tuple[list[str], list[str]]:
|
||
"""Place words in a size×size grid (date-seeded, deterministic) and fill the
|
||
rest. Returns (rows, placed_words). Every returned word is genuinely placed."""
|
||
grid, placements = _place_words(words, size, seed)
|
||
rng = random.Random(_seed(str(seed), "fill"))
|
||
for r in range(size):
|
||
for c in range(size):
|
||
if grid[r][c] is None:
|
||
grid[r][c] = chr(65 + rng.randrange(26))
|
||
return ["".join(row) for row in grid], [w for w, _ in placements]
|
||
|
||
|
||
# --- Cross-device game state sync -------------------------------------------
|
||
# Per-puzzle progress is merged SERVER-SIDE on every save, so two devices
|
||
# converge regardless of push order. The merge is tailored to each game's nature.
|
||
|
||
def _merge_wordsearch(a: dict, b: dict) -> dict:
|
||
"""Union the found words (a find is monotonic — you can't un-find one, so the
|
||
union is always correct), credit the most ACTIVE play time either device has
|
||
banked (max — the clock only runs while the puzzle is on screen, so wall-clock
|
||
gaps between sittings never count), and keep the best (min) finish time."""
|
||
by_word = {}
|
||
for fw in list(a.get("foundWords") or []) + list(b.get("foundWords") or []):
|
||
w = fw.get("word") if isinstance(fw, dict) else None
|
||
if w and w not in by_word:
|
||
by_word[w] = fw
|
||
times = [m for m in (a.get("ms"), b.get("ms")) if m]
|
||
return {
|
||
"foundWords": list(by_word.values()),
|
||
"played": max(_int(a.get("played")), _int(b.get("played"))),
|
||
"ms": min(times) if times else 0,
|
||
}
|
||
|
||
|
||
def _word_rank(s: dict) -> tuple:
|
||
return (1 if s.get("status") in ("won", "lost") else 0, len(s.get("guesses") or []))
|
||
|
||
|
||
def _merge_word(a: dict, b: dict) -> dict:
|
||
"""Furthest progress wins: a finished game beats in-progress, more guesses
|
||
beats fewer. Picks one device's game WHOLE — never splices guess sequences."""
|
||
return a if _word_rank(a) >= _word_rank(b) else b
|
||
|
||
|
||
def _merge_bloom(a: dict, b: dict) -> dict:
|
||
"""Union found words — a find is monotonic (you can't un-find one), so the
|
||
union across devices is always correct. Score is recomputed by the sanitizer."""
|
||
found, seen = [], set()
|
||
for w in list(a.get("found") or []) + list(b.get("found") or []):
|
||
if isinstance(w, str) and w not in seen:
|
||
seen.add(w)
|
||
found.append(w)
|
||
return {"found": found}
|
||
|
||
|
||
def merge_game_state(game: str, a: dict | None, b: dict | None) -> dict:
|
||
if not a:
|
||
return dict(b or {})
|
||
if not b:
|
||
return dict(a or {})
|
||
if game == "wordsearch":
|
||
return _merge_wordsearch(a, b)
|
||
if game == "bloom":
|
||
return _merge_bloom(a, b)
|
||
if game == "match":
|
||
return _merge_match(a, b)
|
||
return _merge_word(a, b)
|
||
|
||
|
||
def load_game_state(conn: sqlite3.Connection, user_id: int, game: str, variant: str, date: str) -> dict | None:
|
||
row = conn.execute(
|
||
"SELECT state_json FROM game_state WHERE user_id=? AND game=? AND variant=? AND puzzle_date=?",
|
||
(user_id, game, variant, date),
|
||
).fetchone()
|
||
if not row:
|
||
return None
|
||
try:
|
||
return json.loads(row["state_json"])
|
||
except (ValueError, TypeError):
|
||
return None
|
||
|
||
|
||
def _int(x) -> int:
|
||
try:
|
||
return int(x)
|
||
except (TypeError, ValueError):
|
||
return 0
|
||
|
||
|
||
_WS_MS_CAP = 86_400_000 # clamp client-sent timings to one day — beyond that is junk
|
||
|
||
|
||
def _ms(x) -> int:
|
||
return max(0, min(_int(x), _WS_MS_CAP))
|
||
|
||
|
||
def _sanitize_wordsearch(conn: sqlite3.Connection, variant: str, date: str, state: dict) -> dict:
|
||
"""Trust only finds that are real for THIS puzzle: word in the day's list and
|
||
cells that actually spell it in the grid (validated when the puzzle exists,
|
||
shape-only otherwise). Dedupes, renumbers colours, and derives completion from
|
||
the real word count — never from a client-sent `ms` alone."""
|
||
words: list[str] = []
|
||
grid: list[str] = []
|
||
if variant in WS_TIERS and conn.execute(
|
||
"SELECT 1 FROM daily_puzzles WHERE puzzle_date=? AND game='wordsearch' AND variant=''", (date,)
|
||
).fetchone():
|
||
try:
|
||
p = wordsearch_response(conn, date, variant) # read-only; today's puzzle already exists
|
||
words, grid = list(p.get("words") or []), list(p.get("grid") or [])
|
||
except Exception: # noqa: BLE001
|
||
words, grid = [], []
|
||
wset = set(words)
|
||
clean: list[dict] = []
|
||
seen: set[str] = set()
|
||
for fw in (state.get("foundWords") or []):
|
||
if not isinstance(fw, dict):
|
||
continue
|
||
w, cells = fw.get("word"), fw.get("cells")
|
||
if not isinstance(w, str) or w in seen or not isinstance(cells, list):
|
||
continue
|
||
try:
|
||
cells = [[int(r), int(c)] for r, c in cells]
|
||
except (TypeError, ValueError):
|
||
continue
|
||
if len(cells) != len(w):
|
||
continue
|
||
if grid: # validate the find spells the word in the real grid
|
||
if w not in wset:
|
||
continue
|
||
spelled = "".join(grid[r][c] for r, c in cells if 0 <= r < len(grid) and 0 <= c < len(grid[r]))
|
||
if spelled != w:
|
||
continue
|
||
elif not (4 <= len(w) <= 12 and w.isalpha()): # no puzzle to check against → shape only
|
||
continue
|
||
seen.add(w)
|
||
clean.append({"word": w, "cells": cells, "ci": len(clean) % 10})
|
||
done = bool(words) and len(clean) == len(words)
|
||
return {"foundWords": clean, "played": _ms(state.get("played")),
|
||
"ms": _ms(state.get("ms")) if done else 0}
|
||
|
||
|
||
_WORD_COLOURS = {"absent", "present", "correct"}
|
||
|
||
|
||
def _sanitize_word(variant: str, state: dict) -> dict:
|
||
"""Validate shapes: status enum, guess count/length, colour rows, terminal fields."""
|
||
n = WORD_VARIANTS[variant]["length"]
|
||
maxg = WORD_VARIANTS[variant]["guesses"]
|
||
status = state.get("status") if state.get("status") in ("playing", "won", "lost") else "playing"
|
||
guesses = [g.lower() for g in (state.get("guesses") or [])[:maxg]
|
||
if isinstance(g, str) and len(g) == n and g.isalpha()]
|
||
cols = []
|
||
if isinstance(state.get("cols"), list):
|
||
for row in state["cols"][:len(guesses)]:
|
||
cols.append([c for c in row if c in _WORD_COLOURS][:n] if isinstance(row, list) else [])
|
||
out = {"guesses": guesses, "cols": cols, "status": status}
|
||
if status in ("won", "lost"):
|
||
ans = state.get("answer")
|
||
if isinstance(ans, str) and len(ans) == n and ans.isalpha():
|
||
out["answer"] = ans.lower()
|
||
if isinstance(state.get("why"), str):
|
||
out["why"] = state["why"][:600]
|
||
return out
|
||
|
||
|
||
def _sanitize_bloom(conn: sqlite3.Connection, date: str, state: dict) -> dict:
|
||
"""Trust only finds real for THIS wheel — a word in the day's DYNAMIC accept
|
||
set (broad dict + overrides, computed live; shape-only if the puzzle doesn't
|
||
exist yet). Dedupes and recomputes score server-side; Full Bloom = reaching the
|
||
designed puzzle's total (max_score). Never trusts a client-sent score/full."""
|
||
payload = bloom.stored_payload(conn, date)
|
||
valid = (set(bloom.accepted_words(conn, payload["center"], payload["outer"], True))
|
||
if payload else None)
|
||
clean, seen = [], set()
|
||
for w in (state.get("found") or []):
|
||
if not isinstance(w, str):
|
||
continue
|
||
w = w.strip().lower()
|
||
if not w or w in seen:
|
||
continue
|
||
if valid is not None:
|
||
if w not in valid:
|
||
continue
|
||
elif not (len(w) >= 4 and w.isalpha() and "s" not in w): # no puzzle yet → shape only
|
||
continue
|
||
seen.add(w)
|
||
clean.append(w)
|
||
clean.sort()
|
||
score = bloom.score_words(payload, clean) if payload else 0
|
||
out = {"found": clean, "score": score}
|
||
if payload and clean and score >= payload.get("max_score", 1):
|
||
out["full"] = True # Full Bloom — found the whole designed puzzle
|
||
return out
|
||
|
||
|
||
_MATCH_MAX_FACES = 12 # the largest board uses 8 faces; cap generously
|
||
_MATCH_FACES = {"gentle": 6, "standard": 8, "expert": 8} # faces per tier = completion target
|
||
# Valid face keys — MIRRORS the frontend (icons.js ICON_KEYS + palette.js COLOR_KEYS).
|
||
# Matched keys are validated against this so bogus/junk keys can't inflate the
|
||
# completion count. Adding a face on the frontend? Add it here too; a missing key only
|
||
# under-counts (benign, self-heals once synced), never crashes.
|
||
_MATCH_FACE_KEYS = frozenset({
|
||
"sun", "moon", "star", "cloud", "raindrop", "wave", "leaf", "flower", "seedling",
|
||
"tree", "mountain", "shell", "feather", "acorn", "butterfly", "rainbow", "heart",
|
||
"sparkle", "home", "book", "teacup", "candle", "lantern", "compass", "kite", "note",
|
||
"boat", "fish", "bird", "mushroom", "bell", "snowflake", "clover",
|
||
"color-rose", "color-coral", "color-amber", "color-gold", "color-lime", "color-green",
|
||
"color-teal", "color-cyan", "color-sky", "color-blue", "color-indigo", "color-violet",
|
||
"color-plum", "color-brown", "color-sand", "color-slate", "color-charcoal", "color-cream",
|
||
})
|
||
|
||
|
||
def _match_faces(variant: str) -> int:
|
||
return _MATCH_FACES.get((variant or "").split("-", 1)[0], 8)
|
||
|
||
|
||
def _sanitize_match(variant: str, state: dict) -> dict:
|
||
"""Light, durability-only sanitize. Memory Match has nothing to cheat — the
|
||
board is deterministic and fully visible, with no score/leaderboard — so we
|
||
just drop malformed junk: matched FACE KEYS (icon name / color key, never raw
|
||
indices, so progress survives layout tweaks), validated against the real face set
|
||
(junk can't count), deduped, with a clamped move count. `done` is DERIVED from the
|
||
matched count vs the tier's face target — never trusted from the client, so a
|
||
stale/bogus flag can't mark a board cleared (matters once the ritual reads it)."""
|
||
seen: set[str] = set()
|
||
matched: list[str] = []
|
||
for k in (state.get("matched") or []):
|
||
if isinstance(k, str) and k in _MATCH_FACE_KEYS and k not in seen:
|
||
seen.add(k)
|
||
matched.append(k)
|
||
if len(matched) >= _MATCH_MAX_FACES:
|
||
break
|
||
return {"matched": matched, "moves": max(0, min(_int(state.get("moves")), 100_000)),
|
||
"done": len(matched) >= _match_faces(variant)}
|
||
|
||
|
||
def _merge_match(a: dict, b: dict) -> dict:
|
||
"""Union matched faces across devices, keep the larger move count. `done` is not
|
||
carried here — the post-merge sanitize re-derives it from the matched count."""
|
||
matched = list(dict.fromkeys([*(a.get("matched") or []), *(b.get("matched") or [])]))[:_MATCH_MAX_FACES]
|
||
return {"matched": matched, "moves": max(_int(a.get("moves")), _int(b.get("moves")))}
|
||
|
||
|
||
def sanitize_game_state(conn: sqlite3.Connection, game: str, variant: str, date: str, state: dict) -> dict:
|
||
"""Never trust client JSON at the storage layer — normalize before merge/store."""
|
||
if game == "wordsearch":
|
||
return _sanitize_wordsearch(conn, variant, date, state or {})
|
||
if game == "bloom":
|
||
return _sanitize_bloom(conn, date, state or {})
|
||
if game == "match":
|
||
return _sanitize_match(variant, state or {})
|
||
return _sanitize_word(variant, state or {})
|
||
|
||
|
||
def save_game_state(conn: sqlite3.Connection, user_id: int, game: str, variant: str,
|
||
date: str, incoming: dict) -> dict:
|
||
"""Sanitize → merge with the stored state (server-authoritative) → sanitize → persist."""
|
||
clean_in = sanitize_game_state(conn, game, variant, date, incoming or {})
|
||
stored = load_game_state(conn, user_id, game, variant, date)
|
||
merged = sanitize_game_state(conn, game, variant, date, merge_game_state(game, stored, clean_in))
|
||
conn.execute(
|
||
"INSERT INTO game_state (user_id, game, variant, puzzle_date, state_json, updated_at) "
|
||
"VALUES (?,?,?,?,?,CURRENT_TIMESTAMP) "
|
||
"ON CONFLICT(user_id, game, variant, puzzle_date) DO UPDATE SET "
|
||
"state_json=excluded.state_json, updated_at=CURRENT_TIMESTAMP",
|
||
(user_id, game, variant, date, json.dumps(merged)),
|
||
)
|
||
conn.commit()
|
||
return merged
|
||
|
||
|
||
def game_stats(conn: sqlite3.Connection, user_id: int, game: str, variant: str) -> dict:
|
||
"""Derive the player's record for a variant from their synced states, so
|
||
streak / distribution / best are consistent across devices (not per-device counters)."""
|
||
rows = conn.execute(
|
||
"SELECT puzzle_date, state_json FROM game_state "
|
||
"WHERE user_id=? AND game=? AND variant=? ORDER BY puzzle_date DESC",
|
||
(user_id, game, variant),
|
||
).fetchall()
|
||
states = []
|
||
for r in rows:
|
||
try:
|
||
states.append(json.loads(r["state_json"]))
|
||
except (ValueError, TypeError):
|
||
pass
|
||
if game == "wordsearch":
|
||
times = [s.get("ms") for s in states if s.get("ms")]
|
||
return {"completed": sum(1 for s in states if s.get("ms")), "best": min(times) if times else 0}
|
||
if game == "bloom":
|
||
# Calm, no-pressure record: days played, lifetime words, Full Blooms, and
|
||
# the best tier ever reached (computed per day from that wheel's tiers).
|
||
tier_names = [t[0] for t in bloom.TIER_PCTS]
|
||
played = words = full = 0
|
||
best_idx = -1
|
||
for r in rows:
|
||
try:
|
||
s = json.loads(r["state_json"])
|
||
except (ValueError, TypeError):
|
||
continue
|
||
found = s.get("found") or []
|
||
if not found:
|
||
continue
|
||
played += 1
|
||
words += len(found)
|
||
if s.get("full"):
|
||
full += 1
|
||
p = bloom.stored_payload(conn, r["puzzle_date"])
|
||
if p:
|
||
sc = s.get("score") or 0
|
||
idx = max((i for i, t in enumerate(p["tiers"]) if sc >= t["score"]), default=0)
|
||
best_idx = max(best_idx, idx)
|
||
return {"played": played, "words": words, "full_blooms": full,
|
||
"best_tier": tier_names[best_idx] if best_idx >= 0 else None}
|
||
played = won = 0
|
||
dist: dict[int, int] = {}
|
||
streak = 0
|
||
streak_open = True # consecutive wins counting back from newest (matches the old local counter)
|
||
for s in states:
|
||
st = s.get("status")
|
||
if st in ("won", "lost"):
|
||
played += 1
|
||
if st == "won":
|
||
won += 1
|
||
g = len(s.get("guesses") or [])
|
||
dist[g] = dist.get(g, 0) + 1
|
||
if streak_open:
|
||
streak += 1
|
||
elif st == "lost":
|
||
streak_open = False
|
||
# 'playing' (e.g. today unfinished) neither counts nor breaks the streak
|
||
return {"played": played, "won": won, "streak": streak, "dist": dist}
|
||
|
||
|
||
def wordsearch_response(conn: sqlite3.Connection, date: str, size: str = "med") -> dict:
|
||
"""Public shape for a size tier: theme + placed words + grid. The grid is meant
|
||
to be seen — the play is finding the words — so there's nothing to hide."""
|
||
if size not in WS_TIERS:
|
||
size = "med"
|
||
p = generate_wordsearch_puzzle(conn, date) # on-demand (curated fallback) if missing
|
||
tier = WS_TIERS[size]
|
||
# Shuffle the day's words once (date-seeded → same order for every size) and hand
|
||
# each size a DISJOINT slice, so the three sizes are entirely distinct puzzles.
|
||
words = list(p["words"])
|
||
random.Random(_seed(date, "wordsearch", "shuffle")).shuffle(words)
|
||
start = sum(WS_TIERS[s]["count"] for s in _WS_ORDER[:_WS_ORDER.index(size)])
|
||
chosen = words[start:start + tier["count"]]
|
||
grid, placed = _build_grid(chosen, tier["grid"], _seed(date, "wordsearch", size))
|
||
return {"game": "wordsearch", "date": date, "size": size, "theme": p["theme"],
|
||
"words": placed, "grid": grid}
|
||
|
||
|
||
def generate_daily_puzzles(conn: sqlite3.Connection, date: str, client=None) -> int:
|
||
"""Cycle hook: pre-generate today's puzzles (word + word search) with the LLM."""
|
||
made = 0
|
||
for variant in WORD_VARIANTS:
|
||
before = conn.execute(
|
||
"SELECT 1 FROM daily_puzzles WHERE puzzle_date=? AND game='word' AND variant=?", (date, variant)
|
||
).fetchone()
|
||
if not before:
|
||
generate_word_puzzle(conn, date, variant, client=client)
|
||
made += 1
|
||
if not conn.execute(
|
||
"SELECT 1 FROM daily_puzzles WHERE puzzle_date=? AND game='wordsearch' AND variant=''", (date,)
|
||
).fetchone():
|
||
generate_wordsearch_puzzle(conn, date, client=client)
|
||
made += 1
|
||
if not conn.execute(
|
||
"SELECT 1 FROM daily_puzzles WHERE puzzle_date=? AND game='bloom' AND variant=''", (date,)
|
||
).fetchone():
|
||
bloom.generate_bloom_puzzle(conn, date) # pure code, no LLM
|
||
made += 1
|
||
return made
|