Files
retroDE_ps2/tools/gs_parse.py
T
thejayman77 ec82764bef Initial commit: retroDE_ps2 — first-of-its-kind PS2 GS FPGA core (DE25-Nano / Agilex 5)
RTL (GS rasterizer, EE core stub, platform bridge, LPDDR4B path), sim regression
(272 TBs), docs, and tooling. Copyrighted PS2 content (BIOS, game code, GS dumps,
and all dump-derived textures/traces) is excluded via .gitignore and stays local.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-29 20:10:50 -04:00

250 lines
13 KiB
Python

#!/usr/bin/env python3
"""retroDE_ps2 — Ch340 GS-dump parser + GIF/GS decoder (Bricks 1-2).
Deterministically decodes a PCSX2 .gs/.gs.xz/.gs.zst dump into a NORMALIZED, versioned event stream
(container parse -> GIF tag walk -> GS register-write / IMAGE / transfer events). No hidden
approximation: anything not understood is emitted as an explicit event (MALFORMED / unknown reg /
IMAGE-not-inlined), never silently dropped or guessed.
Container format: see memory/reference_pcsx2_gsdump_format.md (pinned from PCSX2 source + validated
byte-exact against captures/gs/cubes/cubes_frame.gs.zst).
This module is the parser/decoder ONLY. Census/histograms (Brick 3) and the ps2_feeder-scene
translator (Brick 4) consume `parse_dump()`'s event stream. Raw IMAGE/transfer PAYLOADS are never
inlined into committable output — only structural facts (sizes, formats, offsets).
Usage:
gs_parse.py <dump.gs[.xz|.zst]> [--summary] [--events N] [--json events.jsonl]
"""
import sys, struct, lzma, subprocess, shutil, json
from dataclasses import dataclass, field, asdict
# Version stamp of the normalized event stream (printed by the CLI summary).
SCHEMA_VERSION = 1

# ---- GS register address -> mnemonic ----
# Used to name the target of every register write the decoder emits; an address that
# is missing here is reported by the walker as "UNKNOWN_0x..".
GS_REG = {
    0x00: "PRIM",
    0x01: "RGBAQ",
    0x02: "ST",
    0x03: "UV",
    0x04: "XYZF2",
    0x05: "XYZ2",
    0x06: "TEX0_1",
    0x07: "TEX0_2",
    0x08: "CLAMP_1",
    0x09: "CLAMP_2",
    0x0A: "FOG",
    0x0C: "XYZF3",
    0x0D: "XYZ3",
    0x14: "TEX1_1",
    0x15: "TEX1_2",
    0x16: "TEX2_1",
    0x17: "TEX2_2",
    0x18: "XYOFFSET_1",
    0x19: "XYOFFSET_2",
    0x1A: "PRMODECONT",
    0x1B: "PRMODE",
    0x1C: "TEXCLUT",
    0x22: "SCANMSK",
    0x34: "MIPTBP1_1",
    0x35: "MIPTBP1_2",
    0x36: "MIPTBP2_1",
    0x37: "MIPTBP2_2",
    0x3B: "TEXA",
    0x3D: "FOGCOL",
    0x3F: "TEXFLUSH",
    0x40: "SCISSOR_1",
    0x41: "SCISSOR_2",
    0x42: "ALPHA_1",
    0x43: "ALPHA_2",
    0x44: "DIMX",
    0x45: "DTHE",
    0x46: "COLCLAMP",
    0x47: "TEST_1",
    0x48: "TEST_2",
    0x49: "PABE",
    0x4A: "FBA_1",
    0x4B: "FBA_2",
    0x4C: "FRAME_1",
    0x4D: "FRAME_2",
    0x4E: "ZBUF_1",
    0x4F: "ZBUF_2",
    0x50: "BITBLTBUF",
    0x51: "TRXPOS",
    0x52: "TRXREG",
    0x53: "TRXDIR",
    0x54: "HWREG",
    0x60: "SIGNAL",
    0x61: "FINISH",
    0x62: "LABEL",
}

