ec82764bef
RTL (GS rasterizer, EE core stub, platform bridge, LPDDR4B path), sim regression (272 TBs), docs, and tooling. Copyrighted PS2 content (BIOS, game code, GS dumps, and all dump-derived textures/traces) is excluded via .gitignore and stays local. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
250 lines
13 KiB
Python
250 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""retroDE_ps2 — Ch340 GS-dump parser + GIF/GS decoder (Bricks 1-2).
|
|
|
|
Deterministically decodes a PCSX2 .gs/.gs.xz/.gs.zst dump into a NORMALIZED, versioned event stream
|
|
(container parse -> GIF tag walk -> GS register-write / IMAGE / transfer events). No hidden
|
|
approximation: anything not understood is emitted as an explicit event (MALFORMED / unknown reg /
|
|
IMAGE-not-inlined), never silently dropped or guessed.
|
|
|
|
Container format: see memory/reference_pcsx2_gsdump_format.md (pinned from PCSX2 source + validated
|
|
byte-exact against captures/gs/cubes/cubes_frame.gs.zst).
|
|
|
|
This module is the parser/decoder ONLY. Census/histograms (Brick 3) and the ps2_feeder-scene
|
|
translator (Brick 4) consume `parse_dump()`'s event stream. Raw IMAGE/transfer PAYLOADS are never
|
|
inlined into committable output — only structural facts (sizes, formats, offsets).
|
|
|
|
Usage:
|
|
gs_parse.py <dump.gs[.xz|.zst]> [--summary] [--events N] [--json events.jsonl]
|
|
"""
|
|
import sys, struct, lzma, subprocess, shutil, json
|
|
from dataclasses import dataclass, field, asdict
|
|
|
|
# Version number of the emitted event-stream schema (printed by the CLI summary line).
SCHEMA_VERSION = 1

# ---- GS register address -> name (A+D targets and decoded-PACKED targets) ----
# Addresses missing from this table are reported by the walker as "UNKNOWN_0x..".
GS_REG = {
    0x00: "PRIM",
    0x01: "RGBAQ",
    0x02: "ST",
    0x03: "UV",
    0x04: "XYZF2",
    0x05: "XYZ2",
    0x06: "TEX0_1",
    0x07: "TEX0_2",
    0x08: "CLAMP_1",
    0x09: "CLAMP_2",
    0x0A: "FOG",
    0x0C: "XYZF3",
    0x0D: "XYZ3",
    0x14: "TEX1_1",
    0x15: "TEX1_2",
    0x16: "TEX2_1",
    0x17: "TEX2_2",
    0x18: "XYOFFSET_1",
    0x19: "XYOFFSET_2",
    0x1A: "PRMODECONT",
    0x1B: "PRMODE",
    0x1C: "TEXCLUT",
    0x22: "SCANMSK",
    0x34: "MIPTBP1_1",
    0x35: "MIPTBP1_2",
    0x36: "MIPTBP2_1",
    0x37: "MIPTBP2_2",
    0x3B: "TEXA",
    0x3D: "FOGCOL",
    0x3F: "TEXFLUSH",
    0x40: "SCISSOR_1",
    0x41: "SCISSOR_2",
    0x42: "ALPHA_1",
    0x43: "ALPHA_2",
    0x44: "DIMX",
    0x45: "DTHE",
    0x46: "COLCLAMP",
    0x47: "TEST_1",
    0x48: "TEST_2",
    0x49: "PABE",
    0x4A: "FBA_1",
    0x4B: "FBA_2",
    0x4C: "FRAME_1",
    0x4D: "FRAME_2",
    0x4E: "ZBUF_1",
    0x4F: "ZBUF_2",
    0x50: "BITBLTBUF",
    0x51: "TRXPOS",
    0x52: "TRXREG",
    0x53: "TRXDIR",
    0x54: "HWREG",
    0x60: "SIGNAL",
    0x61: "FINISH",
    0x62: "LABEL",
}

# Register-descriptor values: the 4-bit nibbles taken from a GIF tag's REGS field.
# 0xB is deliberately absent (no constant is defined for it).
PACKED_PRIM = 0x0
PACKED_RGBAQ = 0x1
PACKED_ST = 0x2
PACKED_UV = 0x3
PACKED_XYZF2 = 0x4
PACKED_XYZ2 = 0x5
PACKED_TEX0_1 = 0x6
PACKED_TEX0_2 = 0x7
PACKED_CLAMP1 = 0x8
PACKED_CLAMP2 = 0x9
PACKED_FOG = 0xA
PACKED_XYZF3 = 0xC
PACKED_XYZ3 = 0xD
PACKED_AD = 0xE
PACKED_NOP = 0xF

# Dump packet type id -> name.
GST = {
    0: "Transfer",
    1: "VSync",
    2: "ReadFIFO2",
    3: "Registers",
}

# Transfer packet path id -> name.
GSPATH = {
    0: "Path1Old",
    1: "Path2",
    2: "Path3",
    3: "Path1New",
    4: "Dummy",
}
|
|
|
|
@dataclass
class Event:
    """One record of the normalized event stream.

    Only `kind`, `frame`, `idx` and `byte_off` are required; the remaining fields carry
    kind-specific detail and default to "empty" values.
    """

    # GSREG | IMAGE | GIFTAG | FRAME_BOUNDARY | READFIFO | TRANSFER | MALFORMED
    kind: str
    # Frame number the event belongs to.
    frame: int
    # Sequence number of the event within the stream.
    idx: int
    # Offset in the DECOMPRESSED .gs of the source byte.
    byte_off: int
    # Register name (meaningful for GSREG events).
    reg: str = ""
    # Register address; -1 when not applicable.
    addr: int = -1
    # Register value; 0 when not applicable.
    value: int = 0
    # Free-form, kind-specific details. A fresh dict is created per instance.
    info: dict = field(default_factory=dict)
|
|
|
|
# ---------------------------------------------------------------- decompression
|
|
def read_dump_bytes(path):
    """Return the fully decompressed contents of a GS dump as `bytes`.

    The container is chosen from the file-name suffix:
      * ``.xz``  -> decompressed in-process with `lzma`
      * ``.zst`` -> decompressed by the external ``zstd`` tool (must be on PATH)
      * anything else is read verbatim.

    Args:
        path: file-system path to the dump (str or os.PathLike).

    Raises:
        SystemExit: a ``.zst`` dump was given but ``zstd`` is not installed.
        subprocess.CalledProcessError: ``zstd`` exited with a non-zero status.
        OSError / lzma.LZMAError: the file cannot be opened or decompressed.
    """
    name = str(path)  # accept pathlib.Path as well as plain strings
    if name.endswith(".xz"):
        # Context manager so the handle is closed deterministically, not whenever GC runs.
        with lzma.open(path, "rb") as fh:
            return fh.read()
    if name.endswith(".zst"):
        if shutil.which("zstd") is None:
            sys.exit("error: .zst dump but `zstd` not found on PATH")
        # "--" ends option parsing so a file name starting with "-" is not taken as a flag.
        proc = subprocess.run(["zstd", "-d", "-c", "--", name], capture_output=True, check=True)
        return proc.stdout
    with open(path, "rb") as fh:
        return fh.read()
|
|
|
|
# ---------------------------------------------------------------- container parse
|
|
@dataclass
class Header:
    """Parsed dump header.

    The first nine fields are in positional order so the class can be built directly from
    nine consecutive integers; the last three are filled in separately by the parser.
    """

    state_version: int
    state_size: int
    serial_offset: int
    serial_size: int
    crc: int
    ss_w: int
    ss_h: int
    ss_off: int
    ss_size: int
    header_size: int
    serial: str
    packet_start: int
|
|
|
|
def parse_header(d):
    """Parse the new-format .gs container header from the decompressed dump `d`.

    Layout as read here (all little-endian u32): a 0xFFFFFFFF marker at offset 0, the header
    size at offset 4, then nine fixed fields starting at offset 8. The serial string is located
    relative to offset 8. Packets start after the header, the saved state and an 8192-byte
    block.

    Args:
        d: bytes-like decompressed dump.

    Returns:
        A populated `Header` (including `serial` and `packet_start`).

    Raises:
        ValueError: the marker is missing, the header is too small or truncated, or the
            computed packet start lies beyond the end of the data.
    """
    fixed_bytes = 36  # nine u32 fields

    if len(d) < 12 or struct.unpack_from("<I", d, 0)[0] != 0xFFFFFFFF:
        raise ValueError("not a new-format .gs (missing 0xFFFFFFFF marker)")
    header_size = struct.unpack_from("<I", d, 4)[0]
    if header_size < fixed_bytes:
        raise ValueError(f"header_size {header_size} < 36")
    # Check the length explicitly so a short file yields ValueError like every other
    # malformed-header case, rather than leaking struct.error from unpack_from.
    if len(d) < 8 + fixed_bytes:
        raise ValueError(f"truncated .gs header: need {8 + fixed_bytes} bytes, have {len(d)}")

    fixed = struct.unpack_from("<9I", d, 8)
    h = Header(*fixed, header_size=header_size, serial="", packet_start=0)

    serial_at = 8 + h.serial_offset
    # latin1 maps every byte to a code point, so this decode cannot fail.
    h.serial = d[serial_at:serial_at + h.serial_size].decode("latin1", "replace")
    h.packet_start = 8 + header_size + h.state_size + 8192
    if h.packet_start > len(d):
        raise ValueError(f"packet_start 0x{h.packet_start:x} past EOF 0x{len(d):x}")
    return h
|
|
|
|
# ---------------------------------------------------------------- GIF tag decode
|
|
def _bits(q, lo, n): # extract n bits at lo from a 128-bit int
|
|
return (q >> lo) & ((1 << n) - 1)
|
|
|
|
def decode_packed_reg(desc, q):
    """Decode one PACKED-mode data qword into a GS register write.

    Args:
        desc: 4-bit register descriptor taken from the GIF tag's REGS field.
        q: the 128-bit data qword as an int (little-endian decoded).

    Returns:
        ``(addr, value_64, note)`` where `addr` is the GS register address and `value_64` is
        the value laid out in that register's 64-bit format, or ``(None, 0, note)`` when the
        qword produces no register write ("nop", or an unhandled descriptor).

        For XYZ2/XYZF2 the ADC bit (bit 111) is NOT applied to the address: the write is still
        reported against XYZ2/XYZF2 and `note` is "adc", so the consumer decides how to
        treat it.
    """
    low64 = q & 0xFFFFFFFFFFFFFFFF

    if desc == PACKED_AD:
        # A+D: register address in [71:64], raw 64-bit data in [63:0].
        return _bits(q, 64, 8), low64, ""
    if desc == PACKED_NOP:
        return None, 0, "nop"
    if desc == PACKED_PRIM:
        return 0x00, _bits(q, 0, 11), ""
    if desc == PACKED_RGBAQ:
        # One 8-bit colour component per 32-bit lane. A PACKED RGBAQ qword carries no Q, so
        # bits [63:32] of the returned register value are left at zero.
        r = _bits(q, 0, 8)
        g = _bits(q, 32, 8)
        b = _bits(q, 64, 8)
        a = _bits(q, 96, 8)
        return 0x01, (r | (g << 8) | (b << 16) | (a << 24)), ""
    if desc == PACKED_ST:
        # S in [31:0], T in [63:32]; lane 2 (Q) is not part of the ST register value.
        return 0x02, (_bits(q, 0, 32) | (_bits(q, 32, 32) << 32)), ""
    if desc == PACKED_UV:
        # PACKED keeps one coordinate per 32-bit lane: U [13:0], V [45:32].
        # The UV register holds U in [13:0] and V in [29:16].
        return 0x03, (_bits(q, 0, 14) | (_bits(q, 32, 14) << 16)), ""
    if desc in (PACKED_XYZ2, PACKED_XYZ3):
        # X [15:0], Y [47:32], Z [95:64] -> register X [15:0], Y [31:16], Z [63:32].
        value = _bits(q, 0, 16) | (_bits(q, 32, 16) << 16) | (_bits(q, 64, 32) << 32)
        if desc == PACKED_XYZ2:
            return 0x05, value, ("adc" if _bits(q, 111, 1) else "")
        return 0x0D, value, ""
    if desc in (PACKED_XYZF2, PACKED_XYZF3):
        # X [15:0], Y [47:32], Z [91:68] (24 bits, offset by 4 within its lane), F [107:100]
        # -> register X [15:0], Y [31:16], Z [55:32], F [63:56].
        value = (_bits(q, 0, 16) | (_bits(q, 32, 16) << 16)
                 | (_bits(q, 68, 24) << 32) | (_bits(q, 100, 8) << 56))
        if desc == PACKED_XYZF2:
            return 0x04, value, ("adc" if _bits(q, 111, 1) else "")
        return 0x0C, value, ""
    if desc in (PACKED_TEX0_1, PACKED_TEX0_2, PACKED_CLAMP1, PACKED_CLAMP2):
        # These descriptors pass the low 64 bits straight through to the register.
        addr = {PACKED_TEX0_1: 0x06, PACKED_TEX0_2: 0x07,
                PACKED_CLAMP1: 0x08, PACKED_CLAMP2: 0x09}[desc]
        return addr, low64, ""
    if desc == PACKED_FOG:
        # F in [107:100] -> FOG register F in [63:56].
        return 0x0A, (_bits(q, 100, 8) << 56), ""
    # Anything else is reported explicitly so the caller can flag it rather than drop it.
    return None, 0, f"packed_desc_0x{desc:x}_unhandled"
|
|
|
|
def walk_gif(data, base_off, frame, emit):
    """Walk the GIF tag chain in `data` (a Transfer payload), reporting through `emit(Event)`.

    Args:
        data: bytes-like payload; it is assumed to begin at a GIF tag.
        base_off: offset of ``data[0]`` in the decompressed dump; added to every `byte_off`.
        frame: frame number stamped on every emitted event.
        emit: callable receiving each `Event` (events are created with ``idx=0``).

    Per GIF tag this emits one GIFTAG event, followed by:
        * FLG 0 (PACKED): an optional PRIM write from the tag (``via="PRE"``) and one GSREG
          per decoded data qword; unhandled descriptors become MALFORMED events.
        * FLG 1 (REGLIST): one GSREG per 64-bit half (NOP descriptors skipped).
        * FLG 2 (IMAGE): one IMAGE event with sizes only; payload is never inlined.
        * FLG 3 (disabled): one extra GIFTAG event with ``note="disabled"``; the NLOOP
          data qwords are skipped exactly as for IMAGE.

    Data that ends before a tag's declared payload is reported with a MALFORMED event
    ("gif_payload_truncated" for PACKED/REGLIST, "gif_payload_overrun" for IMAGE/disabled) and
    ends the walk. Trailing bytes that do not make up a whole qword are not examined.
    """
    mask64 = 0xFFFFFFFFFFFFFFFF
    n = len(data)
    off = 0
    while off + 16 <= n:
        q = int.from_bytes(data[off:off + 16], "little")
        nloop = _bits(q, 0, 15)
        eop = _bits(q, 15, 1)
        pre = _bits(q, 46, 1)
        prim = _bits(q, 47, 11)
        flg = _bits(q, 58, 2)
        nreg = _bits(q, 60, 4)
        regs = _bits(q, 64, 64)
        nregs = nreg if nreg != 0 else 16  # NREG == 0 encodes 16 descriptors
        emit(Event("GIFTAG", frame, 0, base_off + off,
                   info=dict(nloop=nloop, eop=eop, pre=pre, prim=prim, flg=flg, nreg=nregs)))
        off += 16
        truncated = False

        if flg == 0:  # PACKED: nloop * nregs qwords
            # The tag's PRIM field is only applied in PACKED mode, and a tag with NLOOP == 0
            # outputs nothing, so the PRE write is gated on both.
            if pre and nloop > 0:
                emit(Event("GSREG", frame, 0, base_off + off, reg="PRIM", addr=0x00,
                           value=prim, info=dict(via="PRE")))
            descs = [(regs >> (4 * k)) & 0xF for k in range(nregs)]
            for _ in range(nloop):
                for d in descs:
                    if off + 16 > n:
                        truncated = True
                        break
                    qq = int.from_bytes(data[off:off + 16], "little")
                    addr, val, note = decode_packed_reg(d, qq)
                    if addr is not None:
                        inf = {"note": note} if note else {}
                        # Ch342 audit: PACKED ST also carries Q in lane2 [95:64] -> routed to
                        # RGBAQ.Q by the GS (the STQ mechanism). PACKED RGBAQ carries NO Q; Q
                        # comes from ST. Expose it so consumers reconstruct RGBAQ.Q
                        # consistently across PACKED/REGLIST/A+D. Keyed on the descriptor so
                        # an A+D write that merely targets ST is not given a bogus q_stq.
                        if d == PACKED_ST:
                            inf["q_stq"] = _bits(qq, 64, 32)
                        emit(Event("GSREG", frame, 0, base_off + off,
                                   reg=GS_REG.get(addr, f"UNKNOWN_0x{addr:02x}"),
                                   addr=addr, value=val, info=inf))
                    elif note and note != "nop":
                        emit(Event("MALFORMED", frame, 0, base_off + off, info=dict(reason=note)))
                    off += 16
                if truncated:
                    break
        elif flg == 1:  # REGLIST: nloop * nregs registers, 2 per qword (64-bit each), A+D-less
            descs = [(regs >> (4 * k)) & 0xF for k in range(nregs)]
            total = nloop * nregs
            cur = 0
            for i in range(total):
                upper = i & 1  # even registers use the low half of a qword, odd the high half
                if not upper:
                    if off + 16 > n:
                        truncated = True
                        break
                    cur = int.from_bytes(data[off:off + 16], "little")
                    val = cur & mask64
                    src = off
                else:
                    val = (cur >> 64) & mask64
                    src = off + 8  # the upper register starts 8 bytes into the qword
                d = descs[i % nregs]
                if d != PACKED_NOP:
                    emit(Event("GSREG", frame, 0, base_off + src,
                               reg=GS_REG.get(d, f"UNKNOWN_0x{d:02x}"), addr=d, value=val,
                               info=dict(via="REGLIST")))
                if upper:
                    off += 16
            # An odd register count leaves the last qword half used; step over it.
            if not truncated and total & 1:
                off += 16
        elif flg == 2:  # IMAGE: nloop qwords of raw data (texture / FB upload) — NOT inlined
            qbytes = nloop * 16
            emit(Event("IMAGE", frame, 0, base_off + off, info=dict(qwc=nloop, bytes=qbytes)))
            off += qbytes
        else:  # flg == 3 disabled: consumes NLOOP qwords like IMAGE, output is discarded
            qbytes = nloop * 16
            emit(Event("GIFTAG", frame, 0, base_off + off,
                       info=dict(flg=3, note="disabled", qwc=nloop, bytes=qbytes)))
            off += qbytes

        if truncated:
            emit(Event("MALFORMED", frame, 0, base_off + off,
                       info=dict(reason="gif_payload_truncated", flg=flg)))
            break
        if off > n:
            emit(Event("MALFORMED", frame, 0, base_off + off,
                       info=dict(reason="gif_payload_overrun")))
            break
|
|
|
|
# ---------------------------------------------------------------- packet stream
|
|
def parse_dump(path):
    """Decode a GS dump file into its header and a flat, ordered list of events.

    Args:
        path: path to a .gs / .gs.xz / .gs.zst dump.

    Returns:
        ``(header, events)``: the parsed `Header` and a list of `Event` objects whose `idx`
        equals their position in the list.

    Packet handling (one type byte, then a type-specific body):
        0 Transfer  -> TRANSFER event, then the payload is walked as a GIF tag chain
        1 VSync     -> FRAME_BOUNDARY event; the frame counter is incremented afterwards
        2 ReadFIFO2 -> READFIFO event
        3 Registers -> TRANSFER event with ``regs_snapshot=8192``
    Any truncated packet or unknown type id produces a MALFORMED event and stops parsing.

    Raises:
        ValueError: the container header is invalid (from `parse_header`).
    """
    d = read_dump_bytes(path)
    h = parse_header(d)
    events = []
    frame = 0
    end = len(d)

    def emit(ev):
        # Number events in emission order.
        ev.idx = len(events)
        events.append(ev)

    def malformed(at, reason, **extra):
        emit(Event("MALFORMED", frame, 0, at, info=dict(reason=reason, **extra)))

    off = h.packet_start
    while off < end:
        tid = d[off]
        pkt_off = off
        off += 1
        if tid == 0:  # Transfer: path byte + u32 length + payload
            if off + 5 > end:
                malformed(pkt_off, "trunc_transfer_hdr")
                break
            path_id = d[off]
            length = struct.unpack_from("<I", d, off + 1)[0]
            off += 5
            if off + length > end:
                malformed(pkt_off, "trunc_transfer_data", len=length)
                break
            emit(Event("TRANSFER", frame, 0, pkt_off,
                       info=dict(path=GSPATH.get(path_id, path_id), length=length)))
            walk_gif(d[off:off + length], off, frame, emit)
            off += length
        elif tid == 1:  # VSync (frame boundary): one field byte
            if off >= end:
                malformed(pkt_off, "trunc_vsync")
                break
            emit(Event("FRAME_BOUNDARY", frame, 0, pkt_off, info=dict(field=d[off])))
            off += 1
            frame += 1
        elif tid == 2:  # ReadFIFO2: u32 qword count
            if off + 4 > end:
                malformed(pkt_off, "trunc_readfifo")
                break
            emit(Event("READFIFO", frame, 0, pkt_off,
                       info=dict(qwc=struct.unpack_from("<I", d, off)[0])))
            off += 4
        elif tid == 3:  # Registers snapshot: fixed 8192-byte body
            if off + 8192 > end:
                malformed(pkt_off, "trunc_registers")
                break
            emit(Event("TRANSFER", frame, 0, pkt_off, info=dict(regs_snapshot=8192)))
            off += 8192
        else:
            malformed(pkt_off, f"bad_packet_id_{tid}")
            break
    return h, events
|
|
|
|
# ---------------------------------------------------------------- CLI / summary
|
|
def main(argv):
    """Command-line entry point: parse a dump and print a summary.

    Args:
        argv: full argument vector (``argv[0]`` is the program name, ``argv[1]`` the dump
            path). Recognised options: ``--events N`` prints the first N events,
            ``--json PATH`` writes every event as one JSON object per line.

    Returns:
        0 on success, 2 on a usage error (missing dump path or a bad option value).
    """
    if len(argv) < 2:
        print(__doc__)
        return 2
    path = argv[1]

    # Validate option values up front so a typo fails fast with a message instead of raising
    # IndexError/ValueError after the whole dump has already been parsed.
    n_events = None
    if "--events" in argv:
        pos = argv.index("--events") + 1
        try:
            n_events = int(argv[pos])
        except (IndexError, ValueError):
            print("error: --events requires an integer count", file=sys.stderr)
            return 2
    json_out = None
    if "--json" in argv:
        pos = argv.index("--json") + 1
        if pos >= len(argv):
            print("error: --json requires an output path", file=sys.stderr)
            return 2
        json_out = argv[pos]

    h, events = parse_dump(path)
    print(f"schema v{SCHEMA_VERSION} serial={h.serial!r} crc=0x{h.crc:08x} ss={h.ss_w}x{h.ss_h} "
          f"state=0x{h.state_size:x} packets@0x{h.packet_start:x}")

    # histograms
    kinds = {}
    regs = {}
    prims = {}
    flgs = {}
    frames = 0
    images = 0
    image_bytes = 0
    malformed = 0
    prim_names = {0: "POINT", 1: "LINE", 2: "LINE_STRIP", 3: "TRIANGLE",
                  4: "TRI_STRIP", 5: "TRI_FAN", 6: "SPRITE", 7: "INVALID"}
    for e in events:
        kinds[e.kind] = kinds.get(e.kind, 0) + 1
        if e.kind == "GSREG":
            regs[e.reg] = regs.get(e.reg, 0) + 1
            # Count PRIM types from the PRIM writes actually emitted for a tag's PRE bit.
            if e.info.get("via") == "PRE":
                name = prim_names.get(e.value & 7, "?")
                prims[name] = prims.get(name, 0) + 1
        elif e.kind == "FRAME_BOUNDARY":
            frames += 1
        elif e.kind == "MALFORMED":
            malformed += 1
        elif e.kind == "IMAGE":
            images += 1
            image_bytes += e.info.get("bytes", 0)
        elif e.kind == "GIFTAG" and e.info.get("note") != "disabled":
            # The "disabled" follow-up event repeats a tag that was already counted.
            fl = e.info.get("flg")
            flgs[fl] = flgs.get(fl, 0) + 1

    print(f"events={len(events)} frames={frames} images={images} image_bytes={image_bytes} malformed={malformed}")
    print("event kinds:", dict(sorted(kinds.items(), key=lambda x: -x[1])))
    print("GIF flg :", {('PACKED' if k == 0 else 'REGLIST' if k == 1 else 'IMAGE' if k == 2 else 'DISABLE'): v
                        for k, v in sorted(flgs.items())})
    print("PRIM types (via PRE):", dict(sorted(prims.items(), key=lambda x: -x[1])))
    print("top GS regs:", dict(sorted(regs.items(), key=lambda x: -x[1])[:18]))

    if n_events is not None:
        for e in events[:n_events]:
            print(f" f{e.frame} #{e.idx} @0x{e.byte_off:x} {e.kind} {e.reg} {('0x%x'%e.value) if e.kind=='GSREG' else ''} {e.info}")
    if json_out is not None:
        with open(json_out, "w") as f:
            for e in events:
                f.write(json.dumps(asdict(e)) + "\n")
        print(f"wrote {len(events)} events -> {json_out}")
    return 0
|
|
|
|
# Script entry point: run the CLI and hand its return value to the interpreter as exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv))
|