Files
upbeatBytes/goodnews/llm.py
T
thejayman77 a47a1504c8 Phase B1: multi-tag groupings model (backend)
Three-layer organization: primary topic (one per article, for ranking and
brief balance) + grouping tags (1-4 per article from a controlled vocabulary,
the organic "wandering" axis) + tonal flavor.

- taxonomy: add technology + learning topics; 4 calm tag families
  (Discovery & Wonder, People & Kindness, Solutions & Progress, Mind & Craft)
  defined in code, not the DB; ALLOWED_TAGS union + coerce_tags validation.
- db: article_tags(article_id, tag) join table + tag index.
- llm: tags added to the classifier json_schema (enum-constrained, maxItems 4)
  and system prompt; normalize_scores coerces tags; upsert_article_score
  replaces a row's tags atomically on every (re)classification.
- queries: feed gains a tag filter and exposes tags via group_concat; tag_counts.
- api: Article.tags, feed tag param, and /api/families with per-tag counts.
- tests: coerce/normalize/upsert/tag-filter/reclassify-replace/tag_counts +
  /api/families. 99 passing.

Corpus reclassify (re-tag + new primary topics) runs separately against the
local LLM. Frontend (B2) pairs with this; the live site is unchanged until then.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-01 18:35:25 +00:00

426 lines
16 KiB
Python

from __future__ import annotations
import json
import os
import sqlite3
import urllib.error
import urllib.request
from collections.abc import Callable
from dataclasses import dataclass
from .taxonomy import (
ALLOWED_TAGS,
FLAVORS,
MAX_TAGS,
TOPICS,
coerce_flavor,
coerce_tags,
coerce_topic,
flavors_prompt_block,
tags_prompt_block,
topics_prompt_block,
)
# Fallbacks used by LocalModelClient.from_env when the matching GOODNEWS_*
# environment variable is not set.
# Root URL of the model server; endpoint paths such as /chat/completions,
# /embeddings and /models are appended to it.
DEFAULT_BASE_URL = "http://127.0.0.1:1234/v1"
# Model id sent in chat-completion payloads.
DEFAULT_MODEL = "gpt-oss"
# Model id sent in /embeddings payloads.
DEFAULT_EMBED_MODEL = "text-embedding-nomic-embed-text-v1.5"
# Passed as urlopen(timeout=...) for chat and embedding calls, i.e. seconds.
DEFAULT_TIMEOUT = 180
# JSON Schema describing the classifier's reply. It is offered to the server as
# a json_schema response_format first; servers that reject it are retried with
# the looser variants listed in _RESPONSE_FORMATS.

# Every *_score field is an integer in the inclusive range 0-10. The same dict
# object is shared by all score properties.
_SCORE_FIELD = {"type": "integer", "minimum": 0, "maximum": 10}

# Property definitions, in the order the model is asked to emit them.
_SCHEMA_PROPERTIES = {
    **dict.fromkeys(
        (
            "constructive_score",
            "cortisol_score",
            "ragebait_score",
            "agency_score",
            "human_benefit_score",
            "novelty_score",
            "pr_risk_score",
        ),
        _SCORE_FIELD,
    ),
    "accepted": {"type": "boolean"},
    "topic": {"type": "string", "enum": list(TOPICS)},
    "flavor": {"type": "string", "enum": list(FLAVORS)},
    "tags": {
        "type": "array",
        "items": {"type": "string", "enum": list(ALLOWED_TAGS)},
        "maxItems": MAX_TAGS,
    },
    "reason_code": {"type": "string"},
    "reason_text": {"type": "string"},
}