# ---- GIFtag REGS descriptor nibbles ----
# One 4-bit value per register slot of a GIF tag; the PACKED/REGLIST decoders in this
# file compare each slot against these names. 0xB has no name here on purpose.
PACKED_PRIM = 0x0
PACKED_RGBAQ = 0x1
PACKED_ST = 0x2
PACKED_UV = 0x3
PACKED_XYZF2 = 0x4
PACKED_XYZ2 = 0x5
PACKED_TEX0_1 = 0x6
PACKED_TEX0_2 = 0x7
PACKED_CLAMP1 = 0x8
PACKED_CLAMP2 = 0x9
PACKED_FOG = 0xA
PACKED_XYZF3 = 0xC
PACKED_XYZ3 = 0xD
PACKED_AD = 0xE
PACKED_NOP = 0xF

# Dump packet id -> packet name (same numbering the packet loop dispatches on).
GST = {
    0: "Transfer",
    1: "VSync",
    2: "ReadFIFO2",
    3: "Registers",
}

# Transfer packet path byte -> path name, used to label TRANSFER events.
GSPATH = {
    0: "Path1Old",
    1: "Path2",
    2: "Path3",
    3: "Path1New",
    4: "Dummy",
}
@dataclass
class Event:
    """One record of the normalized event stream produced by the parser.

    The first four fields are always supplied positionally by the emitters; the
    remaining ones default to "not applicable" values.
    """

    # Event category: GSREG | IMAGE | GIFTAG | FRAME_BOUNDARY | READFIFO | TRANSFER | MALFORMED
    kind: str
    # Frame counter value at the time the event was emitted.
    frame: int
    # Position of the event in the output list (assigned when the event is emitted).
    idx: int
    # Offset in the DECOMPRESSED .gs of the source byte.
    byte_off: int
    # Register mnemonic; only filled in for GSREG events.
    reg: str = ""
    # Register address; -1 when the event is not a register write.
    addr: int = -1
    # Register value for GSREG events.
    value: int = 0
    # Free-form structural facts about the event (sizes, flags, reasons, ...).
    info: dict = field(default_factory=dict)
# ---------------------------------------------------------------- decompression
def read_dump_bytes(path):
    """Return the fully decompressed contents of a GS dump as `bytes`.

    The compression is chosen from the file name suffix:

    * ``.xz``  -> decompressed in-process with `lzma`.
    * ``.zst`` -> decompressed by the external ``zstd`` CLI (``zstd -d -c``); the process
      exits via `sys.exit` with an error message when ``zstd`` is not on PATH, and
      `subprocess.CalledProcessError` is raised when ``zstd`` itself fails.
    * anything else -> read verbatim.

    `path` is a `str` file name.
    """
    if path.endswith(".xz"):
        # Context manager so the handle is closed deterministically, not at GC time.
        with lzma.open(path, "rb") as fh:
            return fh.read()
    if path.endswith(".zst"):
        if shutil.which("zstd") is None:
            sys.exit("error: .zst dump but `zstd` not found on PATH")
        # "--" keeps a file name that starts with "-" from being parsed as an option.
        done = subprocess.run(["zstd", "-d", "-c", "--", path], capture_output=True, check=True)
        return done.stdout
    with open(path, "rb") as fh:
        return fh.read()
# ---------------------------------------------------------------- container parse
@dataclass
class Header:
    """Decoded container header of a new-format .gs dump.

    The first nine fields are the consecutive little-endian u32 values stored at byte
    offset 8 of the file; the remaining three are derived by `parse_header`.
    """

    state_version: int
    state_size: int
    serial_offset: int
    serial_size: int
    crc: int
    ss_w: int
    ss_h: int
    ss_off: int
    ss_size: int
    # Value of the u32 at byte offset 4.
    header_size: int
    # Serial string sliced out of the header area (latin-1 decoded).
    serial: str
    # Byte offset of the first packet in the decompressed dump.
    packet_start: int


