CSV export: defuse formula injection in cells
Per Codex: source-controlled strings (name, feed_url, last_error, review_reason) could be read as formulas by spreadsheet apps if they start with = + - @. Add _csv_cell — prefixes such strings with an apostrophe; numbers pass through untouched (no risk, and avoids mangling negatives). Routed every exported cell through it. Test: a =HYPERLINK(...) source name is escaped, never bare. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+23
-11
@@ -1014,9 +1014,21 @@ def create_app() -> FastAPI:
|
||||
|
||||
# --- CSV exports (admin-gated, for inspection / archiving) ---------------
|
||||
|
||||
def _csv_cell(v):
|
||||
# Defuse CSV formula injection: a cell a spreadsheet might evaluate (=,+,-,@)
|
||||
# gets a leading apostrophe. Numbers pass through untouched (no risk, and
|
||||
# this avoids mangling any legitimate negative value).
|
||||
if v is None:
|
||||
return ""
|
||||
if isinstance(v, (int, float)):
|
||||
return v
|
||||
s = str(v)
|
||||
return "'" + s if s[:1] in ("=", "+", "-", "@") else s
|
||||
|
||||
def _csv_response(filename: str, write) -> Response:
|
||||
buf = io.StringIO()
|
||||
write(csv.writer(buf))
|
||||
writer = csv.writer(buf)
|
||||
write(lambda values: writer.writerow([_csv_cell(v) for v in values]))
|
||||
return Response(
|
||||
content=buf.getvalue(),
|
||||
media_type="text/csv",
|
||||
@@ -1030,14 +1042,14 @@ def create_app() -> FastAPI:
|
||||
_require_admin(conn, request)
|
||||
rows = queries.source_health(conn)
|
||||
|
||||
def write(w):
|
||||
w.writerow([
|
||||
def write(row):
|
||||
row([
|
||||
"name", "feed_url", "homepage", "status", "visible", "served", "accepted_total",
|
||||
"total_articles", "acceptance_pct", "duplicate_pct", "accepted_dup_pct",
|
||||
"image_coverage_pct", "last_success", "last_error", "retry_after", "review_flag", "review_reason",
|
||||
])
|
||||
for s in rows:
|
||||
w.writerow([
|
||||
row([
|
||||
s["name"], s["feed_url"], s.get("homepage_url") or "",
|
||||
s.get("status") or "", "yes" if s.get("content_visible") else "no",
|
||||
s["served"], s["accepted_total"], s["total_articles"],
|
||||
@@ -1055,9 +1067,9 @@ def create_app() -> FastAPI:
|
||||
_require_admin(conn, request)
|
||||
st = queries.admin_stats(conn, days=days)
|
||||
|
||||
def write(w):
|
||||
def write(row):
|
||||
v, a = st["visitors"], st["accounts"]
|
||||
w.writerow(["metric", "value"])
|
||||
row(["metric", "value"])
|
||||
for label, value in [
|
||||
("window_days", st["days"]),
|
||||
("visitors_today", v["today"]), ("visitors_7d", v["d7"]), ("visitors_30d", v["d30"]),
|
||||
@@ -1065,13 +1077,13 @@ def create_app() -> FastAPI:
|
||||
("accounts_total", a["total"]), ("accounts_new_7d", a["new_7d"]), ("accounts_active_7d", a["active_7d"]),
|
||||
("feedback_7d", st.get("feedback_7d", 0)), ("feedback_unread", st.get("feedback_unread", 0)),
|
||||
]:
|
||||
w.writerow([label, value])
|
||||
row([label, value])
|
||||
for kind, n in (st.get("shares") or {}).items():
|
||||
w.writerow([f"share_{kind}", n])
|
||||
w.writerow([]) # blank line, then the daily time series
|
||||
w.writerow(["date", "visitors", "opens"])
|
||||
row([f"share_{kind}", n])
|
||||
row([]) # blank line, then the daily time series
|
||||
row(["date", "visitors", "opens"])
|
||||
for d in st.get("daily", []):
|
||||
w.writerow([d["day"], d["visits"], d["opens"]])
|
||||
row([d["day"], d["visits"], d["opens"]])
|
||||
|
||||
return _csv_response("audience.csv", write)
|
||||
|
||||
|
||||
@@ -184,3 +184,17 @@ def test_export_audience_csv(tmp_path, monkeypatch):
|
||||
assert "metric,value" in body and "window_days,7" in body
|
||||
assert "date,visitors,opens" in body # daily time-series section
|
||||
assert TestClient(app).get("/api/admin/export/audience.csv").status_code == 401 # gated
|
||||
|
||||
|
||||
def test_export_sources_csv_escapes_formula_injection(tmp_path, monkeypatch):
|
||||
import os, sqlite3
|
||||
app, api = _make(tmp_path, monkeypatch, admin_email="boss@x.com")
|
||||
c = sqlite3.connect(os.environ["GOODNEWS_DB"])
|
||||
c.execute("UPDATE sources SET name = ?, review_flag = 1, review_reason = ? WHERE id = 1",
|
||||
('=HYPERLINK("http://bad")', '+danger'))
|
||||
c.commit(); c.close()
|
||||
tc = _signin(app, api, "boss@x.com")
|
||||
body = tc.get("/api/admin/export/sources.csv").text
|
||||
assert "'=HYPERLINK" in body # leading apostrophe defuses the formula (CSV may quote the cell)
|
||||
assert "'+danger" in body
|
||||
assert ",=HYPERLINK" not in body # never written as a bare, evaluable formula cell
|
||||
|
||||
Reference in New Issue
Block a user