Initial commit: goodNews constructive-news ingestion prototype

Local-first RSS/Atom ingestion pipeline with metadata-only storage,
heuristic + local-LLM scoring, and daily brief builder.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
jay
2026-05-30 00:48:26 +00:00
commit 068073423f
14 changed files with 1722 additions and 0 deletions
+6
View File
@@ -0,0 +1,6 @@
__pycache__/
*.py[cod]
.venv/
data/*.sqlite3
data/*.sqlite3-*
+79
View File
@@ -0,0 +1,79 @@
# goodNews
Local-first constructive news ingestion prototype.
The first milestone is intentionally small: collect public RSS/Atom metadata, dedupe it, store short source-provided snippets, and attach early reason-coded heuristic scores. It does not store full article bodies.
## Commands
From this directory:
```bash
python3 -m goodnews init-db
python3 -m goodnews import-sources
python3 -m goodnews poll --limit 3
python3 -m goodnews rescore
python3 -m goodnews check-llm --base-url http://127.0.0.1:1234/v1 --model gpt-oss
python3 -m goodnews classify --limit 10 --base-url http://127.0.0.1:1234/v1 --model gpt-oss
python3 -m goodnews build-brief --date 2026-05-27 --replace
python3 -m goodnews show-brief
python3 -m goodnews list-recent --limit 10
python3 -m goodnews list-recent --accepted-only --limit 10
python3 -m goodnews source-report
python3 -m goodnews list-runs
```
The SQLite database lives at:
```txt
data/goodnews.sqlite3
```
Sources live at:
```txt
config/sources.toml
```
## Stored Article Data
For each article, the database stores:
- source
- canonical URL
- title
- short RSS/Atom description or summary
- author, if present
- published timestamp, if present
- image URL, if present
- language, if present
- hashes used for dedupe
- heuristic scores and reason codes
## Next Steps
1. Run the poller for a few days and inspect which sources produce useful candidates.
2. Add source-level quality notes and deactivate noisy feeds.
3. Replace or supplement `heuristic-v0` with a local model classifier.
4. Add a daily brief builder that selects 5 items using scores and source diversity.
5. Add a small web/API layer once the ingest data looks trustworthy.
## Local Model Configuration
The `classify` command expects an OpenAI-compatible local chat-completions server.
You can pass settings directly:
```bash
python3 -m goodnews classify --base-url http://127.0.0.1:1234/v1 --model gpt-oss --limit 10
```
Or use environment variables:
```bash
export GOODNEWS_LLM_BASE_URL=http://127.0.0.1:1234/v1
export GOODNEWS_LLM_MODEL=gpt-oss
python3 -m goodnews classify --limit 10
```
`classify` rewrites the current score/reason row for selected candidates. `rescore` can restore the fast heuristic scores.
+120
View File
@@ -0,0 +1,120 @@
[[sources]]
name = "Good News Network"
homepage_url = "https://www.goodnewsnetwork.org/"
feed_url = "https://www.goodnewsnetwork.org/feed/"
default_category = "constructive"
trust_score = 6
pr_risk_score = 3
poll_interval_minutes = 120
notes = "Explicit good-news source; useful for early calibration."
[[sources]]
name = "Positive News"
homepage_url = "https://www.positive.news/"
feed_url = "https://www.positive.news/feed/"
default_category = "constructive"
trust_score = 7
pr_risk_score = 3
poll_interval_minutes = 180
notes = "Constructive journalism source."
[[sources]]
name = "Reasons to be Cheerful"
homepage_url = "https://reasonstobecheerful.world/"
feed_url = "https://reasonstobecheerful.world/feed/"
default_category = "constructive"
trust_score = 7
pr_risk_score = 3
poll_interval_minutes = 180
notes = "Solutions-oriented reporting."
[[sources]]
name = "Happy Eco News"
homepage_url = "https://happyeconews.com/"
feed_url = "https://happyeconews.com/feed/"
default_category = "environment"
trust_score = 5
pr_risk_score = 4
poll_interval_minutes = 180
notes = "Environmental good-news candidate source."
[[sources]]
name = "Mongabay"
homepage_url = "https://news.mongabay.com/"
feed_url = "https://news.mongabay.com/feed/"
default_category = "environment"
trust_score = 8
pr_risk_score = 2
poll_interval_minutes = 120
notes = "Environmental reporting; not always low-cortisol, but often constructive."
[[sources]]
name = "ScienceDaily Top Science"
homepage_url = "https://www.sciencedaily.com/"
feed_url = "https://www.sciencedaily.com/rss/top/science.xml"
default_category = "science"
trust_score = 6
pr_risk_score = 3
poll_interval_minutes = 120
notes = "Science discovery feed."
[[sources]]
name = "ScienceDaily Health"
homepage_url = "https://www.sciencedaily.com/news/health_medicine/"
feed_url = "https://www.sciencedaily.com/rss/health_medicine.xml"
default_category = "health-progress"
trust_score = 6
pr_risk_score = 3
poll_interval_minutes = 120
notes = "Health and medicine research feed."
[[sources]]
name = "ScienceDaily Environment"
homepage_url = "https://www.sciencedaily.com/news/earth_climate/environmental_science/"
feed_url = "https://www.sciencedaily.com/rss/earth_climate/environmental_science.xml"
default_category = "environment"
trust_score = 6
pr_risk_score = 3
poll_interval_minutes = 120
notes = "Environment research feed."
[[sources]]
name = "NPR Science"
homepage_url = "https://www.npr.org/sections/science/"
feed_url = "https://feeds.npr.org/1007/rss.xml"
default_category = "science"
trust_score = 8
pr_risk_score = 2
poll_interval_minutes = 90
notes = "Mainstream science feed; mix of constructive and general coverage."
[[sources]]
name = "NPR Health"
homepage_url = "https://www.npr.org/sections/health/"
feed_url = "https://feeds.npr.org/1128/rss.xml"
default_category = "health-progress"
trust_score = 8
pr_risk_score = 2
poll_interval_minutes = 90
notes = "Health feed; needs cortisol filtering."
[[sources]]
name = "BBC Science and Environment"
homepage_url = "https://www.bbc.com/news/science_and_environment"
feed_url = "https://feeds.bbci.co.uk/news/science_and_environment/rss.xml"
default_category = "science"
trust_score = 8
pr_risk_score = 2
poll_interval_minutes = 90
notes = "Broad science/environment feed; needs filtering."
[[sources]]
name = "Futurity"
homepage_url = "https://www.futurity.org/"
feed_url = "https://www.futurity.org/feed/"
default_category = "science"
trust_score = 6
pr_risk_score = 4
poll_interval_minutes = 120
notes = "University research stories; watch PR framing."
+2
View File
@@ -0,0 +1,2 @@
__version__ = "0.1.0"
+6
View File
@@ -0,0 +1,6 @@
from .cli import main
if __name__ == "__main__":
main()
+167
View File
@@ -0,0 +1,167 @@
from __future__ import annotations
import sqlite3
from datetime import date
def build_daily_brief(
conn: sqlite3.Connection,
brief_date: str | None = None,
limit: int = 5,
replace: bool = False,
) -> int:
target_date = brief_date or date.today().isoformat()
existing = conn.execute("SELECT id FROM daily_briefs WHERE brief_date = ?", (target_date,)).fetchone()
if existing and not replace:
return int(existing["id"])
if existing and replace:
conn.execute("DELETE FROM daily_briefs WHERE id = ?", (existing["id"],))
brief_id = conn.execute(
"INSERT INTO daily_briefs (brief_date, title) VALUES (?, ?)",
(target_date, f"Five Good Things Today - {target_date}"),
).lastrowid
rows = _candidate_articles(conn, target_date)
selected = _select_diverse(rows, limit)
for index, row in enumerate(selected, start=1):
conn.execute(
"""
INSERT INTO daily_brief_items (brief_id, article_id, rank, selection_reason)
VALUES (?, ?, ?, ?)
""",
(
brief_id,
row["id"],
index,
_selection_reason(row),
),
)
conn.commit()
return int(brief_id)
def show_brief(conn: sqlite3.Connection, brief_date: str | None = None, limit: int = 10) -> list[sqlite3.Row]:
target_date = brief_date or _latest_brief_date(conn)
if not target_date:
return []
return conn.execute(
"""
SELECT
b.brief_date,
bi.rank,
bi.selection_reason,
a.title,
a.description,
a.canonical_url,
a.published_at,
src.name AS source_name,
src.default_category,
s.constructive_score,
s.cortisol_score,
s.ragebait_score,
s.agency_score,
s.human_benefit_score,
s.reason_code,
s.reason_text,
s.model_name
FROM daily_briefs b
JOIN daily_brief_items bi ON bi.brief_id = b.id
JOIN articles a ON a.id = bi.article_id
JOIN sources src ON src.id = a.source_id
LEFT JOIN article_scores s ON s.article_id = a.id
WHERE b.brief_date = ?
ORDER BY bi.rank
LIMIT ?
""",
(target_date, limit),
).fetchall()
def _candidate_articles(conn: sqlite3.Connection, target_date: str) -> list[sqlite3.Row]:
return conn.execute(
"""
SELECT
a.id,
a.title,
a.description,
a.canonical_url,
a.published_at,
a.discovered_at,
src.name AS source_name,
src.default_category,
src.trust_score,
s.constructive_score,
s.cortisol_score,
s.ragebait_score,
s.agency_score,
s.human_benefit_score,
s.novelty_score,
s.pr_risk_score,
s.reason_code,
s.reason_text,
s.model_name
FROM articles a
JOIN sources src ON src.id = a.source_id
JOIN article_scores s ON s.article_id = a.id
WHERE s.accepted = 1
AND date(COALESCE(a.published_at, a.discovered_at)) = date(?)
ORDER BY
(s.constructive_score + s.agency_score + s.human_benefit_score + src.trust_score
- s.cortisol_score - s.ragebait_score - s.pr_risk_score) DESC,
COALESCE(a.published_at, a.discovered_at) DESC
LIMIT 50
""",
(target_date,),
).fetchall()
def _select_diverse(rows: list[sqlite3.Row], limit: int) -> list[sqlite3.Row]:
selected = []
seen_sources = set()
seen_categories = set()
for row in rows:
if len(selected) >= limit:
break
source = row["source_name"]
category = row["default_category"]
if source in seen_sources and len(rows) > limit:
continue
selected.append(row)
seen_sources.add(source)
seen_categories.add(category)
if len(selected) < limit:
selected_ids = {row["id"] for row in selected}
for row in rows:
if len(selected) >= limit:
break
if row["id"] in selected_ids:
continue
selected.append(row)
selected_ids.add(row["id"])
if len(seen_categories) < 2 and len(rows) > limit:
selected_ids = {row["id"] for row in selected}
for row in rows:
if row["id"] in selected_ids:
continue
if row["default_category"] not in seen_categories:
selected[-1] = row
break
return selected
def _selection_reason(row: sqlite3.Row) -> str:
return (
f"{row['reason_code']}; constructive={row['constructive_score']}, "
f"agency={row['agency_score']}, human_benefit={row['human_benefit_score']}, "
f"cortisol={row['cortisol_score']}, source={row['source_name']}"
)
def _latest_brief_date(conn: sqlite3.Connection) -> str | None:
row = conn.execute("SELECT brief_date FROM daily_briefs ORDER BY brief_date DESC LIMIT 1").fetchone()
return row["brief_date"] if row else None
+352
View File
@@ -0,0 +1,352 @@
from __future__ import annotations
import argparse
import sqlite3
from pathlib import Path
from .briefs import build_daily_brief, show_brief
from .db import connect, init_db
from .feeds import poll_all_sources, poll_source
from .llm import LocalModelClient, classify_articles
from .scoring import score_article
from .sources import load_sources, upsert_sources
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_DB = ROOT / "data" / "goodnews.sqlite3"
DEFAULT_SOURCES = ROOT / "config" / "sources.toml"
def main() -> None:
parser = argparse.ArgumentParser(prog="goodnews")
parser.add_argument("--db", type=Path, default=DEFAULT_DB, help="SQLite database path")
subparsers = parser.add_subparsers(dest="command", required=True)
subparsers.add_parser("init-db", help="Create or update the SQLite schema")
import_parser = subparsers.add_parser("import-sources", help="Load sources from TOML")
import_parser.add_argument("--sources", type=Path, default=DEFAULT_SOURCES)
poll_parser = subparsers.add_parser("poll", help="Poll active RSS/Atom sources")
poll_parser.add_argument("--source", help="Poll one source by exact name")
poll_parser.add_argument("--limit", type=int, help="Poll only the first N active sources")
list_parser = subparsers.add_parser("list-recent", help="Show recently discovered articles")
list_parser.add_argument("--limit", type=int, default=20)
list_parser.add_argument("--accepted-only", action="store_true")
source_parser = subparsers.add_parser("list-sources", help="Show configured sources")
source_parser.add_argument("--active-only", action="store_true")
subparsers.add_parser("source-report", help="Show source-level ingestion and scoring stats")
runs_parser = subparsers.add_parser("list-runs", help="Show recent ingest runs")
runs_parser.add_argument("--limit", type=int, default=20)
subparsers.add_parser("rescore", help="Re-run heuristic scores for stored articles")
classify_parser = subparsers.add_parser("classify", help="Classify candidates with a local LLM")
classify_parser.add_argument("--limit", type=int, default=10)
classify_parser.add_argument("--include-rejected", action="store_true")
classify_parser.add_argument("--dry-run", action="store_true")
classify_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
classify_parser.add_argument("--model", help="Local model name")
check_llm_parser = subparsers.add_parser("check-llm", help="Check local OpenAI-compatible model endpoint")
check_llm_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
check_llm_parser.add_argument("--model", help="Expected local model name")
brief_parser = subparsers.add_parser("build-brief", help="Build/freeze a daily brief")
brief_parser.add_argument("--date", help="Brief date in YYYY-MM-DD format; defaults to today")
brief_parser.add_argument("--limit", type=int, default=5)
brief_parser.add_argument("--replace", action="store_true")
show_brief_parser = subparsers.add_parser("show-brief", help="Show a stored daily brief")
show_brief_parser.add_argument("--date", help="Brief date in YYYY-MM-DD format; defaults to latest brief")
show_brief_parser.add_argument("--limit", type=int, default=10)
args = parser.parse_args()
conn = connect(args.db)
if args.command == "init-db":
init_db(conn)
print(f"Initialized {args.db}")
elif args.command == "import-sources":
init_db(conn)
sources = load_sources(args.sources)
count = upsert_sources(conn, sources)
print(f"Imported {count} sources from {args.sources}")
elif args.command == "poll":
init_db(conn)
if args.source:
source = conn.execute("SELECT * FROM sources WHERE name = ?", (args.source,)).fetchone()
if not source:
raise SystemExit(f"No source named {args.source!r}")
result = poll_source(conn, source)
else:
result = poll_all_sources(conn, limit=args.limit)
print(_format_result(result))
elif args.command == "list-recent":
list_recent(conn, limit=args.limit, accepted_only=args.accepted_only)
elif args.command == "list-sources":
list_sources(conn, active_only=args.active_only)
elif args.command == "source-report":
source_report(conn)
elif args.command == "list-runs":
list_runs(conn, limit=args.limit)
elif args.command == "rescore":
count = rescore_articles(conn)
print(f"Rescored {count} articles")
elif args.command == "classify":
init_db(conn)
client = llm_client_from_args(args)
results = classify_articles(
conn,
client,
limit=args.limit,
include_rejected=args.include_rejected,
dry_run=args.dry_run,
)
for article_id, scores in results:
accepted = "yes" if scores["accepted"] else "no"
print(f"[{article_id}] accepted={accepted} reason={scores['reason_code']}")
print(f" {scores['reason_text']}")
if args.dry_run:
print("Dry run only; database was not updated.")
elif args.command == "check-llm":
client = llm_client_from_args(args)
try:
models = client.list_models()
except RuntimeError as exc:
raise SystemExit(str(exc))
print(f"Connected to {client.base_url}")
if models:
print("Models:")
for model in models:
marker = " *" if model == client.model else ""
print(f" {model}{marker}")
else:
print("Endpoint responded, but no models were listed.")
elif args.command == "build-brief":
init_db(conn)
brief_id = build_daily_brief(
conn,
brief_date=args.date,
limit=args.limit,
replace=args.replace,
)
print(f"Built brief {brief_id}")
print_brief(show_brief(conn, brief_date=args.date, limit=args.limit))
elif args.command == "show-brief":
print_brief(show_brief(conn, brief_date=args.date, limit=args.limit))
def list_recent(conn: sqlite3.Connection, limit: int, accepted_only: bool) -> None:
where = "WHERE s.accepted = 1" if accepted_only else ""
rows = conn.execute(
f"""
SELECT
a.id,
a.published_at,
src.name AS source_name,
a.title,
a.canonical_url,
s.accepted,
s.constructive_score,
s.cortisol_score,
s.ragebait_score,
s.reason_code
FROM articles a
JOIN sources src ON src.id = a.source_id
LEFT JOIN article_scores s ON s.article_id = a.id
{where}
ORDER BY COALESCE(a.published_at, a.discovered_at) DESC
LIMIT ?
""",
(limit,),
).fetchall()
for row in rows:
accepted = "yes" if row["accepted"] else "no"
print(f"[{row['id']}] {row['published_at'] or 'no date'} | {row['source_name']} | accepted={accepted}")
print(f" {row['title']}")
print(
" scores: "
f"constructive={row['constructive_score']} "
f"cortisol={row['cortisol_score']} "
f"ragebait={row['ragebait_score']} "
f"reason={row['reason_code']}"
)
print(f" {row['canonical_url']}")
def llm_client_from_args(args: argparse.Namespace) -> LocalModelClient:
client = LocalModelClient.from_env()
if getattr(args, "base_url", None):
client.base_url = args.base_url.rstrip("/")
if getattr(args, "model", None):
client.model = args.model
return client
def list_sources(conn: sqlite3.Connection, active_only: bool) -> None:
where = "WHERE active = 1" if active_only else ""
rows = conn.execute(
f"""
SELECT id, name, active, default_category, trust_score, pr_risk_score, feed_url
FROM sources
{where}
ORDER BY name
"""
).fetchall()
for row in rows:
state = "active" if row["active"] else "inactive"
print(
f"[{row['id']}] {row['name']} ({state}, {row['default_category']}, "
f"trust={row['trust_score']}, pr={row['pr_risk_score']})"
)
print(f" {row['feed_url']}")
def source_report(conn: sqlite3.Connection) -> None:
rows = conn.execute(
"""
SELECT
src.name,
src.default_category,
src.trust_score,
src.pr_risk_score AS source_pr_risk,
COUNT(a.id) AS articles,
SUM(CASE WHEN s.accepted = 1 THEN 1 ELSE 0 END) AS accepted,
ROUND(AVG(s.constructive_score), 1) AS avg_constructive,
ROUND(AVG(s.cortisol_score), 1) AS avg_cortisol,
ROUND(AVG(s.ragebait_score), 1) AS avg_ragebait,
MAX(a.published_at) AS newest_article
FROM sources src
LEFT JOIN articles a ON a.source_id = src.id
LEFT JOIN article_scores s ON s.article_id = a.id
GROUP BY src.id
ORDER BY accepted DESC, articles DESC, src.name
"""
).fetchall()
for row in rows:
articles = row["articles"] or 0
accepted = row["accepted"] or 0
rate = (accepted / articles * 100) if articles else 0
print(
f"{row['name']} | {row['default_category']} | "
f"articles={articles} accepted={accepted} ({rate:.1f}%)"
)
print(
f" trust={row['trust_score']} pr={row['source_pr_risk']} "
f"avg_constructive={row['avg_constructive']} "
f"avg_cortisol={row['avg_cortisol']} "
f"avg_ragebait={row['avg_ragebait']}"
)
print(f" newest={row['newest_article'] or 'none'}")
def list_runs(conn: sqlite3.Connection, limit: int) -> None:
rows = conn.execute(
"""
SELECT r.id, r.started_at, r.finished_at, r.status, src.name AS source_name,
r.items_seen, r.items_inserted, r.items_duplicate, r.error
FROM ingest_runs r
LEFT JOIN sources src ON src.id = r.source_id
ORDER BY r.id DESC
LIMIT ?
""",
(limit,),
).fetchall()
for row in rows:
print(
f"[{row['id']}] {row['status']} | {row['source_name'] or 'unknown'} | "
f"seen={row['items_seen']} inserted={row['items_inserted']} duplicate={row['items_duplicate']}"
)
if row["error"]:
print(f" error: {row['error']}")
def rescore_articles(conn: sqlite3.Connection) -> int:
rows = conn.execute(
"""
SELECT a.id, a.title, a.description, src.pr_risk_score
FROM articles a
JOIN sources src ON src.id = a.source_id
ORDER BY a.id
"""
).fetchall()
for row in rows:
scores = score_article(row["title"], row["description"], int(row["pr_risk_score"]))
conn.execute(
"""
INSERT INTO article_scores (
article_id, constructive_score, cortisol_score, ragebait_score,
agency_score, human_benefit_score, novelty_score, pr_risk_score,
accepted, reason_code, reason_text, model_name, scored_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(article_id) DO UPDATE SET
constructive_score = excluded.constructive_score,
cortisol_score = excluded.cortisol_score,
ragebait_score = excluded.ragebait_score,
agency_score = excluded.agency_score,
human_benefit_score = excluded.human_benefit_score,
novelty_score = excluded.novelty_score,
pr_risk_score = excluded.pr_risk_score,
accepted = excluded.accepted,
reason_code = excluded.reason_code,
reason_text = excluded.reason_text,
model_name = excluded.model_name,
scored_at = CURRENT_TIMESTAMP
""",
(
row["id"],
scores["constructive_score"],
scores["cortisol_score"],
scores["ragebait_score"],
scores["agency_score"],
scores["human_benefit_score"],
scores["novelty_score"],
scores["pr_risk_score"],
scores["accepted"],
scores["reason_code"],
scores["reason_text"],
scores["model_name"],
),
)
conn.commit()
return len(rows)
def print_brief(rows: list[sqlite3.Row]) -> None:
if not rows:
print("No brief items found.")
return
date = rows[0]["brief_date"]
print(f"Five Good Things Today - {date}")
for row in rows:
print(f"{row['rank']}. {row['title']}")
print(f" {row['source_name']} | {row['default_category']} | {row['model_name']}")
print(f" reason: {row['reason_code']}")
print(f" {row['canonical_url']}")
def _format_result(result: dict) -> str:
if "sources" in result:
return (
f"Polled {result['sources']} sources: seen={result['seen']} "
f"inserted={result['inserted']} duplicate={result['duplicate']} failed={result['failed']}"
)
if result.get("status") == "failed":
return (
f"Poll failed: seen={result['seen']} inserted={result['inserted']} "
f"duplicate={result['duplicate']} error={result['error']}"
)
return (
f"Poll ok: seen={result['seen']} inserted={result['inserted']} "
f"duplicate={result['duplicate']}"
)
if __name__ == "__main__":
main()
+105
View File
@@ -0,0 +1,105 @@
from __future__ import annotations
import sqlite3
from pathlib import Path
SCHEMA = """
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS sources (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
homepage_url TEXT,
feed_url TEXT NOT NULL UNIQUE,
source_type TEXT NOT NULL DEFAULT 'rss',
default_category TEXT,
trust_score INTEGER NOT NULL DEFAULT 5,
pr_risk_score INTEGER NOT NULL DEFAULT 3,
active INTEGER NOT NULL DEFAULT 1,
poll_interval_minutes INTEGER NOT NULL DEFAULT 60,
notes TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
canonical_url TEXT NOT NULL,
title TEXT NOT NULL,
description TEXT,
author TEXT,
published_at TEXT,
discovered_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
image_url TEXT,
language TEXT,
raw_guid TEXT,
url_hash TEXT NOT NULL UNIQUE,
title_hash TEXT,
FOREIGN KEY (source_id) REFERENCES sources(id)
);
CREATE INDEX IF NOT EXISTS idx_articles_published_at ON articles(published_at);
CREATE INDEX IF NOT EXISTS idx_articles_source_id ON articles(source_id);
CREATE INDEX IF NOT EXISTS idx_articles_title_hash ON articles(title_hash);
CREATE TABLE IF NOT EXISTS article_scores (
article_id INTEGER PRIMARY KEY REFERENCES articles(id) ON DELETE CASCADE,
constructive_score INTEGER,
cortisol_score INTEGER,
ragebait_score INTEGER,
agency_score INTEGER,
human_benefit_score INTEGER,
novelty_score INTEGER,
pr_risk_score INTEGER,
accepted INTEGER,
reason_code TEXT,
reason_text TEXT,
model_name TEXT,
scored_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS ingest_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL,
started_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
finished_at TEXT,
status TEXT NOT NULL DEFAULT 'running',
items_seen INTEGER NOT NULL DEFAULT 0,
items_inserted INTEGER NOT NULL DEFAULT 0,
items_duplicate INTEGER NOT NULL DEFAULT 0,
error TEXT
);
CREATE TABLE IF NOT EXISTS daily_briefs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
brief_date TEXT NOT NULL UNIQUE,
title TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
notes TEXT
);
CREATE TABLE IF NOT EXISTS daily_brief_items (
brief_id INTEGER NOT NULL REFERENCES daily_briefs(id) ON DELETE CASCADE,
article_id INTEGER NOT NULL REFERENCES articles(id) ON DELETE CASCADE,
rank INTEGER NOT NULL,
selection_reason TEXT,
PRIMARY KEY (brief_id, article_id),
UNIQUE (brief_id, rank)
);
"""
def connect(db_path: Path | str) -> sqlite3.Connection:
path = Path(db_path)
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
return conn
def init_db(conn: sqlite3.Connection) -> None:
conn.executescript(SCHEMA)
conn.commit()
+324
View File
@@ -0,0 +1,324 @@
from __future__ import annotations
import email.utils
import sqlite3
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import UTC, datetime
from .scoring import score_article
from .text import canonicalize_url, clean_text, sha256_text
USER_AGENT = "goodNews/0.1 (+local constructive news prototype)"
@dataclass
class FeedItem:
title: str
url: str
description: str | None = None
author: str | None = None
published_at: str | None = None
image_url: str | None = None
language: str | None = None
raw_guid: str | None = None
def poll_all_sources(conn: sqlite3.Connection, limit: int | None = None) -> dict:
query = """
SELECT *
FROM sources
WHERE active = 1
ORDER BY id
"""
rows = conn.execute(query).fetchall()
if limit is not None:
rows = rows[:limit]
totals = {"sources": 0, "seen": 0, "inserted": 0, "duplicate": 0, "failed": 0}
for source in rows:
result = poll_source(conn, source)
totals["sources"] += 1
totals["seen"] += result["seen"]
totals["inserted"] += result["inserted"]
totals["duplicate"] += result["duplicate"]
totals["failed"] += 1 if result["status"] == "failed" else 0
return totals
def poll_source(conn: sqlite3.Connection, source: sqlite3.Row) -> dict:
run_id = conn.execute(
"INSERT INTO ingest_runs (source_id) VALUES (?)",
(source["id"],),
).lastrowid
conn.commit()
seen = inserted = duplicate = 0
try:
xml = fetch_feed(source["feed_url"])
items = parse_feed(xml)
seen = len(items)
for item in items:
inserted_now = insert_article(conn, source, item)
if inserted_now:
inserted += 1
else:
duplicate += 1
conn.execute(
"""
UPDATE ingest_runs
SET finished_at = CURRENT_TIMESTAMP,
status = 'ok',
items_seen = ?,
items_inserted = ?,
items_duplicate = ?
WHERE id = ?
""",
(seen, inserted, duplicate, run_id),
)
conn.commit()
return {"status": "ok", "seen": seen, "inserted": inserted, "duplicate": duplicate}
except Exception as exc:
conn.execute(
"""
UPDATE ingest_runs
SET finished_at = CURRENT_TIMESTAMP,
status = 'failed',
items_seen = ?,
items_inserted = ?,
items_duplicate = ?,
error = ?
WHERE id = ?
""",
(seen, inserted, duplicate, str(exc), run_id),
)
conn.commit()
return {
"status": "failed",
"seen": seen,
"inserted": inserted,
"duplicate": duplicate,
"error": str(exc),
}
def fetch_feed(url: str, timeout: int = 20) -> bytes:
request = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
try:
with urllib.request.urlopen(request, timeout=timeout) as response:
return response.read()
except urllib.error.HTTPError as exc:
raise RuntimeError(f"HTTP {exc.code} fetching {url}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"failed fetching {url}: {exc.reason}") from exc
def parse_feed(xml: bytes) -> list[FeedItem]:
root = ET.fromstring(xml)
root_name = _local_name(root.tag)
if root_name == "feed":
return _parse_atom(root)
return _parse_rss(root)
def insert_article(conn: sqlite3.Connection, source: sqlite3.Row, item: FeedItem) -> bool:
canonical_url = canonicalize_url(item.url)
if not canonical_url or not item.title:
return False
title = clean_text(item.title, max_len=500)
description = clean_text(item.description, max_len=1000)
if not title:
return False
url_hash = sha256_text(canonical_url)
title_hash = sha256_text(title)
try:
cursor = conn.execute(
"""
INSERT INTO articles (
source_id, canonical_url, title, description, author,
published_at, image_url, language, raw_guid, url_hash, title_hash
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
source["id"],
canonical_url,
title,
description,
clean_text(item.author, max_len=250),
item.published_at,
canonicalize_url(item.image_url),
item.language,
item.raw_guid,
url_hash,
title_hash,
),
)
except sqlite3.IntegrityError:
return False
scores = score_article(title, description, int(source["pr_risk_score"]))
conn.execute(
"""
INSERT INTO article_scores (
article_id, constructive_score, cortisol_score, ragebait_score,
agency_score, human_benefit_score, novelty_score, pr_risk_score,
accepted, reason_code, reason_text, model_name
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
cursor.lastrowid,
scores["constructive_score"],
scores["cortisol_score"],
scores["ragebait_score"],
scores["agency_score"],
scores["human_benefit_score"],
scores["novelty_score"],
scores["pr_risk_score"],
scores["accepted"],
scores["reason_code"],
scores["reason_text"],
scores["model_name"],
),
)
conn.commit()
return True
def _parse_rss(root: ET.Element) -> list[FeedItem]:
channel = _first_child(root, "channel") or root
language = _first_text(channel, "language")
items = [element for element in root.iter() if _local_name(element.tag) == "item"]
parsed = []
for item in items:
title = _first_text(item, "title")
link = _first_text(item, "link")
guid = _first_text(item, "guid")
url = link or guid
if not title or not url:
continue
parsed.append(
FeedItem(
title=title,
url=url,
description=_first_text(item, "description", "summary", "encoded"),
author=_first_text(item, "author", "creator"),
published_at=_parse_date(_first_text(item, "pubDate", "published", "updated", "date")),
image_url=_find_image_url(item),
language=language,
raw_guid=guid,
)
)
return parsed
def _parse_atom(root: ET.Element) -> list[FeedItem]:
language = root.attrib.get("{http://www.w3.org/XML/1998/namespace}lang")
entries = [element for element in root if _local_name(element.tag) == "entry"]
parsed = []
for entry in entries:
title = _first_text(entry, "title")
url = _atom_link(entry)
if not title or not url:
continue
author = None
author_el = _first_child(entry, "author")
if author_el is not None:
author = _first_text(author_el, "name") or _text(author_el)
parsed.append(
FeedItem(
title=title,
url=url,
description=_first_text(entry, "summary", "content"),
author=author,
published_at=_parse_date(_first_text(entry, "published", "updated")),
image_url=_find_image_url(entry),
language=language,
raw_guid=_first_text(entry, "id"),
)
)
return parsed
def _atom_link(entry: ET.Element) -> str | None:
fallback = None
for child in entry:
if _local_name(child.tag) != "link":
continue
href = child.attrib.get("href")
if not href:
continue
if child.attrib.get("rel", "alternate") == "alternate":
return href
fallback = fallback or href
return fallback
def _find_image_url(element: ET.Element) -> str | None:
for child in element.iter():
name = _local_name(child.tag)
if name in {"thumbnail", "content"} and child.attrib.get("url"):
if child.attrib.get("medium") in {None, "image"}:
return child.attrib["url"]
if name == "enclosure" and child.attrib.get("url"):
mime = child.attrib.get("type", "")
if mime.startswith("image/"):
return child.attrib["url"]
return None
def _parse_date(value: str | None) -> str | None:
if not value:
return None
value = value.strip()
try:
parsed = email.utils.parsedate_to_datetime(value)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC).isoformat()
except (TypeError, ValueError):
pass
try:
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC).isoformat()
except ValueError:
return None
def _first_child(element: ET.Element, name: str) -> ET.Element | None:
for child in element:
if _local_name(child.tag) == name:
return child
return None
def _first_text(element: ET.Element, *names: str) -> str | None:
for child in element:
if _local_name(child.tag) in names:
value = _text(child)
if value:
return value
return None
def _text(element: ET.Element) -> str | None:
if element.text:
return element.text.strip()
return None
def _local_name(tag: str) -> str:
if "}" in tag:
return tag.rsplit("}", 1)[1]
return tag
+265
View File
@@ -0,0 +1,265 @@
from __future__ import annotations
import json
import os
import sqlite3
import urllib.error
import urllib.request
from dataclasses import dataclass
DEFAULT_BASE_URL = "http://127.0.0.1:1234/v1"
DEFAULT_MODEL = "gpt-oss"
SYSTEM_PROMPT = """You classify article metadata for a calm constructive-news digest.
Judge emotional aftertaste, not simple positivity. Accept stories that leave a reader informed without feeling drained, especially when they include repair, progress, agency, resilience, human benefit, scientific discovery, environmental improvement, community action, or useful perspective.
Reject stories centered on fear, outrage, partisan conflict, crime, tragedy, disaster repetition, celebrity drama, market panic, or corporate PR without clear public benefit.
Return only JSON with this exact shape:
{
"constructive_score": 0,
"cortisol_score": 0,
"ragebait_score": 0,
"agency_score": 0,
"human_benefit_score": 0,
"novelty_score": 0,
"pr_risk_score": 0,
"accepted": false,
"reason_code": "short_snake_case",
"reason_text": "one concise sentence"
}
"""
@dataclass
class LocalModelClient:
base_url: str
model: str
api_key: str | None = None
timeout: int = 90
@classmethod
def from_env(cls) -> "LocalModelClient":
return cls(
base_url=os.environ.get("GOODNEWS_LLM_BASE_URL", DEFAULT_BASE_URL).rstrip("/"),
model=os.environ.get("GOODNEWS_LLM_MODEL", DEFAULT_MODEL),
api_key=os.environ.get("GOODNEWS_LLM_API_KEY"),
)
def classify(self, article: sqlite3.Row) -> dict:
payload = {
"model": self.model,
"temperature": 0.1,
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": _article_prompt(article)},
],
"response_format": {"type": "json_object"},
}
try:
return self._chat(payload)
except RuntimeError as exc:
if "HTTP 400" not in str(exc):
raise
payload.pop("response_format", None)
return self._chat(payload)
def list_models(self) -> list[str]:
headers = {}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
request = urllib.request.Request(f"{self.base_url}/models", headers=headers)
try:
with urllib.request.urlopen(request, timeout=10) as response:
data = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {exc.code} from local model: {detail}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"could not reach local model at {self.base_url}: {exc.reason}") from exc
models = data.get("data", [])
names = []
for model in models:
if isinstance(model, dict) and model.get("id"):
names.append(str(model["id"]))
return names
def _chat(self, payload: dict) -> dict:
body = json.dumps(payload).encode("utf-8")
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
request = urllib.request.Request(
f"{self.base_url}/chat/completions",
data=body,
headers=headers,
method="POST",
)
try:
with urllib.request.urlopen(request, timeout=self.timeout) as response:
data = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {exc.code} from local model: {detail}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"could not reach local model at {self.base_url}: {exc.reason}") from exc
try:
content = data["choices"][0]["message"]["content"]
except (KeyError, IndexError, TypeError) as exc:
raise RuntimeError(f"unexpected local model response: {data}") from exc
return parse_classifier_json(content)
def classify_articles(
conn: sqlite3.Connection,
client: LocalModelClient,
limit: int,
include_rejected: bool = False,
dry_run: bool = False,
) -> list[tuple[int, dict]]:
rows = _classification_candidates(conn, limit=limit, include_rejected=include_rejected)
results = []
for row in rows:
scores = client.classify(row)
scores = normalize_scores(scores, model_name=client.model)
results.append((row["id"], scores))
if not dry_run:
upsert_article_score(conn, row["id"], scores)
if not dry_run:
conn.commit()
return results
def parse_classifier_json(content: str) -> dict:
content = content.strip()
try:
return json.loads(content)
except json.JSONDecodeError:
start = content.find("{")
end = content.rfind("}")
if start == -1 or end == -1 or end <= start:
raise RuntimeError(f"model did not return JSON: {content}")
return json.loads(content[start : end + 1])
def normalize_scores(data: dict, model_name: str) -> dict:
return {
"constructive_score": _bounded_int(data.get("constructive_score")),
"cortisol_score": _bounded_int(data.get("cortisol_score")),
"ragebait_score": _bounded_int(data.get("ragebait_score")),
"agency_score": _bounded_int(data.get("agency_score")),
"human_benefit_score": _bounded_int(data.get("human_benefit_score")),
"novelty_score": _bounded_int(data.get("novelty_score")),
"pr_risk_score": _bounded_int(data.get("pr_risk_score")),
"accepted": 1 if bool(data.get("accepted")) else 0,
"reason_code": str(data.get("reason_code") or "model_no_reason")[:120],
"reason_text": str(data.get("reason_text") or "")[:1000],
"model_name": model_name,
}
def upsert_article_score(conn: sqlite3.Connection, article_id: int, scores: dict) -> None:
conn.execute(
"""
INSERT INTO article_scores (
article_id, constructive_score, cortisol_score, ragebait_score,
agency_score, human_benefit_score, novelty_score, pr_risk_score,
accepted, reason_code, reason_text, model_name, scored_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(article_id) DO UPDATE SET
constructive_score = excluded.constructive_score,
cortisol_score = excluded.cortisol_score,
ragebait_score = excluded.ragebait_score,
agency_score = excluded.agency_score,
human_benefit_score = excluded.human_benefit_score,
novelty_score = excluded.novelty_score,
pr_risk_score = excluded.pr_risk_score,
accepted = excluded.accepted,
reason_code = excluded.reason_code,
reason_text = excluded.reason_text,
model_name = excluded.model_name,
scored_at = CURRENT_TIMESTAMP
""",
(
article_id,
scores["constructive_score"],
scores["cortisol_score"],
scores["ragebait_score"],
scores["agency_score"],
scores["human_benefit_score"],
scores["novelty_score"],
scores["pr_risk_score"],
scores["accepted"],
scores["reason_code"],
scores["reason_text"],
scores["model_name"],
),
)
def _classification_candidates(
conn: sqlite3.Connection,
limit: int,
include_rejected: bool,
) -> list[sqlite3.Row]:
where = "" if include_rejected else "WHERE s.accepted = 1 OR s.constructive_score >= 4"
return conn.execute(
f"""
SELECT
a.id,
a.title,
a.description,
a.published_at,
a.canonical_url,
src.name AS source_name,
src.default_category,
src.trust_score AS source_trust_score,
src.pr_risk_score AS source_pr_risk_score,
s.constructive_score,
s.cortisol_score,
s.ragebait_score,
s.agency_score,
s.human_benefit_score,
s.pr_risk_score,
s.accepted,
s.reason_code
FROM articles a
JOIN sources src ON src.id = a.source_id
LEFT JOIN article_scores s ON s.article_id = a.id
{where}
ORDER BY
CASE WHEN s.model_name LIKE 'heuristic-%' THEN 0 ELSE 1 END,
COALESCE(a.published_at, a.discovered_at) DESC
LIMIT ?
""",
(limit,),
).fetchall()
def _article_prompt(article: sqlite3.Row) -> str:
return "\n".join(
[
f"Source: {article['source_name']}",
f"Source category: {article['default_category'] or 'unknown'}",
f"Source trust score: {article['source_trust_score']}/10",
f"Source PR risk score: {article['source_pr_risk_score']}/10",
f"Published: {article['published_at'] or 'unknown'}",
f"Title: {article['title']}",
f"Snippet: {article['description'] or ''}",
f"URL: {article['canonical_url']}",
]
)
def _bounded_int(value: object) -> int:
try:
parsed = int(value)
except (TypeError, ValueError):
parsed = 0
return max(0, min(10, parsed))
+169
View File
@@ -0,0 +1,169 @@
from __future__ import annotations
import re
POSITIVE_TERMS = {
"breakthrough",
"progress",
"improve",
"improves",
"improved",
"solution",
"solutions",
"recovery",
"restore",
"restores",
"rescued",
"rescue",
"volunteer",
"community",
"donate",
"donation",
"cure",
"treatment",
"therapy",
"clean energy",
"renewable",
"conservation",
"protect",
"protects",
"restoration",
"kindness",
"hope",
"first",
"record",
}
AGENCY_TERMS = {
"how",
"helps",
"helping",
"protect",
"protects",
"builds",
"creates",
"launches",
"teaches",
"learn",
"guide",
"tool",
"program",
"initiative",
"effort",
"plan",
"rebuild",
}
CORTISOL_TERMS = {
"war",
"killed",
"dead",
"death",
"murder",
"shooting",
"attack",
"crisis",
"catastrophe",
"disaster",
"collapse",
"panic",
"warning",
"threat",
"fear",
"fears",
"lawsuit",
"scandal",
}
RAGEBAIT_TERMS = {
"slams",
"blasts",
"furious",
"outrage",
"rage",
"shocking",
"you won't believe",
"sparks backlash",
"destroyed",
"humiliates",
}
PR_TERMS = {
"announces",
"unveils",
"funding round",
"raises",
"partnership",
"brand",
"sponsored",
"press release",
}
WORD_RE = re.compile(r"[a-z0-9']+")
def _count_terms(text: str, terms: set[str]) -> int:
lowered = text.lower()
words = set(WORD_RE.findall(lowered))
count = 0
for term in terms:
if " " in term:
count += 1 if term in lowered else 0
elif term in words:
count += 1
return count
def score_article(title: str, description: str | None, source_pr_risk: int) -> dict:
text = f"{title}. {description or ''}"
positive = _count_terms(text, POSITIVE_TERMS)
agency = _count_terms(text, AGENCY_TERMS)
cortisol = _count_terms(text, CORTISOL_TERMS)
ragebait = _count_terms(text, RAGEBAIT_TERMS)
pr_terms = _count_terms(text, PR_TERMS)
constructive_score = min(10, 2 + positive * 2 + agency)
agency_score = min(10, 1 + agency * 2)
cortisol_score = min(10, cortisol * 3)
ragebait_score = min(10, ragebait * 4)
pr_risk_score = min(10, source_pr_risk + pr_terms * 2)
human_benefit_score = min(10, positive * 2 + agency)
novelty_score = 5
accepted = (
constructive_score >= 5
and cortisol_score <= 5
and ragebait_score <= 3
and pr_risk_score <= 7
)
if accepted:
reason_code = "heuristic_constructive_candidate"
reason_text = "Constructive or agency-oriented language with low obvious cortisol/ragebait signals."
elif ragebait_score > 3:
reason_code = "heuristic_reject_ragebait_language"
reason_text = "Headline or snippet contains outrage-oriented language."
elif cortisol_score > 5:
reason_code = "heuristic_reject_cortisol_heavy"
reason_text = "Headline or snippet appears tragedy, threat, conflict, or crisis centered."
elif pr_risk_score > 7:
reason_code = "heuristic_reject_pr_risk"
reason_text = "Headline or source has signs of corporate PR framing."
else:
reason_code = "heuristic_needs_review"
reason_text = "Not enough constructive signal for automatic acceptance."
return {
"constructive_score": constructive_score,
"cortisol_score": cortisol_score,
"ragebait_score": ragebait_score,
"agency_score": agency_score,
"human_benefit_score": human_benefit_score,
"novelty_score": novelty_score,
"pr_risk_score": pr_risk_score,
"accepted": 1 if accepted else 0,
"reason_code": reason_code,
"reason_text": reason_text,
"model_name": "heuristic-v0",
}
+55
View File
@@ -0,0 +1,55 @@
from __future__ import annotations
import sqlite3
import tomllib
from pathlib import Path
def load_sources(path: Path | str) -> list[dict]:
data = tomllib.loads(Path(path).read_text(encoding="utf-8"))
sources = data.get("sources", [])
if not isinstance(sources, list):
raise ValueError("sources.toml must contain [[sources]] entries")
return sources
def upsert_sources(conn: sqlite3.Connection, source_defs: list[dict]) -> int:
count = 0
for source in source_defs:
conn.execute(
"""
INSERT INTO sources (
name, homepage_url, feed_url, source_type, default_category,
trust_score, pr_risk_score, active, poll_interval_minutes, notes,
updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(feed_url) DO UPDATE SET
name = excluded.name,
homepage_url = excluded.homepage_url,
source_type = excluded.source_type,
default_category = excluded.default_category,
trust_score = excluded.trust_score,
pr_risk_score = excluded.pr_risk_score,
active = excluded.active,
poll_interval_minutes = excluded.poll_interval_minutes,
notes = excluded.notes,
updated_at = CURRENT_TIMESTAMP
""",
(
source["name"],
source.get("homepage_url"),
source["feed_url"],
source.get("source_type", "rss"),
source.get("default_category"),
int(source.get("trust_score", 5)),
int(source.get("pr_risk_score", 3)),
1 if source.get("active", True) else 0,
int(source.get("poll_interval_minutes", 60)),
source.get("notes"),
),
)
count += 1
conn.commit()
return count
+62
View File
@@ -0,0 +1,62 @@
from __future__ import annotations
import hashlib
import html
import re
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
TAG_RE = re.compile(r"<[^>]+>")
WHITESPACE_RE = re.compile(r"\s+")
TRACKING_PREFIXES = ("utm_",)
TRACKING_PARAMS = {
"fbclid",
"gclid",
"mc_cid",
"mc_eid",
"igshid",
"ref",
}
def clean_text(value: str | None, max_len: int = 1000) -> str | None:
if not value:
return None
text = TAG_RE.sub(" ", value)
text = html.unescape(text)
text = WHITESPACE_RE.sub(" ", text).strip()
if len(text) > max_len:
return text[: max_len - 1].rstrip() + "..."
return text or None
def canonicalize_url(url: str | None) -> str | None:
if not url:
return None
url = html.unescape(url).strip()
if not url:
return None
parts = urlsplit(url)
if parts.scheme not in {"http", "https"} or not parts.netloc:
return None
query = []
for key, value in parse_qsl(parts.query, keep_blank_values=True):
lowered = key.lower()
if lowered in TRACKING_PARAMS or lowered.startswith(TRACKING_PREFIXES):
continue
query.append((key, value))
normalized = parts._replace(
scheme=parts.scheme.lower(),
netloc=parts.netloc.lower(),
query=urlencode(sorted(query), doseq=True),
fragment="",
)
return urlunsplit(normalized)
def sha256_text(value: str | None) -> str:
normalized = (value or "").strip().lower()
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
+10
View File
@@ -0,0 +1,10 @@
[project]
name = "goodnews"
version = "0.1.0"
description = "Local-first constructive news ingestion and filtering prototype."
requires-python = ">=3.11"
dependencies = []
[project.scripts]
goodnews = "goodnews.cli:main"