def parse_header(d):
    """Parse the container header at the start of decompressed dump bytes `d`.

    Returns a populated `Header`.

    Raises:
        ValueError: when the 0xFFFFFFFF marker is missing, when `header_size` is smaller
            than the 36-byte fixed part, when `d` is too short to hold that fixed part,
            or when the computed packet start lies past the end of `d`.
    """
    fixed_fmt = "<9I"
    fixed_len = struct.calcsize(fixed_fmt)  # 36 bytes
    if len(d) < 12 or struct.unpack_from("<I", d, 0)[0] != 0xFFFFFFFF:
        raise ValueError("not a new-format .gs (missing 0xFFFFFFFF marker)")
    header_size = struct.unpack_from("<I", d, 4)[0]
    # Validate before touching the fixed fields so a bad header is reported as such.
    if header_size < fixed_len:
        raise ValueError(f"header_size {header_size} < 36")
    # Without this guard a short file surfaces as struct.error instead of ValueError.
    if len(d) < 8 + fixed_len:
        raise ValueError(f"truncated header: need {8 + fixed_len} bytes, have {len(d)}")
    fixed = struct.unpack_from(fixed_fmt, d, 8)
    h = Header(*fixed, header_size=header_size, serial="", packet_start=0)
    # serial_offset is applied relative to byte 8 (the start of the header fields).
    s0 = 8 + h.serial_offset
    h.serial = d[s0:s0 + h.serial_size].decode("latin1", "replace")
    # Packets follow: 8 marker/size bytes, the header, the state blob and 8192 more bytes.
    h.packet_start = 8 + header_size + h.state_size + 8192
    if h.packet_start > len(d):
        raise ValueError(f"packet_start 0x{h.packet_start:x} past EOF 0x{len(d):x}")
    return h
# ---------------------------------------------------------------- GIF tag decode
def _bits(q, lo, n): # extract n bits at lo from a 128-bit int
return (q >> lo) & ((1 << n) - 1)
def decode_packed_reg(desc, q):
    """Decode one PACKED-mode register qword into a plain GS register write.

    Args:
        desc: 4-bit register descriptor taken from the GIF tag's REGS field.
        q: the 128-bit data qword as an int (little-endian assembled).

    Returns:
        ``(addr, value_64, note)``. `addr` is the GS register address and `value_64` the
        value in that register's own 64-bit layout. `addr` is ``None`` when nothing is
        written: `note` is then ``"nop"`` for the NOP descriptor, or
        ``"packed_desc_0x?_unhandled"`` for a descriptor this decoder does not know.
        For XYZ2/XYZF2, `note` is ``"adc"`` when bit 111 of the qword is set.
    """
    low64 = q & 0xFFFFFFFFFFFFFFFF
    if desc == PACKED_AD:
        # A+D: data in [63:0], destination register address in [71:64].
        return _bits(q, 64, 8), low64, ""
    if desc == PACKED_NOP:
        return None, 0, "nop"
    if desc == PACKED_PRIM:
        return 0x00, _bits(q, 0, 11), ""
    if desc == PACKED_RGBAQ:
        # One 8-bit channel per 32-bit lane. Only R/G/B/A are packed into the value;
        # bits [63:32] of the result (the register's Q field) stay zero.
        r = _bits(q, 0, 8)
        g = _bits(q, 32, 8)
        b = _bits(q, 64, 8)
        a = _bits(q, 96, 8)
        return 0x01, r | (g << 8) | (b << 16) | (a << 24), ""
    if desc == PACKED_ST:
        # S in [31:0], T in [63:32]; the Q carried in [95:64] is not part of the ST value.
        return 0x02, low64, ""
    if desc == PACKED_UV:
        # PACKED layout: U in [13:0], V in [45:32]. Register layout: U [13:0], V [29:16].
        u = _bits(q, 0, 14)
        v = _bits(q, 32, 14)
        return 0x03, u | (v << 16), ""
    if desc in (PACKED_XYZ2, PACKED_XYZ3):
        # X in [15:0], Y in [47:32], 32-bit Z in [95:64].
        value = _bits(q, 0, 16) | (_bits(q, 32, 16) << 16) | (_bits(q, 64, 32) << 32)
        if desc == PACKED_XYZ2:
            return 0x05, value, ("adc" if _bits(q, 111, 1) else "")
        return 0x0D, value, ""
    if desc in (PACKED_XYZF2, PACKED_XYZF3):
        # X in [15:0], Y in [47:32], 24-bit Z in [91:68] (note the 4-bit offset),
        # fog F in [107:100].
        value = (
            _bits(q, 0, 16)
            | (_bits(q, 32, 16) << 16)
            | (_bits(q, 68, 24) << 32)
            | (_bits(q, 100, 8) << 56)
        )
        if desc == PACKED_XYZF2:
            return 0x04, value, ("adc" if _bits(q, 111, 1) else "")
        return 0x0C, value, ""
    if desc in (PACKED_TEX0_1, PACKED_TEX0_2, PACKED_CLAMP1, PACKED_CLAMP2):
        # These descriptors pass the low 64 bits through unchanged.
        addr = {PACKED_TEX0_1: 0x06, PACKED_TEX0_2: 0x07, PACKED_CLAMP1: 0x08, PACKED_CLAMP2: 0x09}[desc]
        return addr, low64, ""
    if desc == PACKED_FOG:
        return 0x0A, (_bits(q, 100, 8) << 56), ""
    # Anything else is reported, never guessed.
    return None, 0, f"packed_desc_0x{desc:x}_unhandled"