CLASSIFICATION_SCHEMA = {
    "type": "object",
    "additionalProperties": False,
    # Every declared property is mandatory, in declaration order.
    "required": list(_SCHEMA_PROPERTIES),
    "properties": _SCHEMA_PROPERTIES,
}
# Candidate response_format values, from strictest to loosest. A client walks
# this tuple until the server accepts one, then remembers that index so later
# calls skip the variants that were rejected.
_RESPONSE_FORMATS = (
    # Strict structured output validated against CLASSIFICATION_SCHEMA.
    {
        "type": "json_schema",
        "json_schema": {
            "name": "classification",
            "strict": True,
            "schema": CLASSIFICATION_SCHEMA,
        },
    },
    # Plain "reply with a JSON object" mode.
    {"type": "json_object"},
    # No response_format key at all.
    None,
)
# System message sent with every classification request. The template is
# rendered once at import time: {topics}, {flavors} and {tags} are replaced by
# the taxonomy prompt-block helpers, and the doubled braces ({{ }}) are
# str.format escapes that come out as literal braces around the example JSON.
SYSTEM_PROMPT = """You classify article metadata for a calm constructive-news digest.
Judge emotional aftertaste, not simple positivity. Accept stories that leave a reader informed without feeling drained, especially when they include repair, progress, agency, resilience, human benefit, scientific discovery, environmental improvement, community action, or useful perspective.
Reject stories centered on fear, outrage, partisan conflict, crime, tragedy, disaster repetition, celebrity drama, market panic, or corporate PR without clear public benefit.
Also assign one primary topic and one flavor (the single best fit), plus 1-4 grouping tags.
Primary topic (what the story is mainly about):
{topics}
Flavor (why it belongs in a calm, uplifting digest):
{flavors}
Grouping tags — choose ONLY from this controlled vocabulary:
{tags}
Tag discipline: assign 1-4 tags; prefer fewer, stronger ones; never tag by weak
association; pick tags a reader would reasonably use to find this story later.
Return only JSON with this exact shape:
{{
"constructive_score": 0,
"cortisol_score": 0,
"ragebait_score": 0,
"agency_score": 0,
"human_benefit_score": 0,
"novelty_score": 0,
"pr_risk_score": 0,
"accepted": false,
"topic": "one_of_the_allowed_topics",
"flavor": "one_of_the_allowed_flavors",
"tags": ["one_to_four_allowed_tags"],
"reason_code": "short_snake_case",
"reason_text": "one concise sentence"
}}
""".format(topics=topics_prompt_block(), flavors=flavors_prompt_block(), tags=tags_prompt_block())
@dataclass
class LocalModelClient:
    """Client for an OpenAI-compatible model server (chat, embeddings, models).

    Every transport failure, malformed body, or unexpected response shape is
    surfaced as ``RuntimeError`` so callers only need to handle one exception
    type. HTTP status failures keep the ``"HTTP <code>"`` prefix in their
    message; ``classify`` relies on ``"HTTP 400"`` to detect a rejected
    ``response_format``.
    """

    base_url: str
    model: str
    api_key: str | None = None
    timeout: int = DEFAULT_TIMEOUT
    embed_model: str = DEFAULT_EMBED_MODEL
    # Index into _RESPONSE_FORMATS that the server accepts; discovered lazily.
    _response_format_idx: int | None = None

    @classmethod
    def from_env(cls) -> "LocalModelClient":
        """Build a client from the GOODNEWS_* environment variables.

        Unset variables fall back to the module defaults; the API key has no
        default and stays ``None`` when unset.
        """
        return cls(
            base_url=os.environ.get("GOODNEWS_LLM_BASE_URL", DEFAULT_BASE_URL).rstrip("/"),
            model=os.environ.get("GOODNEWS_LLM_MODEL", DEFAULT_MODEL),
            api_key=os.environ.get("GOODNEWS_LLM_API_KEY"),
            timeout=int(os.environ.get("GOODNEWS_LLM_TIMEOUT", DEFAULT_TIMEOUT)),
            embed_model=os.environ.get("GOODNEWS_EMBED_MODEL", DEFAULT_EMBED_MODEL),
        )

    def embed(self, texts: list[str]) -> list[list[float]]:
        """Return embedding vectors for a batch of texts via /embeddings.

        Raises:
            RuntimeError: on any transport error or unexpected response shape.
        """
        data = self._request_json(
            "/embeddings",
            {"model": self.embed_model, "input": texts},
            label="embeddings",
        )
        try:
            return [item["embedding"] for item in data["data"]]
        except (KeyError, TypeError) as exc:
            raise RuntimeError(f"unexpected embeddings response: {data}") from exc

    def classify(self, article: sqlite3.Row) -> dict:
        """Classify one article row and return the model's parsed JSON reply.

        The first successful call discovers which ``response_format`` variant
        the server accepts and pins it for subsequent calls.

        Raises:
            RuntimeError: if the request fails, or if every variant is rejected
                with HTTP 400 (the last rejection is re-raised).
        """
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": _article_prompt(article)},
        ]
        # If we already learned which response_format the server accepts, use it.
        if self._response_format_idx is not None:
            pinned = _RESPONSE_FORMATS[self._response_format_idx]
            return self._chat(self._build_payload(messages, pinned))
        # Otherwise escalate through the variants, pinning the first that works.
        last_exc: RuntimeError | None = None
        for idx, fmt in enumerate(_RESPONSE_FORMATS):
            try:
                result = self._chat(self._build_payload(messages, fmt))
            except RuntimeError as exc:
                # Only a 400 means "this response_format is unsupported";
                # anything else is a real failure and must not be retried.
                if "HTTP 400" not in str(exc):
                    raise
                last_exc = exc
            else:
                self._response_format_idx = idx
                return result
        raise last_exc if last_exc else RuntimeError("no usable response_format")

    def _build_payload(self, messages: list[dict], response_format: dict | None) -> dict:
        """Assemble a chat-completions payload, omitting response_format when None."""
        payload = {"model": self.model, "temperature": 0.1, "messages": messages}
        if response_format is not None:
            payload["response_format"] = response_format
        return payload

    def list_models(self) -> list[str]:
        """Return the ids of the models the server reports via /models.

        Uses a fixed 10-second timeout rather than ``self.timeout``.

        Raises:
            RuntimeError: on any transport error or a non-object response.
        """
        data = self._request_json("/models", timeout=10)
        if not isinstance(data, dict):
            raise RuntimeError(f"unexpected local model response: {data}")
        return [
            str(entry["id"])
            for entry in data.get("data") or []
            if isinstance(entry, dict) and entry.get("id")
        ]

    def _chat(self, payload: dict) -> dict:
        """POST a chat-completions payload and parse the first choice as JSON.

        Raises:
            RuntimeError: on transport errors, an unexpected response shape, or
                message content that is missing or not parseable as JSON.
        """
        data = self._request_json("/chat/completions", payload)
        try:
            content = data["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError) as exc:
            raise RuntimeError(f"unexpected local model response: {data}") from exc
        # Servers may return null content (e.g. refusals or tool calls).
        if not isinstance(content, str):
            raise RuntimeError(f"unexpected local model response: {data}")
        try:
            return parse_classifier_json(content)
        except ValueError as exc:
            # json.JSONDecodeError is a ValueError; keep the RuntimeError contract.
            raise RuntimeError(f"model did not return JSON: {content}") from exc

    def _request_json(self, path, payload=None, *, label="local model", timeout=None):
        """Send one HTTP request to the server and return the decoded JSON body.

        Args:
            path: Endpoint path appended to ``base_url`` (e.g. ``"/models"``).
            payload: JSON-serialisable body; when given the request is a POST,
                otherwise a GET with no body.
            label: Name of the endpoint used in error messages.
            timeout: Seconds passed to ``urlopen``; defaults to ``self.timeout``.

        Raises:
            RuntimeError: for HTTP error statuses (message starts with
                ``"HTTP <code>"``), unreachable servers, timeouts or resets
                while reading, and bodies that are not valid UTF-8 JSON.
        """
        headers = {}
        body = None
        if payload is not None:
            body = json.dumps(payload).encode("utf-8")
            headers["Content-Type"] = "application/json"
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
        request = urllib.request.Request(
            f"{self.base_url}{path}",
            data=body,
            headers=headers,
            method="POST" if body is not None else "GET",
        )
        effective_timeout = self.timeout if timeout is None else timeout
        try:
            with urllib.request.urlopen(request, timeout=effective_timeout) as response:
                raw = response.read()
        except urllib.error.HTTPError as exc:
            detail = exc.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"HTTP {exc.code} from {label}: {detail}") from exc
        except urllib.error.URLError as exc:
            raise RuntimeError(f"could not reach {label} at {self.base_url}: {exc.reason}") from exc
        except OSError as exc:
            # Timeouts and resets that happen while waiting for or reading the
            # response are raised bare (not wrapped in URLError) by urllib.
            raise RuntimeError(f"could not reach {label} at {self.base_url}: {exc}") from exc
        try:
            return json.loads(raw.decode("utf-8"))
        except ValueError as exc:
            # Covers both UnicodeDecodeError and json.JSONDecodeError.
            raise RuntimeError(f"unexpected {label} response: body is not valid JSON") from exc
