Add semantic cross-source dedup via local embeddings
- LocalModelClient.embed() calls the OpenAI-compatible /embeddings endpoint (local nomic model); base_url shared with chat, model via GOODNEWS_EMBED_MODEL. - New article_embeddings table and articles.duplicate_of column (+ migration). - dedup module: embeds missing articles, clusters near-identical stories within a date window by cosine similarity (pure-stdlib, vectors normalised once), and marks all but the highest-ranked member of each cluster as a duplicate. - 'dedup' CLI command; cycle now runs poll -> classify -> dedup -> brief. - Feed and brief queries hide duplicates, so a story carried by multiple outlets shows once. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -15,6 +15,8 @@ python3 -m goodnews poll --limit 3
|
||||
python3 -m goodnews rescore
|
||||
python3 -m goodnews check-llm --base-url http://127.0.0.1:1234/v1 --model gpt-oss
|
||||
python3 -m goodnews classify --limit 10 --base-url http://127.0.0.1:1234/v1 --model gpt-oss
|
||||
python3 -m goodnews dedup --base-url http://127.0.0.1:1234/v1
|
||||
python3 -m goodnews check-feeds
|
||||
python3 -m goodnews build-brief --date 2026-05-27 --replace
|
||||
python3 -m goodnews show-brief
|
||||
python3 -m goodnews list-recent --limit 10
|
||||
@@ -49,6 +51,18 @@ and one **flavor**, allowing browsable category feeds (e.g. "feel-good animals",
|
||||
The allowed values live in `goodnews/taxonomy.py`. The accept/reject gate is kept
|
||||
deliberately broad ("not dreary"); ranking and category filters do the curation.
|
||||
|
||||
## Deduplication
|
||||
|
||||
Two layers:
|
||||
|
||||
- **Exact**: a URL hash UNIQUE constraint drops the literal same link at ingest.
|
||||
- **Semantic**: `dedup` embeds each article's title+snippet with the local
|
||||
embedding model, clusters near-identical stories within a few-day window
|
||||
(cosine similarity), and marks all but the highest-ranked in each cluster as
|
||||
`duplicate_of` the representative. Feed and brief queries hide duplicates, so
|
||||
the same story carried by several outlets appears once. This runs as part of
|
||||
`cycle`, so the scheduler keeps the corpus deduped automatically.
|
||||
|
||||
## Stored Article Data
|
||||
|
||||
For each article, the database stores:
|
||||
@@ -112,7 +126,7 @@ often as you like — it only polls sources that are *due* (per each source's
|
||||
rebuilds the current day's brief:
|
||||
|
||||
```bash
|
||||
python3 -m goodnews cycle # poll due -> classify new -> rebuild today's brief
|
||||
python3 -m goodnews cycle # poll due -> classify new -> dedup -> rebuild today's brief
|
||||
python3 -m goodnews cycle --force # poll every active source regardless of interval
|
||||
python3 -m goodnews cycle --no-classify # skip the LLM step (e.g. model box offline)
|
||||
```
|
||||
|
||||
@@ -118,6 +118,7 @@ def _candidate_articles(
|
||||
JOIN sources src ON src.id = a.source_id
|
||||
JOIN article_scores s ON s.article_id = a.id
|
||||
WHERE s.accepted = 1
|
||||
AND a.duplicate_of IS NULL
|
||||
AND date(COALESCE(a.published_at, a.discovered_at)) <= date(?)
|
||||
AND date(COALESCE(a.published_at, a.discovered_at)) > date(?, '-' || ? || ' days')
|
||||
AND a.id NOT IN (
|
||||
|
||||
@@ -8,6 +8,7 @@ from pathlib import Path
|
||||
|
||||
from .briefs import build_daily_brief, show_brief
|
||||
from .db import connect, init_db
|
||||
from .dedup import DEFAULT_THRESHOLD, DEFAULT_WINDOW_DAYS, dedup as run_dedup
|
||||
from .feeds import fetch_feed, parse_feed, poll_all_sources, poll_due_sources, poll_source
|
||||
from .llm import LocalModelClient, classify_articles
|
||||
from .scoring import score_article
|
||||
@@ -68,11 +69,19 @@ def main() -> None:
|
||||
)
|
||||
cycle_parser.add_argument("--classify-limit", type=int, default=40)
|
||||
cycle_parser.add_argument("--no-classify", action="store_true", help="Skip the LLM classify step")
|
||||
cycle_parser.add_argument("--no-dedup", action="store_true", help="Skip the embedding dedup step")
|
||||
cycle_parser.add_argument("--no-brief", action="store_true", help="Skip rebuilding today's brief")
|
||||
cycle_parser.add_argument("--force", action="store_true", help="Poll all active sources, ignoring intervals")
|
||||
cycle_parser.add_argument("--base-url", help="OpenAI-compatible base URL for classify")
|
||||
cycle_parser.add_argument("--model", help="Local model name for classify")
|
||||
|
||||
dedup_parser = subparsers.add_parser("dedup", help="Cluster near-duplicate stories via local embeddings")
|
||||
dedup_parser.add_argument("--threshold", type=float, default=DEFAULT_THRESHOLD, help="Cosine similarity cutoff")
|
||||
dedup_parser.add_argument("--window-days", type=int, default=DEFAULT_WINDOW_DAYS)
|
||||
dedup_parser.add_argument("--embed-limit", type=int, help="Cap how many missing embeddings to compute")
|
||||
dedup_parser.add_argument("--base-url", help="OpenAI-compatible base URL")
|
||||
dedup_parser.add_argument("--model", help="Chat model name (unused for embeddings)")
|
||||
|
||||
check_llm_parser = subparsers.add_parser("check-llm", help="Check local OpenAI-compatible model endpoint")
|
||||
check_llm_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
|
||||
check_llm_parser.add_argument("--model", help="Expected local model name")
|
||||
@@ -153,6 +162,17 @@ def main() -> None:
|
||||
print("Dry run only; database was not updated.")
|
||||
elif args.command == "cycle":
|
||||
run_cycle(conn, args)
|
||||
elif args.command == "dedup":
|
||||
init_db(conn)
|
||||
client = llm_client_from_args(args)
|
||||
stats = run_dedup(
|
||||
conn, client, threshold=args.threshold, window_days=args.window_days, embed_limit=args.embed_limit
|
||||
)
|
||||
print(
|
||||
f"dedup: embedded={stats['embedded']} articles={stats['articles']} "
|
||||
f"clusters={stats['clusters']} duplicate_clusters={stats['duplicate_clusters']} "
|
||||
f"duplicates_hidden={stats['duplicates']}"
|
||||
)
|
||||
elif args.command == "check-llm":
|
||||
client = llm_client_from_args(args)
|
||||
try:
|
||||
@@ -256,6 +276,13 @@ def run_cycle(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
|
||||
except Exception as exc: # endpoint down, timeout, etc. — keep going
|
||||
print(f"classify: skipped ({exc})")
|
||||
|
||||
if not args.no_dedup:
|
||||
try:
|
||||
stats = run_dedup(conn, llm_client_from_args(args))
|
||||
print(f"dedup: embedded={stats['embedded']} duplicates_hidden={stats['duplicates']}")
|
||||
except Exception as exc:
|
||||
print(f"dedup: skipped ({exc})")
|
||||
|
||||
if not args.no_brief:
|
||||
today = date.today().isoformat()
|
||||
try:
|
||||
|
||||
+19
-2
@@ -37,6 +37,7 @@ CREATE TABLE IF NOT EXISTS articles (
|
||||
raw_guid TEXT,
|
||||
url_hash TEXT NOT NULL UNIQUE,
|
||||
title_hash TEXT,
|
||||
duplicate_of INTEGER REFERENCES articles(id) ON DELETE SET NULL,
|
||||
FOREIGN KEY (source_id) REFERENCES sources(id)
|
||||
);
|
||||
|
||||
@@ -62,6 +63,14 @@ CREATE TABLE IF NOT EXISTS article_scores (
|
||||
scored_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS article_embeddings (
|
||||
article_id INTEGER PRIMARY KEY REFERENCES articles(id) ON DELETE CASCADE,
|
||||
vector BLOB NOT NULL,
|
||||
dim INTEGER NOT NULL,
|
||||
model TEXT NOT NULL,
|
||||
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS ingest_runs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL,
|
||||
@@ -114,7 +123,15 @@ def _migrate(conn: sqlite3.Connection) -> None:
|
||||
CREATE TABLE IF NOT EXISTS never alters an existing table, so new columns
|
||||
need an explicit, idempotent ALTER guarded by the current column set.
|
||||
"""
|
||||
cols = {row["name"] for row in conn.execute("PRAGMA table_info(article_scores)")}
|
||||
score_cols = {row["name"] for row in conn.execute("PRAGMA table_info(article_scores)")}
|
||||
for column in ("topic", "flavor"):
|
||||
if column not in cols:
|
||||
if column not in score_cols:
|
||||
conn.execute(f"ALTER TABLE article_scores ADD COLUMN {column} TEXT")
|
||||
|
||||
article_cols = {row["name"] for row in conn.execute("PRAGMA table_info(articles)")}
|
||||
if "duplicate_of" not in article_cols:
|
||||
conn.execute(
|
||||
"ALTER TABLE articles ADD COLUMN duplicate_of INTEGER REFERENCES articles(id)"
|
||||
)
|
||||
# Created here (not in SCHEMA) so it runs after the column exists on upgrades.
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_articles_duplicate_of ON articles(duplicate_of)")
|
||||
|
||||
@@ -0,0 +1,171 @@
|
||||
"""Cross-source near-duplicate detection via local embeddings.
|
||||
|
||||
The exact-URL dedupe in feeds.py only catches the literal same link. The same
|
||||
story carried by several outlets slips through as separate articles. Here we
|
||||
embed each article's title+snippet with the local embedding model, cluster
|
||||
near-identical ones within a short time window, and mark all but the best in
|
||||
each cluster as duplicates (articles.duplicate_of). Feed and brief queries then
|
||||
hide duplicates, keeping the single strongest version.
|
||||
|
||||
Pure-stdlib math: vectors are normalised once so cosine similarity is a dot
|
||||
product, and comparisons are restricted to a date window, so no numpy is needed.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import sqlite3
|
||||
from array import array
|
||||
from datetime import date
|
||||
|
||||
from .llm import LocalModelClient
|
||||
|
||||
DEFAULT_THRESHOLD = 0.86
|
||||
DEFAULT_WINDOW_DAYS = 3
|
||||
_EMBED_BATCH = 16
|
||||
|
||||
|
||||
def _embed_text(title: str, description: str | None) -> str:
|
||||
text = title.strip()
|
||||
if description:
|
||||
text += ". " + description.strip()
|
||||
return text[:2000]
|
||||
|
||||
|
||||
def ensure_embeddings(
|
||||
conn: sqlite3.Connection, client: LocalModelClient, limit: int | None = None
|
||||
) -> int:
|
||||
"""Embed and store any articles that lack an embedding. Returns count added."""
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT a.id, a.title, a.description
|
||||
FROM articles a
|
||||
LEFT JOIN article_embeddings e ON e.article_id = a.id
|
||||
WHERE e.article_id IS NULL
|
||||
ORDER BY a.id
|
||||
"""
|
||||
).fetchall()
|
||||
if limit is not None:
|
||||
rows = rows[:limit]
|
||||
if not rows:
|
||||
return 0
|
||||
|
||||
added = 0
|
||||
for start in range(0, len(rows), _EMBED_BATCH):
|
||||
batch = rows[start : start + _EMBED_BATCH]
|
||||
vectors = client.embed([_embed_text(r["title"], r["description"]) for r in batch])
|
||||
for row, vector in zip(batch, vectors):
|
||||
conn.execute(
|
||||
"INSERT OR REPLACE INTO article_embeddings (article_id, vector, dim, model) "
|
||||
"VALUES (?, ?, ?, ?)",
|
||||
(row["id"], array("f", vector).tobytes(), len(vector), client.embed_model),
|
||||
)
|
||||
added += 1
|
||||
conn.commit()
|
||||
return added
|
||||
|
||||
|
||||
def _unit(vector: list[float]) -> list[float]:
|
||||
norm = math.sqrt(sum(x * x for x in vector))
|
||||
if norm == 0:
|
||||
return vector
|
||||
return [x / norm for x in vector]
|
||||
|
||||
|
||||
def _day_ordinal(value: str | None) -> int:
|
||||
if not value:
|
||||
return 0
|
||||
try:
|
||||
return date.fromisoformat(value[:10]).toordinal()
|
||||
except ValueError:
|
||||
return 0
|
||||
|
||||
|
||||
def cluster_duplicates(
|
||||
conn: sqlite3.Connection,
|
||||
threshold: float = DEFAULT_THRESHOLD,
|
||||
window_days: int = DEFAULT_WINDOW_DAYS,
|
||||
) -> dict:
|
||||
"""Group near-identical articles and record duplicate_of links.
|
||||
|
||||
Greedy single-link clustering: each article joins the first existing cluster
|
||||
whose anchor it matches (cosine >= threshold, within window_days); otherwise
|
||||
it starts a new cluster. The highest-ranked member of each cluster becomes
|
||||
the representative; the rest point at it.
|
||||
"""
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT
|
||||
a.id,
|
||||
COALESCE(a.published_at, a.discovered_at) AS dt,
|
||||
e.vector,
|
||||
(COALESCE(s.constructive_score,0) + COALESCE(s.agency_score,0)
|
||||
+ COALESCE(s.human_benefit_score,0) + src.trust_score
|
||||
- COALESCE(s.cortisol_score,0) - COALESCE(s.ragebait_score,0)
|
||||
- COALESCE(s.pr_risk_score,0)) AS rank_score
|
||||
FROM articles a
|
||||
JOIN article_embeddings e ON e.article_id = a.id
|
||||
JOIN sources src ON src.id = a.source_id
|
||||
LEFT JOIN article_scores s ON s.article_id = a.id
|
||||
ORDER BY dt
|
||||
"""
|
||||
).fetchall()
|
||||
|
||||
items = []
|
||||
for r in rows:
|
||||
vec = _unit(array("f", r["vector"]).tolist())
|
||||
items.append({"id": r["id"], "ord": _day_ordinal(r["dt"]), "vec": vec, "score": r["rank_score"]})
|
||||
|
||||
clusters: list[dict] = [] # {anchor_vec, anchor_ord, members:[item]}
|
||||
for it in items:
|
||||
placed = False
|
||||
for cl in clusters:
|
||||
if abs(it["ord"] - cl["anchor_ord"]) > window_days:
|
||||
continue
|
||||
dot = sum(x * y for x, y in zip(it["vec"], cl["anchor_vec"]))
|
||||
if dot >= threshold:
|
||||
cl["members"].append(it)
|
||||
placed = True
|
||||
break
|
||||
if not placed:
|
||||
clusters.append({"anchor_vec": it["vec"], "anchor_ord": it["ord"], "members": [it]})
|
||||
|
||||
# Reset prior decisions for everything we considered, then re-apply.
|
||||
considered = [it["id"] for it in items]
|
||||
conn.executemany(
|
||||
"UPDATE articles SET duplicate_of = NULL WHERE id = ?", [(i,) for i in considered]
|
||||
)
|
||||
|
||||
dup_clusters = 0
|
||||
duplicates = 0
|
||||
for cl in clusters:
|
||||
if len(cl["members"]) < 2:
|
||||
continue
|
||||
dup_clusters += 1
|
||||
rep = max(cl["members"], key=lambda m: (m["score"], -m["id"]))
|
||||
for m in cl["members"]:
|
||||
if m["id"] != rep["id"]:
|
||||
conn.execute(
|
||||
"UPDATE articles SET duplicate_of = ? WHERE id = ?", (rep["id"], m["id"])
|
||||
)
|
||||
duplicates += 1
|
||||
conn.commit()
|
||||
return {
|
||||
"articles": len(items),
|
||||
"clusters": len(clusters),
|
||||
"duplicate_clusters": dup_clusters,
|
||||
"duplicates": duplicates,
|
||||
}
|
||||
|
||||
|
||||
def dedup(
|
||||
conn: sqlite3.Connection,
|
||||
client: LocalModelClient,
|
||||
threshold: float = DEFAULT_THRESHOLD,
|
||||
window_days: int = DEFAULT_WINDOW_DAYS,
|
||||
embed_limit: int | None = None,
|
||||
) -> dict:
|
||||
embedded = ensure_embeddings(conn, client, limit=embed_limit)
|
||||
stats = cluster_duplicates(conn, threshold=threshold, window_days=window_days)
|
||||
stats["embedded"] = embedded
|
||||
return stats
|
||||
@@ -19,6 +19,7 @@ from .taxonomy import (
|
||||
|
||||
DEFAULT_BASE_URL = "http://127.0.0.1:1234/v1"
|
||||
DEFAULT_MODEL = "gpt-oss"
|
||||
DEFAULT_EMBED_MODEL = "text-embedding-nomic-embed-text-v1.5"
|
||||
DEFAULT_TIMEOUT = 180
|
||||
|
||||
|
||||
@@ -106,6 +107,7 @@ class LocalModelClient:
|
||||
model: str
|
||||
api_key: str | None = None
|
||||
timeout: int = DEFAULT_TIMEOUT
|
||||
embed_model: str = DEFAULT_EMBED_MODEL
|
||||
# Index into _RESPONSE_FORMATS that the server accepts; discovered lazily.
|
||||
_response_format_idx: int | None = None
|
||||
|
||||
@@ -116,8 +118,31 @@ class LocalModelClient:
|
||||
model=os.environ.get("GOODNEWS_LLM_MODEL", DEFAULT_MODEL),
|
||||
api_key=os.environ.get("GOODNEWS_LLM_API_KEY"),
|
||||
timeout=int(os.environ.get("GOODNEWS_LLM_TIMEOUT", DEFAULT_TIMEOUT)),
|
||||
embed_model=os.environ.get("GOODNEWS_EMBED_MODEL", DEFAULT_EMBED_MODEL),
|
||||
)
|
||||
|
||||
def embed(self, texts: list[str]) -> list[list[float]]:
|
||||
"""Return embedding vectors for a batch of texts via /embeddings."""
|
||||
body = json.dumps({"model": self.embed_model, "input": texts}).encode("utf-8")
|
||||
headers = {"Content-Type": "application/json"}
|
||||
if self.api_key:
|
||||
headers["Authorization"] = f"Bearer {self.api_key}"
|
||||
request = urllib.request.Request(
|
||||
f"{self.base_url}/embeddings", data=body, headers=headers, method="POST"
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(request, timeout=self.timeout) as response:
|
||||
data = json.loads(response.read().decode("utf-8"))
|
||||
except urllib.error.HTTPError as exc:
|
||||
detail = exc.read().decode("utf-8", errors="replace")
|
||||
raise RuntimeError(f"HTTP {exc.code} from embeddings: {detail}") from exc
|
||||
except urllib.error.URLError as exc:
|
||||
raise RuntimeError(f"could not reach embeddings at {self.base_url}: {exc.reason}") from exc
|
||||
try:
|
||||
return [item["embedding"] for item in data["data"]]
|
||||
except (KeyError, TypeError) as exc:
|
||||
raise RuntimeError(f"unexpected embeddings response: {data}") from exc
|
||||
|
||||
def classify(self, article: sqlite3.Row) -> dict:
|
||||
messages = [
|
||||
{"role": "system", "content": SYSTEM_PROMPT},
|
||||
|
||||
+1
-1
@@ -49,7 +49,7 @@ def feed(
|
||||
offset: int = 0,
|
||||
) -> list[dict]:
|
||||
"""Return ranked articles, optionally filtered by topic and/or flavor."""
|
||||
clauses = []
|
||||
clauses = ["a.duplicate_of IS NULL"]
|
||||
params: list = []
|
||||
if accepted_only:
|
||||
clauses.append("s.accepted = 1")
|
||||
|
||||
Reference in New Issue
Block a user