"""News image cache (hardened): SSRF-safe cycle-only fetch + downscale to WebP, reject SVG/non-raster/private-host/redirect-to-private, negative-cache failures, LRU eviction, and the cache-HITS-ONLY /api/img endpoint (restricted to accepted, canonical articles).""" import io import os import pytest from PIL import Image from fastapi.testclient import TestClient from goodnews import newsimg from goodnews.db import connect, init_db # The real SSRF-safe fetch, captured before any fixture stubs it — the SSRF tests below # restore it so they exercise the genuine host/redirect validation. _REAL_SAFE_FETCH = newsimg._safe_fetch class _Resp: """Minimal non-redirect (2xx) response for a fake opener.""" def __init__(self, headers, body=b""): self.status, self.headers, self._b = 200, headers, body def read(self, n=-1): return self._b def close(self): pass def _http_error(url, code, location=None): """A real urllib HTTPError, as _NoRedirect raises for 3xx (and urllib for 4xx/5xx).""" import urllib.error hdrs = {"Location": location} if location else {} return urllib.error.HTTPError(url, code, "x", hdrs, io.BytesIO(b"")) def _fake_opener(handler): class _Op: def open(self, req, timeout=None): return handler(req.full_url) return lambda *a, **k: _Op() def _png(w=1600, h=1000, color=(40, 130, 173)): out = io.BytesIO() Image.new("RGB", (w, h), color).save(out, format="PNG") return out.getvalue() @pytest.fixture def cache(tmp_path, monkeypatch): monkeypatch.setenv("GOODNEWS_IMG_CACHE", str(tmp_path / "img_cache")) # default: treat hosts as public + a happy PNG fetch; individual tests override. monkeypatch.setattr(newsimg, "_host_is_public", lambda h: True) monkeypatch.setattr(newsimg, "_safe_fetch", lambda url, timeout=12: (_png(), "image/png")) return tmp_path / "img_cache" def test_fetch_and_cache_downscales_to_webp(cache): p = newsimg.fetch_and_cache("https://example.com/big.png") assert p and p.exists() and p.suffix == ".webp" with Image.open(p) as im: assert im.width == newsimg.DISPLAY_WIDTH and im.format == "WEBP" assert newsimg.path_for("https://example.com/big.png") == p # pure cache lookup def test_bad_scheme_and_empty_are_rejected(cache): assert newsimg.fetch_and_cache(None) is None assert newsimg.fetch_and_cache("ftp://example.com/x.png") is None assert not list(cache.glob("*.webp")) def test_svg_and_nonimage_rejected_and_negative_cached(cache, monkeypatch): monkeypatch.setattr(newsimg, "_safe_fetch", lambda url, timeout=12: (b"" + b" " * 600, "image/svg+xml")) url = "https://example.com/evil.svg" assert newsimg.fetch_and_cache(url) is None # decoded-raster only → SVG refused assert not list(cache.glob("*.webp")) # nothing stored assert newsimg._failed_recently(url) # negative-cached (won't refetch) def test_private_host_refused_and_negative_cached(cache, monkeypatch): monkeypatch.setattr(newsimg, "_safe_fetch", _REAL_SAFE_FETCH) # exercise real validation monkeypatch.setattr(newsimg, "_host_is_public", lambda h: False) url = "http://169.254.169.254/latest/meta-data/" assert newsimg.fetch_and_cache(url) is None assert newsimg._failed_recently(url) # SSRF target → permanent fail def test_redirect_to_private_is_refused(cache, monkeypatch): monkeypatch.setattr(newsimg, "_safe_fetch", _REAL_SAFE_FETCH) hosts = {"good.example": True, "evil.internal": False} monkeypatch.setattr(newsimg, "_host_is_public", lambda h: hosts.get(h, False)) def handle(url): # real 302 (HTTPError) → private dest if "good.example" in url: raise _http_error(url, 302, "http://evil.internal/x.png") return _Resp({"Content-Type": "image/png"}, _png()) # must never be reached monkeypatch.setattr(newsimg.urllib.request, "build_opener", _fake_opener(handle)) url = "http://good.example/p.png" assert newsimg.fetch_and_cache(url) is None # redirect hop re-validated → refused assert newsimg._failed_recently(url) def test_real_redirect_followed_to_public(cache, monkeypatch): """Regression for the _NoRedirect bug: 3xx arrive as HTTPError, must be followed.""" monkeypatch.setattr(newsimg, "_safe_fetch", _REAL_SAFE_FETCH) monkeypatch.setattr(newsimg, "_host_is_public", lambda h: True) seen = [] def handle(url): seen.append(url) if url == "http://a.example/p.png": raise _http_error(url, 302, "http://b.example/final.png") return _Resp({"Content-Type": "image/png"}, _png()) monkeypatch.setattr(newsimg.urllib.request, "build_opener", _fake_opener(handle)) p = newsimg.fetch_and_cache("http://a.example/p.png") assert p and p.suffix == ".webp" # followed the redirect, cached the dest assert seen == ["http://a.example/p.png", "http://b.example/final.png"] def test_404_is_permanent_but_5xx_is_transient(cache, monkeypatch): monkeypatch.setattr(newsimg, "_safe_fetch", _REAL_SAFE_FETCH) monkeypatch.setattr(newsimg, "_host_is_public", lambda h: True) monkeypatch.setattr(newsimg.urllib.request, "build_opener", _fake_opener(lambda url: (_ for _ in ()).throw(_http_error(url, 404)))) gone = "http://x.example/missing.png" assert newsimg.fetch_and_cache(gone) is None assert newsimg._failed_recently(gone) # 404 → permanent, negative-cached monkeypatch.setattr(newsimg.urllib.request, "build_opener", _fake_opener(lambda url: (_ for _ in ()).throw(_http_error(url, 503)))) down = "http://x.example/down.png" assert newsimg.fetch_and_cache(down) is None assert not newsimg._failed_recently(down) # 5xx → transient, retried next cycle def test_megapixel_ceiling_enforced_before_decode(cache, monkeypatch): monkeypatch.setattr(newsimg, "_MAX_PIXELS", 1000) # a 1600x1000 image (1.6M px) exceeds this monkeypatch.setattr(newsimg, "_safe_fetch", lambda url, timeout=12: (_png(1600, 1000), "image/png")) url = "https://x.example/huge.png" assert newsimg.fetch_and_cache(url) is None # rejected by the explicit w*h check assert not list(cache.glob("*.webp")) assert newsimg._failed_recently(url) def test_transient_failure_is_not_negative_cached(cache, monkeypatch): def boom(url, timeout=12): raise newsimg._FetchError("network down", permanent=False) monkeypatch.setattr(newsimg, "_safe_fetch", boom) url = "https://example.com/x.jpg" assert newsimg.fetch_and_cache(url) is None assert not newsimg._failed_recently(url) # transient → retried next cycle def test_prune_evicts_least_recently_used_over_cap(cache): paths = [newsimg.fetch_and_cache(f"https://example.com/{i}.png") for i in range(5)] for i, p in enumerate(paths): # 0 = oldest/LRU, 4 = newest os.utime(p, (1000 + i, 1000 + i)) sizes = [p.stat().st_size for p in paths] cap = sum(sizes) - sizes[0] - sizes[1] + 1 # room for the 3 newest only r = newsimg.prune(cap) assert r["removed"] == 2 and r["after"] <= cap assert not paths[0].exists() and not paths[1].exists() assert paths[2].exists() and paths[4].exists() def test_display_url_resolves_per_policy(): assert newsimg.display_url(7, "cache", "https://x/p.jpg") == "/api/img/7" # our copy assert newsimg.display_url(7, "remote", "https://x/p.jpg") == "https://x/p.jpg" # hotlink assert newsimg.display_url(7, "none", "https://x/p.jpg") is None # typographic cover assert newsimg.display_url(7, None, "https://x/p.jpg") == "https://x/p.jpg" # default = remote assert newsimg.display_url(7, "cache", None) is None # no image def test_warm_only_caches_cache_policy_sources(cache, monkeypatch): conn = connect(":memory:"); init_db(conn) conn.execute("INSERT INTO sources (id,name,feed_url,image_policy) VALUES (1,'C','http://c/f','cache')") conn.execute("INSERT INTO sources (id,name,feed_url,image_policy) VALUES (2,'R','http://r/f','remote')") # source 1 (cache): two accepted+canonical+image; plus not-accepted + duplicate (skipped) # source 2 (remote): accepted+image but must NOT be cached (re-hosting not cleared) rows = ((1, 1, "https://x/1.jpg", 1, None), (2, 1, "https://x/2.jpg", 1, None), (3, 1, "https://x/3.jpg", 0, None), (4, 1, "https://x/4.jpg", 1, 1), (5, 2, "https://x/5.jpg", 1, None)) for aid, sid, img, acc, dup in rows: conn.execute("INSERT INTO articles (id,source_id,canonical_url,title,url_hash,image_url,duplicate_of) " "VALUES (?,?,?,?,?,?,?)", (aid, sid, f"http://s/{aid}", f"t{aid}", f"h{aid}", img, dup)) conn.execute("INSERT INTO article_scores (article_id, accepted) VALUES (?,?)", (aid, acc)) conn.commit() assert newsimg.warm(conn) == 2 # only source 1's accepted+canonical+image assert newsimg.warm(conn) == 0 # idempotent — already cached def test_purge_source_removes_cached_copies(cache, monkeypatch): conn = connect(":memory:"); init_db(conn) conn.execute("INSERT INTO sources (id,name,feed_url,image_policy) VALUES (1,'C','http://c/f','cache')") conn.execute("INSERT INTO sources (id,name,feed_url,image_policy) VALUES (2,'O','http://o/f','cache')") for aid, sid, img in ((1, 1, "https://x/1.jpg"), (2, 1, "https://x/2.jpg"), (3, 2, "https://x/3.jpg")): conn.execute("INSERT INTO articles (id,source_id,canonical_url,title,url_hash,image_url) " "VALUES (?,?,?,?,?,?)", (aid, sid, f"http://s/{aid}", f"t{aid}", f"h{aid}", img)) conn.execute("INSERT INTO article_scores (article_id, accepted) VALUES (?,1)", (aid,)) conn.commit() assert newsimg.warm(conn) == 3 # all three cached (both sources 'cache') removed = newsimg.purge_source(conn, 1) # source 1 leaves cache → its 2 copies go assert removed == 2 assert newsimg.path_for("https://x/1.jpg") is None and newsimg.path_for("https://x/2.jpg") is None assert newsimg.path_for("https://x/3.jpg") is not None # source 2 untouched @pytest.fixture def client(tmp_path, monkeypatch): monkeypatch.setenv("GOODNEWS_DB", str(tmp_path / "t.sqlite3")) monkeypatch.setenv("GOODNEWS_IMG_CACHE", str(tmp_path / "img_cache")) monkeypatch.setattr(newsimg, "_host_is_public", lambda h: True) monkeypatch.setattr(newsimg, "_safe_fetch", lambda url, timeout=12: (_png(), "image/png")) conn = connect(tmp_path / "t.sqlite3"); init_db(conn) conn.execute("INSERT INTO sources (id,name,feed_url,image_policy) VALUES (1,'C','http://c/f','cache')") conn.execute("INSERT INTO sources (id,name,feed_url,image_policy) VALUES (2,'R','http://r/f','remote')") # src1 (cache): 1=accepted+image, 2=no image, 3=NOT accepted, 4=duplicate. # src2 (remote): 5=accepted+image but NOT cleared to cache → endpoint must 404. rows = ((1, 1, "https://x/pic.jpg", 1, None), (2, 1, "", 1, None), (3, 1, "https://x/p3.jpg", 0, None), (4, 1, "https://x/p4.jpg", 1, 1), (5, 2, "https://x/p5.jpg", 1, None)) for aid, sid, img, acc, dup in rows: conn.execute("INSERT INTO articles (id,source_id,canonical_url,title,url_hash,image_url,duplicate_of) " "VALUES (?,?,?,?,?,?,?)", (aid, sid, f"http://s/{aid}", f"t{aid}", f"h{aid}", img, dup)) conn.execute("INSERT INTO article_scores (article_id, accepted) VALUES (?,?)", (aid, acc)) conn.commit() # the cycle owns fetching — pre-warm so the endpoint has a hit to serve newsimg.warm(conn) conn.close() from goodnews.api import create_app return TestClient(create_app()) def test_img_endpoint_serves_cached_and_restricts(client): r = client.get("/api/img/1") assert r.status_code == 200 and r.headers["content-type"] == "image/webp" assert "immutable" not in r.headers.get("cache-control", "") # no 1-year immutable assert client.get("/api/img/2").status_code == 404 # no image assert client.get("/api/img/3").status_code == 404 # not accepted assert client.get("/api/img/4").status_code == 404 # duplicate (non-canonical) assert client.get("/api/img/5").status_code == 404 # remote-policy source (not cleared to cache) assert client.get("/api/img/999").status_code == 404 # unknown id def test_img_endpoint_never_fetches(client, monkeypatch): # An accepted article whose image was never warmed must 404, not trigger a fetch. called = {"n": 0} monkeypatch.setattr(newsimg, "fetch_and_cache", lambda *a, **k: called.__setitem__("n", called["n"] + 1)) # /api/img/1 is cached (warmed in fixture) → still served without any fetch assert client.get("/api/img/1").status_code == 200 assert called["n"] == 0