@dataclass
class ClassifyReport:
    """Summary of one ``classify_articles`` run."""

    # (article id, normalized scores) for every article that was classified.
    results: list[tuple[int, dict]]
    # Number of candidate rows selected for the run.
    attempted: int
    # Number of candidates that produced a result (length of ``results``).
    succeeded: int
    # Number of candidates skipped because classification raised RuntimeError.
    skipped: int
def classify_articles(
    conn: sqlite3.Connection,
    client: LocalModelClient,
    limit: int,
    include_rejected: bool = False,
    dry_run: bool = False,
    only_unclassified: bool = False,
    progress: "Callable[[int, int, int], None] | None" = None,
) -> ClassifyReport:
    """Classify up to ``limit`` candidate articles with the model.

    Args:
        conn: Database connection used to select candidates and store scores.
        client: Model client whose ``classify`` is called once per article.
        limit: Maximum number of candidate rows to process.
        include_rejected: Forwarded to the candidate query.
        dry_run: When true, nothing is written or committed.
        only_unclassified: Forwarded to the candidate query.
        progress: Optional callback invoked as ``progress(index, total, id)``
            after each article that was classified successfully.

    Returns:
        A ``ClassifyReport`` with the per-article results and counters.
    """
    candidates = _classification_candidates(
        conn, limit=limit, include_rejected=include_rejected, only_unclassified=only_unclassified
    )
    total = len(candidates)
    classified = []
    failures = 0
    for position, candidate in enumerate(candidates, start=1):
        try:
            raw_scores = client.classify(candidate)
        except RuntimeError as exc:
            # A single failing article must not abort the batch or throw away
            # rows that were already committed; report it and move on.
            failures += 1
            print(f"[{candidate['id']}] skipped: {exc}")
            continue
        article_id = candidate["id"]
        normalized = normalize_scores(raw_scores, model_name=client.model)
        classified.append((article_id, normalized))
        if not dry_run:
            # Commit per article so finished work survives a later failure.
            upsert_article_score(conn, article_id, normalized)
            conn.commit()
        if progress is not None:
            progress(position, total, article_id)
    return ClassifyReport(
        results=classified,
        attempted=total,
        succeeded=len(classified),
        skipped=failures,
    )
