from __future__ import annotations

import json
import os
import sqlite3
import urllib.error
import urllib.request
from collections.abc import Callable
from dataclasses import dataclass

from .taxonomy import (
    ALLOWED_TAGS,
    FLAVORS,
    MAX_TAGS,
    TOPICS,
    coerce_flavor,
    coerce_tags,
    coerce_topic,
    flavors_prompt_block,
    tags_prompt_block,
    topics_prompt_block,
)

# Defaults used by LocalModelClient.from_env() when the matching GOODNEWS_*
# environment variable is not set.
DEFAULT_BASE_URL = "http://127.0.0.1:1234/v1"
DEFAULT_MODEL = "gpt-oss"
DEFAULT_EMBED_MODEL = "text-embedding-nomic-embed-text-v1.5"
# Passed as the urlopen timeout for chat and embedding requests.
DEFAULT_TIMEOUT = 180

# Structured-output schema. Newer LM Studio / OpenAI-compatible servers want a
# json_schema response_format (older ones took json_object); we try schema first
# and fall back gracefully so the client works across server versions.

# Shared definition for every *_score property: an integer in 0..10. The same
# dict object is reused for each score key below.
_SCORE_FIELD = {"type": "integer", "minimum": 0, "maximum": 10}

# JSON Schema for one classification result. Every property is required and no
# extra properties are allowed; topic/flavor/tags are constrained to the
# vocabularies imported from .taxonomy.
CLASSIFICATION_SCHEMA = {
    "type": "object",
    "additionalProperties": False,
    "required": [
        "constructive_score",
        "cortisol_score",
        "ragebait_score",
        "agency_score",
        "human_benefit_score",
        "novelty_score",
        "pr_risk_score",
        "accepted",
        "topic",
        "flavor",
        "tags",
        "reason_code",
        "reason_text",
        "language",
    ],
    "properties": {
        "constructive_score": _SCORE_FIELD,
        "cortisol_score": _SCORE_FIELD,
        "ragebait_score": _SCORE_FIELD,
        "agency_score": _SCORE_FIELD,
        "human_benefit_score": _SCORE_FIELD,
        "novelty_score": _SCORE_FIELD,
        "pr_risk_score": _SCORE_FIELD,
        "accepted": {"type": "boolean"},
        "topic": {"type": "string", "enum": list(TOPICS)},
        "flavor": {"type": "string", "enum": list(FLAVORS)},
        "tags": {
            "type": "array",
            "items": {"type": "string", "enum": list(ALLOWED_TAGS)},
            "maxItems": MAX_TAGS,
        },
        "reason_code": {"type": "string"},
        "reason_text": {"type": "string"},
        "language": {"type": "string"},  # ISO 639-1 of the article's own text (en, de, es…)
    },
}

# Response-format variants tried in order. Once one succeeds for a client, it is
# pinned so we stop paying failed round-trips on every subsequent call.
# Index 0 is the strictest variant; None means "send no response_format at all".
_RESPONSE_FORMATS = (
    {
        "type": "json_schema",
        "json_schema": {"name": "classification", "strict": True, "schema": CLASSIFICATION_SCHEMA},
    },
    {"type": "json_object"},
    None,
)