def walk_gif(data, base_off, frame, emit):
    """Walk the GIF tag chain in `data` (one Transfer payload) and report it via `emit`.

    Args:
        data: bytes of the transfer payload.
        base_off: offset of `data[0]` in the decompressed dump; added to every byte_off.
        frame: frame number stamped on every event.
        emit: callable taking one `Event`.

    Emits, per tag: a GIFTAG event, an optional PRIM write (via PRE), then depending on
    FLG either GSREG events (PACKED / REGLIST) or one IMAGE event (payload not inlined).
    Data that runs out in the middle of a tag's payload yields a MALFORMED event
    (`gif_packed_truncated`, `gif_reglist_truncated` or `gif_payload_overrun`) and ends
    the walk.
    """
    mask64 = 0xFFFFFFFFFFFFFFFF
    off = 0
    n = len(data)
    while off + 16 <= n:
        q = int.from_bytes(data[off:off + 16], "little")
        nloop = _bits(q, 0, 15)
        eop = _bits(q, 15, 1)
        pre = _bits(q, 46, 1)
        prim = _bits(q, 47, 11)
        flg = _bits(q, 58, 2)
        nreg = _bits(q, 60, 4)
        regs = _bits(q, 64, 64)
        nregs = nreg or 16  # NREG == 0 encodes 16 descriptors
        emit(Event("GIFTAG", frame, 0, base_off + off,
                   info=dict(nloop=nloop, eop=eop, pre=pre, prim=prim, flg=flg, nreg=nregs)))
        off += 16
        # PRE/PRIM take effect in PACKED mode only, and a tag with NLOOP == 0 outputs
        # nothing, so no PRIM write is reported for other tags.
        if pre and flg == 0 and nloop > 0:
            emit(Event("GSREG", frame, 0, base_off + off, reg="PRIM", addr=0x00, value=prim,
                       info=dict(via="PRE")))
        truncated = ""  # MALFORMED reason once the payload runs out mid-tag
        if flg == 0:  # PACKED: nloop * nregs qwords, one register per qword
            descs = [(regs >> (4 * k)) & 0xF for k in range(nregs)]
            for _ in range(nloop):
                for d in descs:
                    if off + 16 > n:
                        truncated = "gif_packed_truncated"
                        break
                    qq = int.from_bytes(data[off:off + 16], "little")
                    addr, val, note = decode_packed_reg(d, qq)
                    if addr is not None:
                        inf = {"note": note} if note else {}
                        # PACKED ST also carries Q in [95:64]. Expose it so consumers can
                        # rebuild RGBAQ.Q. Keyed on the descriptor, not the address: an
                        # A+D write to ST holds the address in those bits, not Q.
                        if d == PACKED_ST:
                            inf["q_stq"] = _bits(qq, 64, 32)
                        emit(Event("GSREG", frame, 0, base_off + off,
                                   reg=GS_REG.get(addr, f"UNKNOWN_0x{addr:02x}"),
                                   addr=addr, value=val, info=inf))
                    elif note and note != "nop":
                        emit(Event("MALFORMED", frame, 0, base_off + off, info=dict(reason=note)))
                    off += 16
                if truncated:
                    break
        elif flg == 1:  # REGLIST: nloop * nregs 64-bit values, two per qword
            descs = [(regs >> (4 * k)) & 0xF for k in range(nregs)]
            total = nloop * nregs
            cur = 0
            for i in range(total):
                upper = i & 1  # 0 -> low 64 bits of the qword, 1 -> high 64 bits
                if not upper:
                    if off + 16 > n:
                        truncated = "gif_reglist_truncated"
                        break
                    cur = int.from_bytes(data[off:off + 16], "little")
                val = (cur >> (64 * upper)) & mask64
                src_off = base_off + off + 8 * upper  # byte the value actually came from
                if upper:
                    off += 16
                d = descs[i % nregs]
                if d == PACKED_NOP:
                    continue
                emit(Event("GSREG", frame, 0, src_off, reg=GS_REG.get(d, f"UNKNOWN_0x{d:02x}"),
                           addr=d, value=val, info=dict(via="REGLIST")))
            if not truncated and total & 1:
                off += 16  # odd count: the unused upper half of the last qword is padding
        elif flg == 2:  # IMAGE: nloop qwords of raw data (texture / FB upload) — NOT inlined
            qbytes = nloop * 16
            emit(Event("IMAGE", frame, 0, base_off + off, info=dict(qwc=nloop, bytes=qbytes)))
            off += qbytes
        else:  # flg == 3 disabled
            # NOTE(review): no payload is skipped here, so any qwords that follow are
            # walked as tags — confirm that this matches the intended FLG=3 handling.
            emit(Event("GIFTAG", frame, 0, base_off + off, info=dict(flg=3, note="disabled")))
        if truncated:
            emit(Event("MALFORMED", frame, 0, base_off + off, info=dict(reason=truncated)))
            break
        if off > n:
            emit(Event("MALFORMED", frame, 0, base_off + off, info=dict(reason="gif_payload_overrun")))
            break