def parse_classifier_json(content: str) -> dict:
    """Parse the model's reply into a JSON object.

    The reply is parsed as-is first. If that fails, the text between the first
    ``{`` and the last ``}`` is parsed instead, which tolerates replies that
    wrap the JSON in prose or code fences.

    Args:
        content: Raw message content returned by the model.

    Returns:
        The decoded JSON object.

    Raises:
        RuntimeError: if ``content`` is not a string, contains no parseable
            JSON, or decodes to something other than a JSON object.
    """
    if not isinstance(content, str):
        # e.g. a null message content; .strip() would raise AttributeError.
        raise RuntimeError(f"model did not return JSON: {content!r}")
    text = content.strip()
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        start = text.find("{")
        end = text.rfind("}")
        if start == -1 or end <= start:
            raise RuntimeError(f"model did not return JSON: {text}")
        try:
            parsed = json.loads(text[start : end + 1])
        except json.JSONDecodeError as exc:
            # Callers only handle RuntimeError, so do not leak JSONDecodeError.
            raise RuntimeError(f"model did not return JSON: {text}") from exc
    if not isinstance(parsed, dict):
        # Lists, strings or numbers are valid JSON but not a classification.
        raise RuntimeError(f"model did not return a JSON object: {text}")
    return parsed
def normalize_scores(data: dict, model_name: str) -> dict:
    """Coerce a raw classifier reply into the shape stored in the database.

    Score fields are clamped by ``_bounded_int``; ``accepted`` becomes 0 or 1;
    topic, flavor and tags go through the taxonomy coercion helpers;
    ``reason_code`` falls back to ``"model_no_reason"`` and is cut to 120
    characters; ``reason_text`` is cut to 1000 characters.

    Args:
        data: Parsed JSON object returned by the model.
        model_name: Recorded under ``"model_name"`` in the result.

    Returns:
        A new dict with every expected key present.
    """
    score_keys = (
        "constructive_score",
        "cortisol_score",
        "ragebait_score",
        "agency_score",
        "human_benefit_score",
        "novelty_score",
        "pr_risk_score",
    )
    normalized = {key: _bounded_int(data.get(key)) for key in score_keys}
    normalized["accepted"] = int(bool(data.get("accepted")))
    normalized["topic"] = coerce_topic(data.get("topic"))
    normalized["flavor"] = coerce_flavor(data.get("flavor"))
    normalized["tags"] = coerce_tags(data.get("tags"))
    reason_code = str(data.get("reason_code") or "model_no_reason")
    reason_text = str(data.get("reason_text") or "")
    normalized["reason_code"] = reason_code[:120]
    normalized["reason_text"] = reason_text[:1000]
    normalized["model_name"] = model_name
    return normalized
