Make cycle show classify progress and prevent overlapping runs

- cycle now prints per-article classify progress (flushed) so the long step is
  clearly alive rather than appearing hung.
- An exclusive flock guards the cycle so a manual run and the systemd timer (or
  two timer ticks) cannot overlap and contend on the database and model.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
jay
2026-05-30 16:15:03 +00:00
parent b1530e4a4f
commit 470e9ecbf8
2 changed files with 41 additions and 7 deletions
+36 -6
View File
@@ -255,26 +255,56 @@ def check_feeds(conn: sqlite3.Connection, include_inactive: bool = False) -> Non
def run_cycle(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
"""One end-to-end pass for a scheduler: poll due sources, classify the new
arrivals, rebuild today's brief. Each step is independent and non-fatal so a
down model endpoint or empty day never aborts the whole cycle.
arrivals, dedup, rebuild today's brief. Each step is independent and
non-fatal so a down model endpoint or empty day never aborts the cycle.
Holds an exclusive lock so a manual run and the systemd timer (or two timer
ticks) can never overlap and contend on the database and model.
"""
import fcntl
lock_path = Path(args.db).parent / ".goodnews-cycle.lock"
lock_file = open(lock_path, "w")
try:
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
print("cycle: another cycle is already running; skipping")
lock_file.close()
return
try:
_run_cycle_locked(conn, args)
finally:
fcntl.flock(lock_file, fcntl.LOCK_UN)
lock_file.close()
def _run_cycle_locked(conn: sqlite3.Connection, args: argparse.Namespace) -> None:
init_db(conn)
if args.force:
poll_result = poll_all_sources(conn)
else:
poll_result = poll_due_sources(conn)
print(f"poll: {_format_result(poll_result)}")
print(f"poll: {_format_result(poll_result)}", flush=True)
if not args.no_classify:
client = llm_client_from_args(args)
def _progress(done: int, total: int, article_id: int) -> None:
print(f" classify {done}/{total} (article {article_id})", flush=True)
try:
results = classify_articles(
conn, client, limit=args.classify_limit, include_rejected=True, only_unclassified=True
conn,
client,
limit=args.classify_limit,
include_rejected=True,
only_unclassified=True,
progress=_progress,
)
print(f"classify: {len(results)} new article(s) scored by {client.model}")
print(f"classify: {len(results)} new article(s) scored by {client.model}", flush=True)
except Exception as exc: # endpoint down, timeout, etc. — keep going
print(f"classify: skipped ({exc})")
print(f"classify: skipped ({exc})", flush=True)
if not args.no_dedup:
try:
+5 -1
View File
@@ -5,6 +5,7 @@ import os
import sqlite3
import urllib.error
import urllib.request
from collections.abc import Callable
from dataclasses import dataclass
from .taxonomy import (
@@ -226,12 +227,13 @@ def classify_articles(
include_rejected: bool = False,
dry_run: bool = False,
only_unclassified: bool = False,
progress: "Callable[[int, int, int], None] | None" = None,
) -> list[tuple[int, dict]]:
rows = _classification_candidates(
conn, limit=limit, include_rejected=include_rejected, only_unclassified=only_unclassified
)
results = []
for row in rows:
for index, row in enumerate(rows, start=1):
try:
scores = client.classify(row)
except RuntimeError as exc:
@@ -244,6 +246,8 @@ def classify_articles(
if not dry_run:
upsert_article_score(conn, row["id"], scores)
conn.commit()
if progress is not None:
progress(index, len(rows), row["id"])
return results