# ---------------------------------------------------------------- packet stream
def parse_dump(path):
    """Parse a GS dump file into ``(header, events)``.

    The file is decompressed with `read_dump_bytes`, its container header decoded with
    `parse_header`, and the packet stream that follows is walked packet by packet:

    * id 0 Transfer  -> one TRANSFER event, then the GIF events produced by `walk_gif`.
    * id 1 VSync     -> one FRAME_BOUNDARY event; the frame counter then advances.
    * id 2 ReadFIFO2 -> one READFIFO event.
    * id 3 Registers -> one TRANSFER event tagged ``regs_snapshot=8192``.

    An unknown packet id, or any packet that is cut off by the end of the file, yields a
    MALFORMED event and stops the walk. Every event's `idx` is its position in the
    returned list.

    Raises whatever `read_dump_bytes` / `parse_header` raise (e.g. ValueError for a bad
    header).
    """
    d = read_dump_bytes(path)
    h = parse_header(d)
    events = []
    frame = 0
    size = len(d)

    def emit(ev):
        ev.idx = len(events)
        events.append(ev)

    off = h.packet_start
    while off < size:
        tid = d[off]
        pkt_off = off
        off += 1
        if tid == 0:  # Transfer: path byte + u32 length + payload
            if off + 5 > size:
                emit(Event("MALFORMED", frame, 0, pkt_off, info=dict(reason="trunc_transfer_hdr")))
                break
            path_id = d[off]
            length = struct.unpack_from("<I", d, off + 1)[0]
            off += 5
            if off + length > size:
                emit(Event("MALFORMED", frame, 0, pkt_off,
                           info=dict(reason="trunc_transfer_data", len=length)))
                break
            emit(Event("TRANSFER", frame, 0, pkt_off,
                       info=dict(path=GSPATH.get(path_id, path_id), length=length)))
            walk_gif(d[off:off + length], off, frame, emit)
            off += length
        elif tid == 1:  # VSync (frame boundary): one field byte
            if off >= size:
                # Report the cut-off packet instead of dropping it silently.
                emit(Event("MALFORMED", frame, 0, pkt_off, info=dict(reason="trunc_vsync")))
                break
            emit(Event("FRAME_BOUNDARY", frame, 0, pkt_off, info=dict(field=d[off])))
            off += 1
            frame += 1
        elif tid == 2:  # ReadFIFO2: u32 qword count
            if off + 4 > size:
                emit(Event("MALFORMED", frame, 0, pkt_off, info=dict(reason="trunc_readfifo")))
                break
            emit(Event("READFIFO", frame, 0, pkt_off,
                       info=dict(qwc=struct.unpack_from("<I", d, off)[0])))
            off += 4
        elif tid == 3:  # Registers snapshot: 8192 bytes, skipped
            if off + 8192 > size:
                emit(Event("MALFORMED", frame, 0, pkt_off, info=dict(reason="trunc_regs_snapshot")))
                break
            emit(Event("TRANSFER", frame, 0, pkt_off, info=dict(regs_snapshot=8192)))
            off += 8192
        else:
            emit(Event("MALFORMED", frame, 0, pkt_off, info=dict(reason=f"bad_packet_id_{tid}")))
            break
    return h, events
