Durability pass: tests, clearer diversity/classify behavior, Calm Filters foundation
- Add pytest suite (34 tests) covering scoring thresholds, dedup clustering + representative selection + time window, brief source/category diversity, avoid-term phrase matching, and text canonicalization/truncation.
- Rewrite _select_diverse with an explicit, tested contract (best-first, one per source, backfill, then inject a second category by evicting the lowest-ranked pick).
- classify_articles now returns attempted/succeeded/skipped (ClassifyReport) so silent model failures are visible in both the cycle and classify output.
- Fix clean_text truncation to stay within max_len (ellipsis no longer overshoots).
- New filters.py: canonical FilterPrefs shape (include/mute topics+flavors, avoid_terms, pauses) and pure word/phrase-boundary matching engine seeding Calm Filters. Not yet wired into the API.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+22
-11
@@ -141,21 +141,30 @@ def _candidate_articles(
|
||||
|
||||
|
||||
def _select_diverse(rows: list[sqlite3.Row], limit: int) -> list[sqlite3.Row]:
|
||||
selected = []
|
||||
seen_sources = set()
|
||||
seen_categories = set()
|
||||
"""Pick up to `limit` items from `rows` (already ranked best-first).
|
||||
|
||||
Contract:
|
||||
1. Prefer higher-ranked items.
|
||||
2. Source diversity: take at most one item per source while other sources
|
||||
remain; only repeat a source once distinct sources are exhausted.
|
||||
3. Category diversity: if the result ended up single-category and a different
|
||||
category is available in the pool, swap in the highest-ranked off-category
|
||||
candidate by evicting the lowest-ranked currently-selected item (so we
|
||||
gain breadth without dropping a higher-ranked pick).
|
||||
"""
|
||||
selected: list[sqlite3.Row] = []
|
||||
seen_sources: set = set()
|
||||
|
||||
# Pass 1: best-first, one per source.
|
||||
for row in rows:
|
||||
if len(selected) >= limit:
|
||||
break
|
||||
source = row["source_name"]
|
||||
category = row["default_category"]
|
||||
if source in seen_sources and len(rows) > limit:
|
||||
if row["source_name"] in seen_sources:
|
||||
continue
|
||||
selected.append(row)
|
||||
seen_sources.add(source)
|
||||
seen_categories.add(category)
|
||||
seen_sources.add(row["source_name"])
|
||||
|
||||
# Pass 2: if short on distinct sources, backfill best-first regardless.
|
||||
if len(selected) < limit:
|
||||
selected_ids = {row["id"] for row in selected}
|
||||
for row in rows:
|
||||
@@ -166,13 +175,15 @@ def _select_diverse(rows: list[sqlite3.Row], limit: int) -> list[sqlite3.Row]:
|
||||
selected.append(row)
|
||||
selected_ids.add(row["id"])
|
||||
|
||||
if len(seen_categories) < 2 and len(rows) > limit:
|
||||
# Pass 3: ensure >= 2 categories when the pool allows it.
|
||||
categories = {row["default_category"] for row in selected}
|
||||
if len(categories) < 2:
|
||||
selected_ids = {row["id"] for row in selected}
|
||||
for row in rows:
|
||||
if row["id"] in selected_ids:
|
||||
continue
|
||||
if row["default_category"] not in seen_categories:
|
||||
selected[-1] = row
|
||||
if row["default_category"] not in categories:
|
||||
selected[-1] = row # evict the lowest-ranked selected item
|
||||
break
|
||||
|
||||
return selected
|
||||
|
||||
+12
-4
@@ -144,20 +144,24 @@ def main() -> None:
|
||||
elif args.command == "classify":
|
||||
init_db(conn)
|
||||
client = llm_client_from_args(args)
|
||||
results = classify_articles(
|
||||
report = classify_articles(
|
||||
conn,
|
||||
client,
|
||||
limit=args.limit,
|
||||
include_rejected=args.include_rejected,
|
||||
dry_run=args.dry_run,
|
||||
)
|
||||
for article_id, scores in results:
|
||||
for article_id, scores in report.results:
|
||||
accepted = "yes" if scores["accepted"] else "no"
|
||||
print(
|
||||
f"[{article_id}] accepted={accepted} {scores['topic']}/{scores['flavor']} "
|
||||
f"reason={scores['reason_code']}"
|
||||
)
|
||||
print(f" {scores['reason_text']}")
|
||||
print(
|
||||
f"classify: attempted={report.attempted} succeeded={report.succeeded} "
|
||||
f"skipped={report.skipped}"
|
||||
)
|
||||
if args.dry_run:
|
||||
print("Dry run only; database was not updated.")
|
||||
elif args.command == "cycle":
|
||||
@@ -294,7 +298,7 @@ def _run_cycle_locked(conn: sqlite3.Connection, args: argparse.Namespace) -> Non
|
||||
print(f" classify {done}/{total} (article {article_id})", flush=True)
|
||||
|
||||
try:
|
||||
results = classify_articles(
|
||||
report = classify_articles(
|
||||
conn,
|
||||
client,
|
||||
limit=args.classify_limit,
|
||||
@@ -302,7 +306,11 @@ def _run_cycle_locked(conn: sqlite3.Connection, args: argparse.Namespace) -> Non
|
||||
only_unclassified=True,
|
||||
progress=_progress,
|
||||
)
|
||||
print(f"classify: {len(results)} new article(s) scored by {client.model}", flush=True)
|
||||
print(
|
||||
f"classify: attempted={report.attempted} succeeded={report.succeeded} "
|
||||
f"skipped={report.skipped} (model {client.model})",
|
||||
flush=True,
|
||||
)
|
||||
except Exception as exc: # endpoint down, timeout, etc. — keep going
|
||||
print(f"classify: skipped ({exc})", flush=True)
|
||||
|
||||
|
||||
@@ -0,0 +1,123 @@
|
||||
"""Calm Filters — the canonical preference model and pure matching engine.
|
||||
|
||||
Everything (localStorage today, query params on the API, a user_preferences row
|
||||
later) speaks this one shape, so the surfaces never drift. The functions here are
|
||||
deliberately pure and side-effect-free so they are easy to test and reuse from
|
||||
both the API and the CLI.
|
||||
|
||||
The humane surface ("Not today" / "Less like this" / "Always hide this") maps onto
|
||||
this machinery: a pause is a topic/flavor muted *until* a timestamp; a mute is a
|
||||
standing exclusion; avoid-terms drop anything mentioning a phrase the reader would
|
||||
rather not see.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
|
||||
# Split on any run of non-alphanumerics so matching is punctuation- and
|
||||
# case-insensitive, and anchored to whole words/phrases (no substring surprises:
|
||||
# "pan" must not match "pandemic", and "stock market" matches as a phrase).
|
||||
_NONWORD = re.compile(r"[^a-z0-9]+")
|
||||
|
||||
|
||||
def _normalize(text: str) -> str:
    """Return *text* lowercased, with each non-alphanumeric run replaced by one space.

    The result is padded with one space on each side so callers can search
    for ``" term "`` and only ever hit whole words or whole phrases.
    """
    collapsed = _NONWORD.sub(" ", text.lower())
    return f" {collapsed.strip()} "
|
||||
|
||||
|
||||
def text_matches_avoid_terms(text: str | None, terms: list[str]) -> bool:
    """Report whether any avoid term occurs in *text* as a whole word or phrase.

    Both the text and each term go through ``_normalize``, so the comparison
    ignores case and punctuation. Empty or ``None`` text, an empty term list,
    and terms that normalize to nothing never match.
    """
    if not (text and terms):
        return False
    padded_text = _normalize(text)
    # Lazily normalized so evaluation stops at the first matching term.
    needles = (_normalize(term).strip() for term in terms)
    return any(needle and f" {needle} " in padded_text for needle in needles)
|
||||
|
||||
|
||||
@dataclass
class Pause:
    """A temporary mute of one topic or flavor that lapses at ``until``."""

    kind: str  # "topic" or "flavor"
    value: str
    until: str  # ISO 8601 UTC timestamp

    def active(self, now: datetime) -> bool:
        """Return True while ``now`` is strictly before ``until``.

        An ``until`` that cannot be parsed (garbage text, ``None``, a
        non-string) makes the pause inactive rather than raising, so one bad
        entry cannot break filtering.

        Timestamps without an offset are interpreted as UTC on both sides.
        Without that, comparing a naive ``until`` against an aware ``now``
        (or the reverse) raises ``TypeError``.
        """
        # Local import: the module itself only imports ``datetime``.
        from datetime import timezone

        try:
            until = datetime.fromisoformat(self.until.replace("Z", "+00:00"))
        except (ValueError, AttributeError, TypeError):
            return False
        if until.tzinfo is None:
            until = until.replace(tzinfo=timezone.utc)
        if now.tzinfo is None:
            now = now.replace(tzinfo=timezone.utc)
        return until > now
|
||||
|
||||
|
||||
@dataclass
class FilterPrefs:
    """The canonical reader-preference shape shared by every surface.

    ``include_*`` lists act as allow-lists when non-empty, ``mute_*`` lists
    are standing exclusions, ``avoid_terms`` are words/phrases to hide, and
    ``pauses`` are mutes that expire at a timestamp.
    """

    include_topics: list[str] = field(default_factory=list)
    include_flavors: list[str] = field(default_factory=list)
    mute_topics: list[str] = field(default_factory=list)
    mute_flavors: list[str] = field(default_factory=list)
    avoid_terms: list[str] = field(default_factory=list)
    pauses: list[Pause] = field(default_factory=list)

    @staticmethod
    def _as_list(value: object) -> list:
        """Coerce a raw preference value to a list.

        A bare string becomes a one-item list; ``list("election")`` would
        otherwise explode it into single-letter terms.
        """
        if not value:
            return []
        if isinstance(value, str):
            return [value]
        return list(value)

    @classmethod
    def from_dict(cls, data: dict | None) -> "FilterPrefs":
        """Build preferences from a plain dict; ``None`` or ``{}`` gives empty prefs.

        Missing or falsy keys become empty lists. Pause entries are read by
        their ``kind``/``value``/``until`` keys only: unknown extra keys are
        ignored, and entries that are not dicts or lack one of those keys are
        skipped instead of raising.
        """
        data = data or {}
        pauses = []
        for raw in data.get("pauses") or []:
            if not isinstance(raw, dict):
                continue
            try:
                kind, value, until = raw["kind"], raw["value"], raw["until"]
            except KeyError:
                continue
            pauses.append(Pause(kind=kind, value=value, until=until))
        return cls(
            include_topics=cls._as_list(data.get("include_topics")),
            include_flavors=cls._as_list(data.get("include_flavors")),
            mute_topics=cls._as_list(data.get("mute_topics")),
            mute_flavors=cls._as_list(data.get("mute_flavors")),
            avoid_terms=cls._as_list(data.get("avoid_terms")),
            pauses=pauses,
        )

    def muted_topics(self, now: datetime) -> set[str]:
        """Standing mutes plus any topic currently paused."""
        paused = {p.value for p in self.pauses if p.kind == "topic" and p.active(now)}
        return set(self.mute_topics) | paused

    def muted_flavors(self, now: datetime) -> set[str]:
        """Standing mutes plus any flavor currently paused."""
        paused = {p.value for p in self.pauses if p.kind == "flavor" and p.active(now)}
        return set(self.mute_flavors) | paused

    def is_empty(self) -> bool:
        """True when no preference of any kind is set (expired pauses still count)."""
        return not (
            self.include_topics
            or self.include_flavors
            or self.mute_topics
            or self.mute_flavors
            or self.avoid_terms
            or self.pauses
        )
|
||||
|
||||
|
||||
def allows(article: dict, prefs: FilterPrefs, now: datetime) -> bool:
    """Decide whether one article row (a feed/brief dict) passes the preferences.

    Checks run in order: topic allow-list, flavor allow-list, muted or paused
    topic, muted or paused flavor, then avoid-terms against the title and
    description. The first failing check rejects the article.
    """
    topic = article.get("topic")
    flavor = article.get("flavor")

    outside_allow_list = (prefs.include_topics and topic not in prefs.include_topics) or (
        prefs.include_flavors and flavor not in prefs.include_flavors
    )
    if outside_allow_list:
        return False
    if topic in prefs.muted_topics(now) or flavor in prefs.muted_flavors(now):
        return False

    # Missing or None title/description contribute an empty string.
    searchable = f"{article.get('title') or ''} {article.get('description') or ''}"
    return not text_matches_avoid_terms(searchable, prefs.avoid_terms)
|
||||
|
||||
|
||||
def filter_articles(articles: list[dict], prefs: FilterPrefs, now: datetime) -> list[dict]:
    """Apply preferences to a list of article rows, preserving order.

    Always returns a new list. The empty-preferences fast path used to hand
    back the caller's own list, so mutating the result changed the input only
    when no preferences were set.
    """
    if prefs.is_empty():
        return list(articles)
    return [a for a in articles if allows(a, prefs, now)]
|
||||
+12
-2
@@ -220,6 +220,14 @@ class LocalModelClient:
|
||||
return parse_classifier_json(content)
|
||||
|
||||
|
||||
@dataclass
class ClassifyReport:
    """Outcome of one ``classify_articles`` run, including what was skipped."""

    # (article id, scores dict) pairs for the articles that were scored.
    results: list[tuple[int, dict]]
    # Number of candidate rows the run tried to classify.
    attempted: int
    # Number of entries in ``results``.
    succeeded: int
    # Rows where the model call raised RuntimeError and the article was skipped.
    skipped: int
|
||||
|
||||
|
||||
def classify_articles(
|
||||
conn: sqlite3.Connection,
|
||||
client: LocalModelClient,
|
||||
@@ -228,17 +236,19 @@ def classify_articles(
|
||||
dry_run: bool = False,
|
||||
only_unclassified: bool = False,
|
||||
progress: "Callable[[int, int, int], None] | None" = None,
|
||||
) -> list[tuple[int, dict]]:
|
||||
) -> ClassifyReport:
|
||||
rows = _classification_candidates(
|
||||
conn, limit=limit, include_rejected=include_rejected, only_unclassified=only_unclassified
|
||||
)
|
||||
results = []
|
||||
skipped = 0
|
||||
for index, row in enumerate(rows, start=1):
|
||||
try:
|
||||
scores = client.classify(row)
|
||||
except RuntimeError as exc:
|
||||
# One slow/failed article (timeout, bad response) shouldn't sink the
|
||||
# whole batch or discard work already committed. Skip and continue.
|
||||
skipped += 1
|
||||
print(f"[{row['id']}] skipped: {exc}")
|
||||
continue
|
||||
scores = normalize_scores(scores, model_name=client.model)
|
||||
@@ -248,7 +258,7 @@ def classify_articles(
|
||||
conn.commit()
|
||||
if progress is not None:
|
||||
progress(index, len(rows), row["id"])
|
||||
return results
|
||||
return ClassifyReport(results=results, attempted=len(rows), succeeded=len(results), skipped=skipped)
|
||||
|
||||
|
||||
def parse_classifier_json(content: str) -> dict:
|
||||
|
||||
+2
-1
@@ -26,7 +26,8 @@ def clean_text(value: str | None, max_len: int = 1000) -> str | None:
|
||||
text = html.unescape(text)
|
||||
text = WHITESPACE_RE.sub(" ", text).strip()
|
||||
if len(text) > max_len:
|
||||
return text[: max_len - 1].rstrip() + "..."
|
||||
# Keep the ellipsis inside max_len rather than overshooting by 3.
|
||||
return text[: max_len - 3].rstrip() + "..."
|
||||
return text or None
|
||||
|
||||
|
||||
|
||||
@@ -16,6 +16,9 @@ web = [
|
||||
"fastapi>=0.110",
|
||||
"uvicorn[standard]>=0.29",
|
||||
]
|
||||
test = [
|
||||
"pytest>=8",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
goodnews = "goodnews.cli:main"
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
from goodnews.briefs import _select_diverse
|
||||
|
||||
|
||||
def row(id, source, category):
    """Build a minimal stand-in for a ranked brief row.

    A plain dict is enough: _select_diverse only reads these three keys, and
    dicts support [] access.
    """
    return dict(id=id, source_name=source, default_category=category)
|
||||
|
||||
|
||||
def test_prefers_distinct_sources_best_first():
    """A repeated source is passed over while unused sources remain."""
    ranked = [
        row(1, "A", "science"),
        row(2, "A", "science"),  # same source as #1 — should be skipped while others remain
        row(3, "B", "science"),
        row(4, "C", "environment"),
    ]
    picked_ids = [item["id"] for item in _select_diverse(ranked, limit=3)]
    assert picked_ids == [1, 3, 4]  # one per source, ranked order preserved
|
||||
|
||||
|
||||
def test_backfills_when_sources_exhausted():
    """With only one source in the pool, the limit is still filled by repeating it."""
    single_source = [row(article_id, "A", "science") for article_id in (1, 2, 3)]
    picked = _select_diverse(single_source, limit=2)
    assert len(picked) == 2  # repeats source A only because no others exist
|
||||
|
||||
|
||||
def test_injects_second_category_without_shrinking():
    """A lone off-category row replaces the lowest-ranked pick, keeping the size.

    Also pins *which* pick is evicted. Without that, an implementation that
    dropped the top-ranked item to make room would still pass.
    """
    rows = [
        row(1, "A", "science"),
        row(2, "B", "science"),
        row(3, "C", "science"),
        row(4, "D", "environment"),  # the only other category, lowest ranked
    ]
    selected = _select_diverse(rows, limit=3)
    cats = {r["default_category"] for r in selected}
    assert len(selected) == 3
    assert len(cats) >= 2  # environment swapped in for diversity
    assert any(r["default_category"] == "environment" for r in selected)
    # Higher-ranked picks survive; only the last (lowest-ranked) slot is replaced.
    assert [r["id"] for r in selected] == [1, 2, 4]
|
||||
|
||||
|
||||
def test_keeps_single_category_when_no_alternative_exists():
    """An all-science pool yields an all-science selection of full size."""
    pool = [row(1, "A", "science"), row(2, "B", "science"), row(3, "C", "science")]
    picked = _select_diverse(pool, limit=3)
    picked_categories = {item["default_category"] for item in picked}
    assert len(picked) == 3
    assert picked_categories == {"science"}
|
||||
|
||||
|
||||
def test_never_returns_more_than_limit():
    """Ten eligible rows with limit=5 produce exactly five picks."""
    pool = [row(n, f"S{n}", "science") for n in range(10)]
    picked = _select_diverse(pool, limit=5)
    assert len(picked) == 5
|
||||
@@ -0,0 +1,83 @@
|
||||
import math
|
||||
from array import array
|
||||
|
||||
import pytest
|
||||
|
||||
from goodnews.db import connect, init_db
|
||||
from goodnews.dedup import _day_ordinal, _unit, cluster_duplicates
|
||||
|
||||
|
||||
def test_unit_normalizes_to_length_one():
    """The 3-4-5 vector scales to (0.6, 0.8)."""
    scaled = _unit([3.0, 4.0])
    assert math.isclose(scaled[0], 0.6)
    assert math.isclose(scaled[1], 0.8)
|
||||
|
||||
|
||||
def test_unit_handles_zero_vector():
    """A zero vector comes back unchanged."""
    normalized = _unit([0.0, 0.0])
    assert normalized == [0.0, 0.0]
|
||||
|
||||
|
||||
def test_day_ordinal_parsing():
    """Valid timestamps map to their date ordinal; None and junk map to 0."""
    from datetime import date

    expected_by_input = {
        "2026-05-30T12:00:00+00:00": date(2026, 5, 30).toordinal(),
        None: 0,
        "not-a-date": 0,
    }
    for raw, expected in expected_by_input.items():
        assert _day_ordinal(raw) == expected
|
||||
|
||||
|
||||
@pytest.fixture
def conn():
    """Yield an in-memory database with the schema created and one source row (id 1)."""
    seed_source_sql = (
        "INSERT INTO sources (id, name, feed_url, trust_score) VALUES (1, 'S1', 'http://s1/feed', 5)"
    )
    db = connect(":memory:")
    init_db(db)
    db.execute(seed_source_sql)
    yield db
    db.close()
|
||||
|
||||
|
||||
def _add(conn, article_id, vector, constructive, when="2026-05-30T10:00:00+00:00"):
    """Insert one accepted article for source 1, with its scores and embedding.

    Only ``constructive_score`` varies; the other scores are written as 0.
    The embedding is stored as float32 bytes alongside its dimension.
    """
    article_params = (
        article_id,
        f"http://s1/{article_id}",
        f"Title {article_id}",
        when,
        f"hash{article_id}",
    )
    conn.execute(
        "INSERT INTO articles (id, source_id, canonical_url, title, published_at, url_hash) "
        "VALUES (?, 1, ?, ?, ?, ?)",
        article_params,
    )

    score_params = (article_id, constructive)
    conn.execute(
        "INSERT INTO article_scores (article_id, constructive_score, agency_score, "
        "human_benefit_score, cortisol_score, ragebait_score, pr_risk_score, accepted) "
        "VALUES (?, ?, 0, 0, 0, 0, 0, 1)",
        score_params,
    )

    embedding_params = (article_id, array("f", vector).tobytes(), len(vector))
    conn.execute(
        "INSERT INTO article_embeddings (article_id, vector, dim, model) VALUES (?, ?, ?, 'test')",
        embedding_params,
    )
    conn.commit()
|
||||
|
||||
|
||||
def test_near_duplicates_collapse_to_highest_ranked(conn):
    """Of two near-identical articles, the higher constructive score is the representative."""
    # A and B are near-identical; A has the higher constructive score so it wins.
    _add(conn, 1, [1.0, 0.0, 0.0, 0.0], constructive=9)  # A (rep)
    _add(conn, 2, [0.99, 0.02, 0.0, 0.0], constructive=3)  # B (dup of A)
    _add(conn, 3, [0.0, 1.0, 0.0, 0.0], constructive=8)  # C (distinct)

    summary = cluster_duplicates(conn, threshold=0.86, window_days=3)
    assert summary["duplicates"] == 1

    cursor = conn.execute("SELECT id, duplicate_of FROM articles")
    duplicate_of = {record["id"]: record["duplicate_of"] for record in cursor}
    assert duplicate_of[2] == 1  # B points at A
    assert duplicate_of[1] is None  # A is representative
    assert duplicate_of[3] is None  # C stands alone
|
||||
|
||||
|
||||
def test_distinct_articles_are_not_clustered(conn):
    """Orthogonal embeddings are never reported as duplicates."""
    orthogonal_vectors = {1: [1.0, 0.0, 0.0, 0.0], 2: [0.0, 1.0, 0.0, 0.0]}
    for article_id, vector in orthogonal_vectors.items():
        _add(conn, article_id, vector, constructive=5)
    summary = cluster_duplicates(conn, threshold=0.86, window_days=3)
    assert summary["duplicates"] == 0
|
||||
|
||||
|
||||
def test_outside_time_window_not_clustered(conn):
    """Identical embeddings published 20 days apart fall outside a 3-day window."""
    same_vector = [1.0, 0.0, 0.0, 0.0]
    _add(conn, 1, list(same_vector), constructive=9, when="2026-05-30T10:00:00+00:00")
    _add(conn, 2, list(same_vector), constructive=3, when="2026-05-10T10:00:00+00:00")
    summary = cluster_duplicates(conn, threshold=0.86, window_days=3)
    assert summary["duplicates"] == 0  # identical vectors, but 20 days apart
|
||||
@@ -0,0 +1,88 @@
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from goodnews.filters import (
|
||||
FilterPrefs,
|
||||
Pause,
|
||||
filter_articles,
|
||||
text_matches_avoid_terms,
|
||||
)
|
||||
|
||||
NOW = datetime(2026, 6, 1, tzinfo=timezone.utc)
|
||||
|
||||
|
||||
def art(topic="science", flavor="discovery", title="A calm discovery", description=""):
    """Build an article row dict with calm defaults; override only what a test needs."""
    return dict(topic=topic, flavor=flavor, title=title, description=description)
|
||||
|
||||
|
||||
# --- avoid-term matching: the trust-critical pure function ---
|
||||
|
||||
def test_single_word_matches_whole_word_only():
    """'cancer' hits the standalone word but not 'Cancerous'."""
    avoid = ["cancer"]
    assert text_matches_avoid_terms("New cancer drug approved", avoid)
    assert not text_matches_avoid_terms("Cancerous growth studied", avoid)
|
||||
|
||||
|
||||
def test_substring_does_not_match():
    """A term that is only a prefix of a longer word is not a match."""
    # "pan" must not match "pandemic"
    headline = "Pandemic preparedness improves"
    assert not text_matches_avoid_terms(headline, ["pan"])
|
||||
|
||||
|
||||
def test_phrase_matches_as_phrase():
    """A two-word term needs both words, adjacent and in order."""
    avoid = ["stock market"]
    assert text_matches_avoid_terms("The stock market crashed today", avoid)
    assert not text_matches_avoid_terms("Stocks and other markets", avoid)
|
||||
|
||||
|
||||
def test_punctuation_and_case_normalized():
    """Hyphens and letter case in the text do not prevent a match."""
    hyphenated_hit = text_matches_avoid_terms("An Anti-Aging breakthrough", ["anti aging"])
    assert hyphenated_hit
    uppercase_hit = text_matches_avoid_terms("ELECTION results", ["election"])
    assert uppercase_hit
|
||||
|
||||
|
||||
def test_empty_inputs_are_safe():
    """Empty text, an empty term list and None text all report no match."""
    no_match_cases = [
        ("", ["cancer"]),
        ("anything", []),
        (None, ["cancer"]),
    ]
    for text, terms in no_match_cases:
        assert not text_matches_avoid_terms(text, terms)
|
||||
|
||||
|
||||
# --- filter_articles over the canonical prefs ---
|
||||
|
||||
def test_empty_prefs_pass_everything_through():
    """Default preferences filter nothing out."""
    articles = [art(), art(topic="health")]
    unfiltered = filter_articles(articles, FilterPrefs(), NOW)
    assert unfiltered == articles
|
||||
|
||||
|
||||
def test_mute_topic_drops_matching_articles():
    """Muting 'health' leaves only the science article."""
    articles = [art(topic="science"), art(topic="health")]
    health_muted = FilterPrefs.from_dict({"mute_topics": ["health"]})
    remaining_topics = [a["topic"] for a in filter_articles(articles, health_muted, NOW)]
    assert remaining_topics == ["science"]
|
||||
|
||||
|
||||
def test_include_topics_keeps_only_those():
    """An include list works as an allow-list for topics."""
    articles = [art(topic="science"), art(topic="animals"), art(topic="health")]
    allow_two = FilterPrefs.from_dict({"include_topics": ["science", "animals"]})
    kept_topics = {a["topic"] for a in filter_articles(articles, allow_two, NOW)}
    assert kept_topics == {"science", "animals"}
|
||||
|
||||
|
||||
def test_avoid_terms_match_title_and_description():
    """An avoid term hides articles whether it appears in the title or the description."""
    in_title = art(title="Update on the election")
    in_description = art(description="about an election too")
    untouched = art()
    no_elections = FilterPrefs.from_dict({"avoid_terms": ["election"]})
    survivors = filter_articles([in_title, in_description, untouched], no_elections, NOW)
    assert len(survivors) == 1
|
||||
|
||||
|
||||
def test_active_pause_hides_topic_but_expired_does_not():
    """A pause ending after NOW hides the topic; one that ended before NOW does not."""
    health_only = [art(topic="health")]
    still_running = FilterPrefs.from_dict(
        {"pauses": [{"kind": "topic", "value": "health", "until": "2026-06-02T00:00:00Z"}]}
    )
    already_over = FilterPrefs.from_dict(
        {"pauses": [{"kind": "topic", "value": "health", "until": "2026-05-01T00:00:00Z"}]}
    )
    assert filter_articles(health_only, still_running, NOW) == []
    assert filter_articles(health_only, already_over, NOW) == health_only
|
||||
|
||||
|
||||
def test_pause_active_helper():
    """Pause.active is True for a future end, False for a past end or an unparseable one."""
    future_end = Pause("topic", "health", "2026-06-02T00:00:00Z")
    past_end = Pause("topic", "health", "2026-05-01T00:00:00Z")
    unparseable = Pause("topic", "health", "garbage")
    assert future_end.active(NOW)
    assert not past_end.active(NOW)
    assert not unparseable.active(NOW)
|
||||
@@ -0,0 +1,48 @@
|
||||
from goodnews.scoring import score_article
|
||||
|
||||
|
||||
def test_constructive_story_is_accepted():
    """A restoration story is accepted with a constructive score of at least 5."""
    title = "Community volunteers restore creek habitat"
    scores = score_article(title, "A hopeful recovery effort", 3)
    assert scores["accepted"] == 1
    assert scores["constructive_score"] >= 5
    assert scores["reason_code"] == "heuristic_constructive_candidate"
|
||||
|
||||
|
||||
def test_neutral_story_needs_review():
    """A neutral headline with no description is not accepted and is flagged for review."""
    scores = score_article("The weather report for tomorrow", None, 3)
    assert scores["accepted"] == 0
    assert scores["reason_code"] == "heuristic_needs_review"
|
||||
|
||||
|
||||
def test_cortisol_heavy_is_rejected():
    """A headline stacked with violent terms scores above 5 on cortisol and is rejected."""
    title = "War and death as murder and attack escalate"
    scores = score_article(title, None, 3)
    assert scores["accepted"] == 0
    assert scores["cortisol_score"] > 5
    assert scores["reason_code"] == "heuristic_reject_cortisol_heavy"
|
||||
|
||||
|
||||
def test_ragebait_is_rejected_before_cortisol():
    """Outrage phrasing is rejected with the ragebait reason code."""
    title = "Senator slams rival and sparks backlash"
    scores = score_article(title, None, 3)
    assert scores["accepted"] == 0
    assert scores["ragebait_score"] > 3
    assert scores["reason_code"] == "heuristic_reject_ragebait_language"
|
||||
|
||||
|
||||
def test_pr_risk_from_source_and_terms_rejects():
    """Announcement-style wording with a third argument of 6 exceeds PR risk 7 and is rejected."""
    title = "Startup announces funding round and unveils brand"
    scores = score_article(title, None, 6)
    assert scores["pr_risk_score"] > 7
    assert scores["accepted"] == 0
|
||||
|
||||
|
||||
def test_all_scores_within_bounds():
    """Heavily repeated keywords still leave every score inside 0..10."""
    bounded_keys = (
        "constructive_score",
        "cortisol_score",
        "ragebait_score",
        "agency_score",
        "human_benefit_score",
        "novelty_score",
        "pr_risk_score",
    )
    title = "breakthrough cure restores hope " * 10
    description = "progress " * 20
    scores = score_article(title, description, 3)
    for key in bounded_keys:
        assert 0 <= scores[key] <= 10, key
|
||||
@@ -0,0 +1,36 @@
|
||||
from goodnews.text import canonicalize_url, clean_text, sha256_text
|
||||
|
||||
|
||||
def test_clean_text_strips_tags_and_entities():
    """Tags are removed and HTML entities are unescaped."""
    # The input must carry a real entity (&amp;); a bare "&" would leave the
    # unescape step untested.
    assert clean_text("<p>Hello&amp; world</p>") == "Hello& world"
|
||||
|
||||
|
||||
def test_clean_text_truncates():
    """Over-long text is cut to at most max_len characters and ends with an ellipsis."""
    shortened = clean_text("x" * 50, max_len=10)
    assert shortened.endswith("...")
    assert len(shortened) <= 10
|
||||
|
||||
|
||||
def test_clean_text_empty_is_none():
    """Both the empty string and None clean to None."""
    for blank in ("", None):
        assert clean_text(blank) is None
|
||||
|
||||
|
||||
def test_canonicalize_strips_tracking_params():
    """Tracking params are dropped, real params are kept, and the host is lowercased."""
    canonical = canonicalize_url("https://Example.com/story?utm_source=x&id=7&fbclid=abc")
    assert "utm_source" not in canonical
    assert "fbclid" not in canonical
    assert "id=7" in canonical
    assert canonical.startswith("https://example.com")  # scheme/host lowercased
|
||||
|
||||
|
||||
def test_canonicalize_sorts_query_for_stable_hash():
    """Query-parameter order affects neither the canonical URL nor its hash."""
    first = canonicalize_url("https://e.com/p?b=2&a=1")
    second = canonicalize_url("https://e.com/p?a=1&b=2")
    assert first == second
    assert sha256_text(first) == sha256_text(second)
|
||||
|
||||
|
||||
def test_canonicalize_rejects_non_http():
    """ftp and javascript URLs, and None, all canonicalize to None."""
    for rejected in ("ftp://e.com/x", "javascript:alert(1)", None):
        assert canonicalize_url(rejected) is None
|
||||
Reference in New Issue
Block a user