# System prompt for classification. The {topics}/{flavors}/{tags} placeholders
# are filled once at import time from the taxonomy helpers; literal braces in
# the JSON example are doubled so str.format leaves them alone.
SYSTEM_PROMPT = """You classify article metadata for upbeatBytes, a calm news digest.
The bar is NOT "is this happy?" — it is "will a reader finish this calm or a little better, never worse?"

ACCEPT stories that are calm, neutral, insightful, or uplifting: they inform, teach, delight,
or show progress or benefit. Neutral-but-absorbing is welcome — a discovery, a clear explainer,
a clever build or gadget, a fascinating bit of science, space, nature, design, or culture,
a genuinely useful insight — even when it isn't "feel-good."

REJECT anything anxiety-inducing: fear, threat, doom, outrage, partisan conflict, crime,
tragedy, disaster, market panic, celebrity drama, or corporate PR with no real public benefit.
Also reject visceral-threat and body-horror hooks — disease outbreaks, parasites, infestations,
contamination, recalls, poisonings, deadly or "flesh-eating" infections — EVEN when the piece is
calmly written or framed as "monitoring," "surveillance," "awareness," or "public health."
A measured, factual telling of an alarming subject still leaves a worse aftertaste.
ESPECIALLY reject the comparison traps — anything that would make a reader feel inferior,
behind, inadequate, envious, or pressured (status flexing, FOMO, hustle-grind,
"you're falling behind"). When unsure, judge the emotional aftertaste, not the topic.

Health and public-health stories ARE welcome when the subject itself is benign or hopeful —
a treatment that helps, a disease in decline, prevention, recovery, caregiving, fitness,
mental wellbeing, or a genuine medical advance. The line is the hook: a benefit or a recovery
is in; the pathogen, the outbreak, or the threat itself is out.

Score cortisol_score by the reader's personal, visceral, or public-health threat — NOT by
dramatic vocabulary or the grandeur of the subject.
Distant astronomy and cosmology (black holes, stars, cosmic events), engineering or equipment
hazards, geological forces, scientific self-correction and measurement quirks, natural-history
mechanisms, predator–prey biology, and historical discoveries are LOW cortisol (0–3) even when
written with words like "deadly," "lethal," "destructive," "shocking," or "dangerous."
A black hole winking across the cosmos, harsh lunar regolith that shreds equipment,
a venomous snake's biology, or an ancient extinction is wonder, not dread — accept it.
Reserve high cortisol for disease, contamination, outbreak, parasites, violence, or immediate
human or animal suffering — that is what the reader's gut actually flinches from.

On AI specifically: this is NOT "no AI" — it is "no AI dread." ACCEPT AI stories about
practical tools, accessibility, medical/scientific/educational benefit, creative or maker use,
environmental or resource gains, open research, humane design, or a specific bounded innovation.
REJECT AI stories whose main frame is loss of human control, cognitive decline or "brain rot,"
job-displacement panic, surveillance panic, existential doom, harm-to-children or social-fabric
panic, "you're falling behind" productivity anxiety, or adversarial arms-race framing.

Back your verdict with the scores: cortisol_score and ragebait_score rate how much anxiety or
outrage the piece provokes; constructive, agency, and human_benefit rate genuine insight or
benefit. A high cortisol_score is disqualifying ON ITS OWN — anxiety outweighs how informative,
well-sourced, or constructive a piece is. Do not let "informative" or "public health" rescue
an unsettling subject.

Also assign one primary topic and one flavor (the single best fit), plus 1-4 grouping tags.

Primary topic (what the story is mainly about):
{topics}

Flavor (why it belongs in a calm, uplifting digest):
{flavors}

Grouping tags — choose ONLY from this controlled vocabulary:
{tags}

Tag discipline: assign 1-4 tags; prefer fewer, stronger ones; never tag by weak association;
pick tags a reader would reasonably use to find this story later.

Also report `language`: the ISO 639-1 code of the article's OWN text (the title and
description), e.g. "en", "de", "es", "fr". Judge the language of the words, not the subject.
This is detection only — score and accept the story on its merits as usual; the site decides
separately what to do with non-English items.

Return only JSON with this exact shape:
{{
  "constructive_score": 0,
  "cortisol_score": 0,
  "ragebait_score": 0,
  "agency_score": 0,
  "human_benefit_score": 0,
  "novelty_score": 0,
  "pr_risk_score": 0,
  "accepted": false,
  "topic": "one_of_the_allowed_topics",
  "flavor": "one_of_the_allowed_flavors",
  "tags": ["one_to_four_allowed_tags"],
  "reason_code": "short_snake_case",
  "reason_text": "one concise sentence",
  "language": "en"
}}
""".format(topics=topics_prompt_block(), flavors=flavors_prompt_block(), tags=tags_prompt_block())