# ---------------------------------------------------------------- CLI / summary
def main(argv):
    """Command-line entry point; returns the process exit status.

    ``argv[1]`` is the dump path. A summary (header line plus histograms) is always
    printed. Optional arguments:

    * ``--events N``  also print the first N events.
    * ``--json FILE`` also write every event to FILE, one JSON object per line.

    Returns 0 on success, 2 for a usage error (no path, or an option without a usable
    value) and 1 when the dump is rejected with a ValueError (e.g. a bad header).
    """
    if len(argv) < 2:
        print(__doc__)
        return 2
    path = argv[1]

    # Validate the options up front so a typo fails before the (slow) parse, and with a
    # message instead of an IndexError/ValueError traceback.
    show_n = None
    if "--events" in argv:
        pos = argv.index("--events") + 1
        if pos >= len(argv):
            print("error: --events needs a count", file=sys.stderr)
            return 2
        try:
            show_n = int(argv[pos])
        except ValueError:
            print(f"error: --events count must be an integer, got {argv[pos]!r}", file=sys.stderr)
            return 2
    outp = None
    if "--json" in argv:
        pos = argv.index("--json") + 1
        if pos >= len(argv):
            print("error: --json needs an output path", file=sys.stderr)
            return 2
        outp = argv[pos]

    try:
        h, events = parse_dump(path)
    except ValueError as exc:
        print(f"error: {path}: {exc}", file=sys.stderr)
        return 1

    print(f"schema v{SCHEMA_VERSION} serial={h.serial!r} crc=0x{h.crc:08x} ss={h.ss_w}x{h.ss_h} "
          f"state=0x{h.state_size:x} packets@0x{h.packet_start:x}")

    # histograms
    kinds = {}
    regs = {}
    prims = {}
    flgs = {}
    frames = 0
    images = 0
    image_bytes = 0
    malformed = 0
    PRIMT = {0: "POINT", 1: "LINE", 2: "LINE_STRIP", 3: "TRIANGLE", 4: "TRI_STRIP", 5: "TRI_FAN",
             6: "SPRITE", 7: "INVALID"}
    for e in events:
        kinds[e.kind] = kinds.get(e.kind, 0) + 1
        if e.kind == "GSREG":
            regs[e.reg] = regs.get(e.reg, 0) + 1
        elif e.kind == "FRAME_BOUNDARY":
            frames += 1
        elif e.kind == "MALFORMED":
            malformed += 1
        elif e.kind == "IMAGE":
            images += 1
            image_bytes += e.info.get("bytes", 0)
        elif e.kind == "GIFTAG":
            fl = e.info.get("flg")
            flgs[fl] = flgs.get(fl, 0) + 1
            if e.info.get("pre"):
                # The low 3 bits of the tag's PRIM field select the primitive type.
                prim_name = PRIMT.get(e.info.get("prim", 0) & 7, "?")
                prims[prim_name] = prims.get(prim_name, 0) + 1
    print(f"events={len(events)} frames={frames} images={images} image_bytes={image_bytes} malformed={malformed}")
    print("event kinds:", dict(sorted(kinds.items(), key=lambda x: -x[1])))
    print("GIF flg :", {('PACKED' if k == 0 else 'REGLIST' if k == 1 else 'IMAGE' if k == 2 else 'DISABLE'): v
                        for k, v in sorted(flgs.items())})
    print("PRIM types (via PRE):", dict(sorted(prims.items(), key=lambda x: -x[1])))
    print("top GS regs:", dict(sorted(regs.items(), key=lambda x: -x[1])[:18]))

    if show_n is not None:
        for e in events[:show_n]:
            print(f" f{e.frame} #{e.idx} @0x{e.byte_off:x} {e.kind} {e.reg} {('0x%x'%e.value) if e.kind=='GSREG' else ''} {e.info}")
    if outp is not None:
        with open(outp, "w") as f:
            for e in events:
                f.write(json.dumps(asdict(e)) + "\n")
        print(f"wrote {len(events)} events -> {outp}")
    return 0
# Script entry: run the CLI and hand its return value to the interpreter as exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv))