Add interval-aware polling and a 'cycle' command for scheduling
- poll_due_sources(): polls only sources whose last successful poll is older than their poll_interval_minutes (or that have never been polled successfully), which finally gives that config field meaning.
- classify gains only_unclassified to spend the LLM solely on new (heuristic-scored) articles, so a frequent scheduled run stays cheap.
- The 'cycle' command runs poll-due -> classify-new -> rebuild-today's-brief, with each step non-fatal so a down model endpoint or an empty day never aborts it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+46
-1
@@ -3,11 +3,12 @@ from __future__ import annotations
|
||||
import argparse
|
||||
import os
|
||||
import sqlite3
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
|
||||
from .briefs import build_daily_brief, show_brief
|
||||
from .db import connect, init_db
|
||||
from .feeds import poll_all_sources, poll_source
|
||||
from .feeds import poll_all_sources, poll_due_sources, poll_source
|
||||
from .llm import LocalModelClient, classify_articles
|
||||
from .scoring import score_article
|
||||
from .sources import load_sources, upsert_sources
|
||||
@@ -59,6 +60,16 @@ def main() -> None:
|
||||
classify_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
|
||||
classify_parser.add_argument("--model", help="Local model name")
|
||||
|
||||
cycle_parser = subparsers.add_parser(
|
||||
"cycle", help="Poll due sources, classify new articles, rebuild today's brief (for scheduling)"
|
||||
)
|
||||
cycle_parser.add_argument("--classify-limit", type=int, default=40)
|
||||
cycle_parser.add_argument("--no-classify", action="store_true", help="Skip the LLM classify step")
|
||||
cycle_parser.add_argument("--no-brief", action="store_true", help="Skip rebuilding today's brief")
|
||||
cycle_parser.add_argument("--force", action="store_true", help="Poll all active sources, ignoring intervals")
|
||||
cycle_parser.add_argument("--base-url", help="OpenAI-compatible base URL for classify")
|
||||
cycle_parser.add_argument("--model", help="Local model name for classify")
|
||||
|
||||
check_llm_parser = subparsers.add_parser("check-llm", help="Check local OpenAI-compatible model endpoint")
|
||||
check_llm_parser.add_argument("--base-url", help="OpenAI-compatible base URL, e.g. http://127.0.0.1:1234/v1")
|
||||
check_llm_parser.add_argument("--model", help="Expected local model name")
|
||||
@@ -135,6 +146,8 @@ def main() -> None:
|
||||
print(f" {scores['reason_text']}")
|
||||
if args.dry_run:
|
||||
print("Dry run only; database was not updated.")
|
||||
elif args.command == "cycle":
|
||||
run_cycle(conn, args)
|
||||
elif args.command == "check-llm":
|
||||
client = llm_client_from_args(args)
|
||||
try:
|
||||
@@ -201,6 +214,38 @@ def list_recent(conn: sqlite3.Connection, limit: int, accepted_only: bool) -> No
|
||||
print(f" {row['canonical_url']}")
|
||||
|
||||
|
||||
def run_cycle(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
    """Run one end-to-end scheduler pass: poll, classify, rebuild the brief.

    Steps, in order:

    1. Poll sources: every active source when ``args.force`` is set,
       otherwise only the sources that are currently due.
    2. Classify not-yet-classified articles with the local model, unless
       ``args.no_classify`` is set.
    3. Rebuild today's brief (replacing any existing one), unless
       ``args.no_brief`` is set.

    Every step is best-effort: a failure is reported on stdout as
    ``<step>: skipped (<error>)`` and the remaining steps still run, so a
    failing poll, a down model endpoint, or an empty day never aborts the
    whole cycle.

    Args:
        conn: Open database connection; ``init_db`` is called on it first.
        args: Parsed ``cycle`` sub-command arguments. Reads ``force``,
            ``no_classify``, ``no_brief`` and ``classify_limit`` directly and
            passes the namespace to ``llm_client_from_args``.
    """
    init_db(conn)

    # Guarded like the later steps so that a polling failure cannot prevent
    # classification and the brief rebuild from running.
    poll = poll_all_sources if args.force else poll_due_sources
    try:
        poll_result = poll(conn)
    except Exception as exc:
        print(f"poll: skipped ({exc})")
    else:
        print(f"poll: {_format_result(poll_result)}")

    if not args.no_classify:
        try:
            # Client construction lives inside the guard too: a bad base URL
            # or model setting should skip this step, not end the cycle.
            client = llm_client_from_args(args)
            results = classify_articles(
                conn,
                client,
                limit=args.classify_limit,
                include_rejected=True,
                only_unclassified=True,
            )
            print(f"classify: {len(results)} new article(s) scored by {client.model}")
        except Exception as exc:  # endpoint down, timeout, etc. -- keep going
            print(f"classify: skipped ({exc})")

    if not args.no_brief:
        today = date.today().isoformat()
        try:
            brief_id = build_daily_brief(conn, brief_date=today, limit=5, replace=True)
            print(f"brief: rebuilt {today} (id {brief_id})")
        except Exception as exc:
            print(f"brief: skipped ({exc})")
|
||||
|
||||
|
||||
def serve(args: argparse.Namespace) -> None:
|
||||
try:
|
||||
import uvicorn
|
||||
|
||||
+33
-6
@@ -28,13 +28,40 @@ class FeedItem:
|
||||
|
||||
|
||||
def poll_all_sources(conn: sqlite3.Connection, limit: int | None = None) -> dict:
    """Poll every active source in id order, regardless of poll interval.

    Args:
        conn: Open database connection holding the ``sources`` table.
        limit: Optional cap, forwarded unchanged to ``_poll_rows``.

    Returns:
        Whatever ``_poll_rows`` returns for the selected sources.
    """
    active_rows = conn.execute(
        "SELECT * FROM sources WHERE active = 1 ORDER BY id"
    ).fetchall()
    return _poll_rows(conn, active_rows, limit)
|
||||
|
||||
|
||||
def poll_due_sources(conn: sqlite3.Connection, limit: int | None = None) -> dict:
    """Poll only the active sources that are due for a refresh.

    A source is due when it has no ``ingest_runs`` row with status ``'ok'``,
    or when its most recent ``'ok'`` run finished at least
    ``poll_interval_minutes`` ago. This is what makes
    ``poll_interval_minutes`` meaningful and lets a scheduler run frequently
    without re-hitting feeds that are not yet due.

    Args:
        conn: Open database connection holding ``sources`` and
            ``ingest_runs``.
        limit: Optional cap, forwarded unchanged to ``_poll_rows``.

    Returns:
        Whatever ``_poll_rows`` returns for the selected sources.
    """
    # The cutoff on the right-hand side is SQLite's canonical
    # 'YYYY-MM-DD HH:MM:SS' text, so finished_at is normalised through
    # datetime() before the comparison. A raw ISO-8601 value with a 'T'
    # separator or a UTC offset would otherwise compare lexicographically
    # wrong ('T' sorts after ' '), leaving a source polled earlier the same
    # day permanently "not due". COALESCE keeps the raw value when SQLite
    # cannot parse it, so such rows behave exactly as before.
    # NOTE(review): datetime('now') is UTC; this assumes finished_at is
    # recorded in UTC or carries an explicit offset -- confirm at the writer.
    due_rows = conn.execute(
        """
        SELECT s.*
        FROM sources s
        WHERE s.active = 1
          AND (
            NOT EXISTS (
                SELECT 1 FROM ingest_runs r
                WHERE r.source_id = s.id AND r.status = 'ok'
            )
            OR (
                SELECT MAX(COALESCE(datetime(r.finished_at), r.finished_at))
                FROM ingest_runs r
                WHERE r.source_id = s.id AND r.status = 'ok'
            ) <= datetime('now', '-' || s.poll_interval_minutes || ' minutes')
          )
        ORDER BY s.id
        """
    ).fetchall()
    return _poll_rows(conn, due_rows, limit)
|
||||
|
||||
|
||||
def _poll_rows(conn: sqlite3.Connection, rows: list[sqlite3.Row], limit: int | None) -> dict:
|
||||
if limit is not None:
|
||||
rows = rows[:limit]
|
||||
|
||||
|
||||
+13
-2
@@ -200,8 +200,11 @@ def classify_articles(
|
||||
limit: int,
|
||||
include_rejected: bool = False,
|
||||
dry_run: bool = False,
|
||||
only_unclassified: bool = False,
|
||||
) -> list[tuple[int, dict]]:
|
||||
rows = _classification_candidates(conn, limit=limit, include_rejected=include_rejected)
|
||||
rows = _classification_candidates(
|
||||
conn, limit=limit, include_rejected=include_rejected, only_unclassified=only_unclassified
|
||||
)
|
||||
results = []
|
||||
for row in rows:
|
||||
try:
|
||||
@@ -297,8 +300,16 @@ def _classification_candidates(
|
||||
conn: sqlite3.Connection,
|
||||
limit: int,
|
||||
include_rejected: bool,
|
||||
only_unclassified: bool = False,
|
||||
) -> list[sqlite3.Row]:
|
||||
where = "" if include_rejected else "WHERE s.accepted = 1 OR s.constructive_score >= 4"
|
||||
filters = []
|
||||
if not include_rejected:
|
||||
filters.append("(s.accepted = 1 OR s.constructive_score >= 4)")
|
||||
if only_unclassified:
|
||||
# Articles still carrying the fast heuristic score, i.e. not yet judged
|
||||
# by the model. Lets a scheduled cycle only spend the LLM on new items.
|
||||
filters.append("s.model_name LIKE 'heuristic-%'")
|
||||
where = ("WHERE " + " AND ".join(filters)) if filters else ""
|
||||
return conn.execute(
|
||||
f"""
|
||||
SELECT
|
||||
|
||||
Reference in New Issue
Block a user