@dataclass
class LocalModelClient:
    """Minimal HTTP client for an OpenAI-compatible server (chat, embeddings, models).

    Every transport or response-shape failure is raised as ``RuntimeError`` so
    callers (e.g. ``classify_articles``) can skip a single item with one
    ``except RuntimeError`` instead of handling urllib/socket/json errors.
    """

    base_url: str
    model: str
    api_key: str | None = None
    timeout: int = DEFAULT_TIMEOUT
    embed_model: str = DEFAULT_EMBED_MODEL
    # Index into _RESPONSE_FORMATS that the server accepts; discovered lazily.
    _response_format_idx: int | None = None

    @classmethod
    def from_env(cls) -> "LocalModelClient":
        """Build a client from GOODNEWS_* environment variables, falling back to the defaults."""
        return cls(
            base_url=os.environ.get("GOODNEWS_LLM_BASE_URL", DEFAULT_BASE_URL).rstrip("/"),
            model=os.environ.get("GOODNEWS_LLM_MODEL", DEFAULT_MODEL),
            api_key=os.environ.get("GOODNEWS_LLM_API_KEY"),
            timeout=int(os.environ.get("GOODNEWS_LLM_TIMEOUT", DEFAULT_TIMEOUT)),
            embed_model=os.environ.get("GOODNEWS_EMBED_MODEL", DEFAULT_EMBED_MODEL),
        )

    def _request_json(self, path: str, payload: dict | None, *, timeout: int, label: str):
        """Send one request to ``base_url + path`` and return the decoded JSON body.

        ``payload`` is POSTed as JSON when given; ``None`` issues a plain GET.
        ``label`` names the service in error messages ("local model", "embeddings").

        Raises:
            RuntimeError: on an HTTP error status (message starts with
                ``HTTP <code>``), an unreachable server, a timeout or dropped
                connection while reading, or a body that is not valid JSON.
        """
        import http.client  # local import: only needed for the exception type below

        headers: dict[str, str] = {}
        body = None
        if payload is not None:
            body = json.dumps(payload).encode("utf-8")
            headers["Content-Type"] = "application/json"
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
        request = urllib.request.Request(
            f"{self.base_url}{path}",
            data=body,
            headers=headers,
            method="GET" if body is None else "POST",
        )
        try:
            with urllib.request.urlopen(request, timeout=timeout) as response:
                raw = response.read()
        except urllib.error.HTTPError as exc:
            detail = exc.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"HTTP {exc.code} from {label}: {detail}") from exc
        except urllib.error.URLError as exc:
            raise RuntimeError(f"could not reach {label} at {self.base_url}: {exc.reason}") from exc
        except (OSError, http.client.HTTPException) as exc:
            # urlopen only wraps connect/send failures in URLError. A timeout or
            # reset while waiting for / reading the response arrives here raw.
            raise RuntimeError(f"could not reach {label} at {self.base_url}: {exc}") from exc
        try:
            return json.loads(raw.decode("utf-8"))
        except ValueError as exc:  # JSONDecodeError and UnicodeDecodeError are ValueErrors
            raise RuntimeError(f"unexpected {label} response: body is not valid JSON") from exc

    def embed(self, texts: list[str]) -> list[list[float]]:
        """Return embedding vectors for a batch of texts via /embeddings.

        An empty batch returns ``[]`` without contacting the server.

        Raises:
            RuntimeError: on any transport failure or unexpected response shape.
        """
        if not texts:
            return []
        data = self._request_json(
            "/embeddings",
            {"model": self.embed_model, "input": texts},
            timeout=self.timeout,
            label="embeddings",
        )
        try:
            return [item["embedding"] for item in data["data"]]
        except (KeyError, TypeError) as exc:
            raise RuntimeError(f"unexpected embeddings response: {data}") from exc

    def classify(self, article: sqlite3.Row) -> dict:
        """Classify one article row and return the model's parsed JSON object.

        The first call walks ``_RESPONSE_FORMATS`` in order, moving to the next
        variant only when the server answers HTTP 400, and pins the first one
        that succeeds. Later calls reuse the pinned variant directly.

        Raises:
            RuntimeError: on transport failure, a non-400 HTTP error, a reply
                that is not a JSON object, or when every variant is rejected.
        """
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": _article_prompt(article)},
        ]
        # If we already learned which response_format the server accepts, use it.
        if self._response_format_idx is not None:
            pinned = _RESPONSE_FORMATS[self._response_format_idx]
            return self._chat(self._build_payload(messages, pinned))

        # Otherwise escalate through the variants, pinning the first that works.
        last_exc: RuntimeError | None = None
        for idx, fmt in enumerate(_RESPONSE_FORMATS):
            try:
                result = self._chat(self._build_payload(messages, fmt))
            except RuntimeError as exc:
                # Only a 400 means "this response_format is not supported";
                # anything else is a real failure and must not be retried here.
                if "HTTP 400" not in str(exc):
                    raise
                last_exc = exc
            else:
                self._response_format_idx = idx
                return result
        raise last_exc if last_exc else RuntimeError("no usable response_format")

    def _build_payload(self, messages: list[dict], response_format: dict | None) -> dict:
        """Assemble a chat-completions payload; ``response_format`` is omitted when None."""
        payload = {"model": self.model, "temperature": 0.1, "messages": messages}
        if response_format is not None:
            payload["response_format"] = response_format
        return payload

    def list_models(self) -> list[str]:
        """Return the ids reported by GET /models (10-second timeout).

        Entries that are not dicts or have no truthy ``id`` are skipped; a
        response without a usable ``data`` list yields ``[]``.

        Raises:
            RuntimeError: on any transport failure.
        """
        data = self._request_json("/models", None, timeout=10, label="local model")
        models = (data.get("data") or []) if isinstance(data, dict) else []
        return [str(m["id"]) for m in models if isinstance(m, dict) and m.get("id")]

    def chat_text(self, messages: list[dict]) -> str:
        """Plain chat completion → the raw message text (no JSON parsing).

        Used for free-form output like summaries; classification uses _chat,
        which JSON-parses the same content.
        """
        return self._raw_content(self._build_payload(messages, None))

    def rank_for_social(self, candidates: list[dict]) -> list[dict]:
        """ONE bounded COMPARATIVE pass over a small candidate set (not N calls).

        Returns a best-first list of {id, social_score 0-10, why, talking_points,
        angle, entities}. Bounded by self.timeout; callers fall back to
        deterministic ranking on ANY failure, so the Publishing Desk always works.

        Entries whose id is not one of the candidates, or that repeat an id
        already seen, are dropped: the model is told to use only listed ids,
        but that instruction is not trusted.

        Raises:
            RuntimeError: on transport failure or when the reply has no
                ``ranked`` list.
        """
        if not candidates:
            return []

        lines = []
        known_ids = set()
        for c in candidates:
            summ = " ".join((c.get("summary") or "").split())[:280]
            cid = int(c["id"])
            known_ids.add(cid)
            lines.append(f'- id={cid} | topic={c.get("topic")} | {c["title"]} :: {summ}')

        user = (
            "These are constructive-news articles. Compare them as candidates for a SHORT X "
            "(Twitter) post from a calm good-news account, and rank best-first by SOCIAL "
            "share-worthiness — would someone stop scrolling? That differs from how 'good' the "
            "article is.\n\n" + "\n".join(lines) + "\n\n"
            "Reply with JSON only, exactly this shape:\n"
            '{"ranked": [{"id": <int>, "social_score": <0-10>, '
            '"why": "one sentence: why it stops the scroll", '
            '"talking_points": ["3 short factual points a writer could use"], '
            '"angle": "a possible conversational angle", '
            '"entities": ["real org/person names mentioned, for tagging"]}]}\n'
            "Only use ids from the list above. Order best-first."
        )
        messages = [
            {"role": "system", "content": "You rank constructive news for social sharing. Reply with JSON only."},
            {"role": "user", "content": user},
        ]
        data = parse_classifier_json(self.chat_text(messages))
        ranked = data.get("ranked") if isinstance(data, dict) else None
        if not isinstance(ranked, list):
            raise RuntimeError("rank_for_social: missing 'ranked' list")

        out = []
        seen_ids = set()
        for r in ranked:
            if not isinstance(r, dict):
                continue
            try:
                rid = int(r.get("id"))
            except (TypeError, ValueError, OverflowError):
                continue
            if rid not in known_ids or rid in seen_ids:
                continue
            seen_ids.add(rid)
            # Require ACTUAL lists — a model that returns a bare string must not be
            # iterated into characters ("fact" → ["f","a","c","t"]).
            tp = r.get("talking_points")
            ents = r.get("entities")
            out.append(
                {
                    "id": rid,
                    "social_score": _bounded_int(r.get("social_score")),
                    "why": str(r.get("why") or "")[:300],
                    "talking_points": [str(p)[:200] for p in tp][:4] if isinstance(tp, list) else [],
                    "angle": str(r.get("angle") or "")[:300],
                    "entities": [str(e)[:80] for e in ents][:8] if isinstance(ents, list) else [],
                }
            )
        return out

    def _raw_content(self, payload: dict) -> str:
        """POST ``payload`` to /chat/completions and return the first choice's message text.

        Raises:
            RuntimeError: on transport failure, or when the response lacks
                ``choices[0].message.content`` or that content is not a string.
        """
        data = self._request_json(
            "/chat/completions", payload, timeout=self.timeout, label="local model"
        )
        try:
            content = data["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError) as exc:
            raise RuntimeError(f"unexpected local model response: {data}") from exc
        if not isinstance(content, str):
            # e.g. "content": null — would otherwise crash later in .strip().
            raise RuntimeError(f"unexpected local model response: {data}")
        return content

    def _chat(self, payload: dict) -> dict:
        """Run a chat completion and parse the reply as a JSON object.

        Raises:
            RuntimeError: if the reply is not JSON or is JSON but not an object.
        """
        result = parse_classifier_json(self._raw_content(payload))
        if not isinstance(result, dict):
            raise RuntimeError(f"model did not return a JSON object: {result!r}")
        return result


@dataclass
class ClassifyReport:
    """Outcome of one ``classify_articles`` run."""

    results: list[tuple[int, dict]]  # (article id, normalized scores) per success
    attempted: int  # number of candidate rows selected
    succeeded: int  # len(results)
    skipped: int  # rows whose classification raised RuntimeError


def classify_articles(
    conn: sqlite3.Connection,
    client: LocalModelClient,
    limit: int,
    include_rejected: bool = False,
    dry_run: bool = False,
    only_unclassified: bool = False,
    progress: Callable[[int, int, int], None] | None = None,
) -> ClassifyReport:
    """Classify up to ``limit`` candidate articles and persist their scores.

    Args:
        conn: Database connection; committed after every stored article.
        client: Model client used for classification.
        limit: Maximum number of candidate rows to select.
        include_rejected: Passed through to candidate selection.
        dry_run: When true, scores are computed but nothing is written.
        only_unclassified: Passed through to candidate selection.
        progress: Optional callback ``(index, total, article_id)`` invoked after
            each successfully classified row (not for skipped rows).

    Returns:
        A ``ClassifyReport`` with per-article results and counters.
    """
    rows = _classification_candidates(
        conn, limit=limit, include_rejected=include_rejected, only_unclassified=only_unclassified
    )
    results = []
    skipped = 0
    for index, row in enumerate(rows, start=1):
        try:
            scores = client.classify(row)
        except RuntimeError as exc:
            # One slow/failed article (timeout, bad response) shouldn't sink the
            # whole batch or discard work already committed. Skip and continue.
            skipped += 1
            print(f"[{row['id']}] skipped: {exc}")
            continue
        scores = normalize_scores(scores, model_name=client.model)
        results.append((row["id"], scores))
        if not dry_run:
            upsert_article_score(conn, row["id"], scores)
            conn.commit()
        if progress is not None:
            progress(index, len(rows), row["id"])
    return ClassifyReport(
        results=results, attempted=len(rows), succeeded=len(results), skipped=skipped
    )


def parse_classifier_json(content: str) -> dict:
    """Parse model output as JSON, tolerating text around a single JSON object.

    The whole (stripped) string is tried first; if that fails, the span from
    the first ``{`` to the last ``}`` is tried.

    Raises:
        RuntimeError: if ``content`` is not a string or neither attempt parses.
    """
    if not isinstance(content, str):
        raise RuntimeError(f"model did not return JSON: {content!r}")
    content = content.strip()
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        pass
    start = content.find("{")
    end = content.rfind("}")
    if start == -1 or end <= start:
        raise RuntimeError(f"model did not return JSON: {content}")
    try:
        return json.loads(content[start : end + 1])
    except json.JSONDecodeError as exc:
        # Keep the failure a RuntimeError so batch callers can skip this item.
        raise RuntimeError(f"model did not return JSON: {content}") from exc


def _is_english(language: str) -> bool:
    """Conservative: HOLD only when the model clearly reports a non-English language.

    Missing/blank/undetermined → treated as English, so a model hiccup never
    silently drops genuine English content (the corpus is ~all English today).
    """
    lang = (language or "").strip().lower()
    if not lang or lang in ("und", "unknown", "mul", "zxx"):
        return True
    return lang == "en" or lang.startswith(("en-", "en_"))


def _coerce_bool(value: object) -> bool:
    """Interpret a model-supplied flag; strings count as true only if clearly affirmative."""
    if isinstance(value, str):
        # bool("false") is True, so strings need an explicit check.
        return value.strip().lower() in ("true", "1", "yes")
    return bool(value)


def normalize_scores(data: dict, model_name: str) -> dict:
    """Coerce a raw classification dict into the shape stored in ``article_scores``.

    Scores are clamped to 0..10, topic/flavor/tags go through the taxonomy
    coercers, text fields are truncated, and ``accepted`` becomes 0 or 1. A
    clearly non-English ``language`` forces ``accepted = 0`` with reason code
    ``non_english``.
    """
    language = str(data.get("language") or "").strip().lower()[:16]
    accepted = 1 if _coerce_bool(data.get("accepted")) else 0
    reason_code = str(data.get("reason_code") or "model_no_reason")[:120]
    reason_text = str(data.get("reason_text") or "")[:1000]

    # Language gate (code disposes): the public feed is English-only for now. A
    # non-English article is HELD — never shown — but PRESERVED with a distinct
    # reason so it isn't counted as a calm-filter rejection or a source failure, and
    # can be revisited when translation support lands (Phase 4 / GDELT).
    if not _is_english(language):
        accepted = 0
        reason_code = "non_english"
        reason_text = f"Held — non-English ({language}); awaiting translation support."

    return {
        "constructive_score": _bounded_int(data.get("constructive_score")),
        "cortisol_score": _bounded_int(data.get("cortisol_score")),
        "ragebait_score": _bounded_int(data.get("ragebait_score")),
        "agency_score": _bounded_int(data.get("agency_score")),
        "human_benefit_score": _bounded_int(data.get("human_benefit_score")),
        "novelty_score": _bounded_int(data.get("novelty_score")),
        "pr_risk_score": _bounded_int(data.get("pr_risk_score")),
        "accepted": accepted,
        "topic": coerce_topic(data.get("topic")),
        "flavor": coerce_flavor(data.get("flavor")),
        "tags": coerce_tags(data.get("tags")),
        "reason_code": reason_code,
        "reason_text": reason_text,
        "language": language,
        "model_name": model_name,
    }


def upsert_article_score(conn: sqlite3.Connection, article_id: int, scores: dict) -> None:
    """Insert or update the score row for ``article_id`` and replace its tags.

    Does not commit; the caller owns the transaction.
    """
    conn.execute(
        """
        INSERT INTO article_scores (
            article_id, constructive_score, cortisol_score, ragebait_score,
            agency_score, human_benefit_score, novelty_score, pr_risk_score,
            accepted, topic, flavor, reason_code, reason_text, language,
            model_name, scored_at
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
        ON CONFLICT(article_id) DO UPDATE SET
            constructive_score = excluded.constructive_score,
            cortisol_score = excluded.cortisol_score,
            ragebait_score = excluded.ragebait_score,
            agency_score = excluded.agency_score,
            human_benefit_score = excluded.human_benefit_score,
            novelty_score = excluded.novelty_score,
            pr_risk_score = excluded.pr_risk_score,
            accepted = excluded.accepted,
            topic = excluded.topic,
            flavor = excluded.flavor,
            reason_code = excluded.reason_code,
            reason_text = excluded.reason_text,
            language = excluded.language,
            model_name = excluded.model_name,
            scored_at = CURRENT_TIMESTAMP
        """,
        (
            article_id,
            scores["constructive_score"],
            scores["cortisol_score"],
            scores["ragebait_score"],
            scores["agency_score"],
            scores["human_benefit_score"],
            scores["novelty_score"],
            scores["pr_risk_score"],
            scores["accepted"],
            scores["topic"],
            scores["flavor"],
            scores["reason_code"],
            scores["reason_text"],
            scores.get("language"),
            scores["model_name"],
        ),
    )
    # Replace this article's grouping tags (controlled vocabulary, 0-4).
    conn.execute("DELETE FROM article_tags WHERE article_id = ?", (article_id,))
    conn.executemany(
        "INSERT OR IGNORE INTO article_tags (article_id, tag) VALUES (?, ?)",
        [(article_id, tag) for tag in scores.get("tags") or []],
    )


def _classification_candidates(
    conn: sqlite3.Connection,
    limit: int,
    include_rejected: bool,
    only_unclassified: bool = False,
) -> list[sqlite3.Row]:
    """Select up to ``limit`` articles to send to the model.

    Rows still carrying a ``heuristic-*`` model name sort first, then newest
    first by published (or discovered) time. Only fixed SQL fragments are
    interpolated into the query; ``limit`` is bound as a parameter.
    """
    filters = []
    if not include_rejected:
        filters.append("(s.accepted = 1 OR s.constructive_score >= 4)")
    if only_unclassified:
        # Articles still carrying the fast heuristic score, i.e. not yet judged
        # by the model. Lets a scheduled cycle only spend the LLM on new items.
        filters.append("s.model_name LIKE 'heuristic-%'")
    where = ("WHERE " + " AND ".join(filters)) if filters else ""
    return conn.execute(
        f"""
        SELECT
            a.id, a.title, a.description, a.published_at, a.canonical_url,
            src.name AS source_name,
            src.default_category,
            src.trust_score AS source_trust_score,
            src.pr_risk_score AS source_pr_risk_score,
            s.constructive_score, s.cortisol_score, s.ragebait_score,
            s.agency_score, s.human_benefit_score, s.pr_risk_score,
            s.accepted, s.reason_code
        FROM articles a
        JOIN sources src ON src.id = a.source_id
        LEFT JOIN article_scores s ON s.article_id = a.id
        {where}
        ORDER BY
            CASE WHEN s.model_name LIKE 'heuristic-%' THEN 0 ELSE 1 END,
            COALESCE(a.published_at, a.discovered_at) DESC
        LIMIT ?
        """,
        (limit,),
    ).fetchall()


def _article_prompt(article: sqlite3.Row) -> str:
    """Render one candidate row as the user message shown to the classifier."""
    return "\n".join(
        [
            f"Source: {article['source_name']}",
            f"Source category: {article['default_category'] or 'unknown'}",
            f"Source trust score: {article['source_trust_score']}/10",
            f"Source PR risk score: {article['source_pr_risk_score']}/10",
            f"Published: {article['published_at'] or 'unknown'}",
            f"Title: {article['title']}",
            f"Snippet: {article['description'] or ''}",
            f"URL: {article['canonical_url']}",
        ]
    )


def _bounded_int(value: object) -> int:
    """Coerce ``value`` to an int clamped to 0..10; anything unparseable becomes 0.

    Numeric strings such as ``"7.0"`` are accepted via a float fallback, and
    infinities/NaN (which json.loads can produce) fall back to 0.
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError, OverflowError):
        try:
            parsed = int(float(value))
        except (TypeError, ValueError, OverflowError):
            parsed = 0
    return max(0, min(10, parsed))