def upsert_article_score(conn: sqlite3.Connection, article_id: int, scores: dict) -> None:
    """Insert or overwrite an article's score row and replace its tags.

    The score row is keyed on ``article_id``; an existing row has every score
    column overwritten and ``scored_at`` refreshed. The article's rows in
    ``article_tags`` are deleted and re-created from ``scores["tags"]``.
    Nothing is committed here; the caller owns the transaction.

    Args:
        conn: Open database connection.
        article_id: Id of the article being scored.
        scores: Normalized scores; every score column key must be present,
            ``"tags"`` is optional.
    """
    # Column order matches the placeholders that follow article_id below.
    score_columns = (
        "constructive_score",
        "cortisol_score",
        "ragebait_score",
        "agency_score",
        "human_benefit_score",
        "novelty_score",
        "pr_risk_score",
        "accepted",
        "topic",
        "flavor",
        "reason_code",
        "reason_text",
        "model_name",
    )
    params = (article_id, *(scores[column] for column in score_columns))
    conn.execute(
        """
INSERT INTO article_scores (
article_id, constructive_score, cortisol_score, ragebait_score,
agency_score, human_benefit_score, novelty_score, pr_risk_score,
accepted, topic, flavor, reason_code, reason_text, model_name, scored_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(article_id) DO UPDATE SET
constructive_score = excluded.constructive_score,
cortisol_score = excluded.cortisol_score,
ragebait_score = excluded.ragebait_score,
agency_score = excluded.agency_score,
human_benefit_score = excluded.human_benefit_score,
novelty_score = excluded.novelty_score,
pr_risk_score = excluded.pr_risk_score,
accepted = excluded.accepted,
topic = excluded.topic,
flavor = excluded.flavor,
reason_code = excluded.reason_code,
reason_text = excluded.reason_text,
model_name = excluded.model_name,
scored_at = CURRENT_TIMESTAMP
""",
        params,
    )
    # Swap in the new grouping tags: drop the old set, then add the current one.
    conn.execute("DELETE FROM article_tags WHERE article_id = ?", (article_id,))
    tag_rows = [(article_id, tag) for tag in scores.get("tags") or []]
    conn.executemany(
        "INSERT OR IGNORE INTO article_tags (article_id, tag) VALUES (?, ?)", tag_rows
    )
def _classification_candidates(
conn: sqlite3.Connection,
limit: int,
include_rejected: bool,
only_unclassified: bool = False,
) -> list[sqlite3.Row]:
filters = []
if not include_rejected:
filters.append("(s.accepted = 1 OR s.constructive_score >= 4)")
if only_unclassified:
# Articles still carrying the fast heuristic score, i.e. not yet judged
# by the model. Lets a scheduled cycle only spend the LLM on new items.
filters.append("s.model_name LIKE 'heuristic-%'")
where = ("WHERE " + " AND ".join(filters)) if filters else ""
return conn.execute(
f"""
SELECT
a.id,
a.title,
a.description,
a.published_at,
a.canonical_url,
src.name AS source_name,
src.default_category,
src.trust_score AS source_trust_score,
src.pr_risk_score AS source_pr_risk_score,
s.constructive_score,
s.cortisol_score,
s.ragebait_score,
s.agency_score,
s.human_benefit_score,
s.pr_risk_score,
s.accepted,
s.reason_code
FROM articles a
JOIN sources src ON src.id = a.source_id
LEFT JOIN article_scores s ON s.article_id = a.id
{where}
ORDER BY
CASE WHEN s.model_name LIKE 'heuristic-%' THEN 0 ELSE 1 END,
COALESCE(a.published_at, a.discovered_at) DESC
LIMIT ?
""",
(limit,),
).fetchall()
def _article_prompt(article: sqlite3.Row) -> str:
return "\n".join(
[
f"Source: {article['source_name']}",
f"Source category: {article['default_category'] or 'unknown'}",
f"Source trust score: {article['source_trust_score']}/10",
f"Source PR risk score: {article['source_pr_risk_score']}/10",
f"Published: {article['published_at'] or 'unknown'}",
f"Title: {article['title']}",
f"Snippet: {article['description'] or ''}",
f"URL: {article['canonical_url']}",
]
)
def _bounded_int(value: object) -> int:
try:
parsed = int(value)
except (TypeError, ValueError):
parsed = 0
return max(0, min(10, parsed))