"""Cross-source near-duplicate detection via local embeddings. The exact-URL dedupe in feeds.py only catches the literal same link. The same story carried by several outlets slips through as separate articles. Here we embed each article's title+snippet with the local embedding model, cluster near-identical ones within a short time window, and mark all but the best in each cluster as duplicates (articles.duplicate_of). Feed and brief queries then hide duplicates, keeping the single strongest version. Pure-stdlib math: vectors are normalised once so cosine similarity is a dot product, and comparisons are restricted to a date window, so no numpy is needed. """ from __future__ import annotations import math import sqlite3 from array import array from datetime import date from .llm import LocalModelClient DEFAULT_THRESHOLD = 0.86 DEFAULT_WINDOW_DAYS = 3 _EMBED_BATCH = 16 def _embed_text(title: str, description: str | None) -> str: text = title.strip() if description: text += ". " + description.strip() return text[:2000] def ensure_embeddings( conn: sqlite3.Connection, client: LocalModelClient, limit: int | None = None ) -> int: """Embed and store any articles that lack an embedding. Returns count added.""" rows = conn.execute( """ SELECT a.id, a.title, a.description FROM articles a LEFT JOIN article_embeddings e ON e.article_id = a.id WHERE e.article_id IS NULL ORDER BY a.id """ ).fetchall() if limit is not None: rows = rows[:limit] if not rows: return 0 added = 0 for start in range(0, len(rows), _EMBED_BATCH): batch = rows[start : start + _EMBED_BATCH] vectors = client.embed([_embed_text(r["title"], r["description"]) for r in batch]) for row, vector in zip(batch, vectors): conn.execute( "INSERT OR REPLACE INTO article_embeddings (article_id, vector, dim, model) " "VALUES (?, ?, ?, ?)", (row["id"], array("f", vector).tobytes(), len(vector), client.embed_model), ) added += 1 conn.commit() return added def _unit(vector: list[float]) -> list[float]: norm = math.sqrt(sum(x * x for x in vector)) if norm == 0: return vector return [x / norm for x in vector] def _day_ordinal(value: str | None) -> int: if not value: return 0 try: return date.fromisoformat(value[:10]).toordinal() except ValueError: return 0 def cluster_duplicates( conn: sqlite3.Connection, threshold: float = DEFAULT_THRESHOLD, window_days: int = DEFAULT_WINDOW_DAYS, ) -> dict: """Group near-identical articles and record duplicate_of links. Greedy single-link clustering: each article joins the first existing cluster whose anchor it matches (cosine >= threshold, within window_days); otherwise it starts a new cluster. The highest-ranked member of each cluster becomes the representative; the rest point at it. """ rows = conn.execute( """ SELECT a.id, COALESCE(a.published_at, a.discovered_at) AS dt, e.vector, (COALESCE(s.constructive_score,0) + COALESCE(s.agency_score,0) + COALESCE(s.human_benefit_score,0) + src.trust_score - COALESCE(s.cortisol_score,0) - COALESCE(s.ragebait_score,0) - COALESCE(s.pr_risk_score,0)) AS rank_score, COALESCE(s.accepted, 0) AS accepted FROM articles a JOIN article_embeddings e ON e.article_id = a.id JOIN sources src ON src.id = a.source_id LEFT JOIN article_scores s ON s.article_id = a.id ORDER BY dt """ ).fetchall() items = [] for r in rows: vec = _unit(array("f", r["vector"]).tolist()) items.append({"id": r["id"], "ord": _day_ordinal(r["dt"]), "vec": vec, "score": r["rank_score"], "accepted": bool(r["accepted"])}) clusters: list[dict] = [] # {anchor_vec, anchor_ord, members:[item]} for it in items: placed = False for cl in clusters: if abs(it["ord"] - cl["anchor_ord"]) > window_days: continue dot = sum(x * y for x, y in zip(it["vec"], cl["anchor_vec"])) if dot >= threshold: cl["members"].append(it) placed = True break if not placed: clusters.append({"anchor_vec": it["vec"], "anchor_ord": it["ord"], "members": [it]}) # Which articles are CURRENTLY a representative (something points at them)? Captured # BEFORE we reset, so we can keep an established canonical stable across runs. prior_reps = { row[0] for row in conn.execute( "SELECT DISTINCT duplicate_of FROM articles WHERE duplicate_of IS NOT NULL" ) } # Reset prior decisions for everything we considered, then re-apply. considered = [it["id"] for it in items] conn.executemany( "UPDATE articles SET duplicate_of = NULL WHERE id = ?", [(i,) for i in considered] ) dup_clusters = 0 duplicates = 0 for cl in clusters: if len(cl["members"]) < 2: continue dup_clusters += 1 # Representative priority (highest wins), in order: # 1. accepted/serveable — an accepted page must never be retired to a REJECTED # rep (that page would 404 with nothing to redirect to). # 2. established rep — if a member is already the cluster's canonical, keep it, # so an indexed URL doesn't churn when a newer twin arrives. # 3. quality score — decides genuinely-new clusters. # 4. -id — deterministic final tiebreak (older wins). rep = max(cl["members"], key=lambda m: ( 1 if m["accepted"] else 0, 1 if m["id"] in prior_reps else 0, m["score"], -m["id"], )) for m in cl["members"]: if m["id"] != rep["id"]: conn.execute( "UPDATE articles SET duplicate_of = ? WHERE id = ?", (rep["id"], m["id"]) ) duplicates += 1 conn.commit() return { "articles": len(items), "clusters": len(clusters), "duplicate_clusters": dup_clusters, "duplicates": duplicates, } def dedup( conn: sqlite3.Connection, client: LocalModelClient, threshold: float = DEFAULT_THRESHOLD, window_days: int = DEFAULT_WINDOW_DAYS, embed_limit: int | None = None, ) -> dict: embedded = ensure_embeddings(conn, client, limit=embed_limit) if embedded == 0: # Nothing new entered the corpus → the clusters and duplicate_of links are # unchanged, so skip the full re-cluster. It was re-running an O(n²) cosine # pass over EVERY article and rewriting duplicate_of for all ~3.7k of them # every cycle (~53s + a large WAL commit), which starved live API reads # (/api/brief 2-7s). Most cycles find no new articles, so this makes the # cycle near-instant and keeps reads fast. A real new article re-runs it. dups = conn.execute("SELECT COUNT(*) FROM articles WHERE duplicate_of IS NOT NULL").fetchone()[0] return {"embedded": 0, "articles": 0, "clusters": 0, "duplicate_clusters": 0, "duplicates": dups, "skipped": True} stats = cluster_duplicates(conn, threshold=threshold, window_days=window_days) stats["embedded"] = embedded return stats