From abaaf34f9ab33cedb0ef2e1fa853d20ea38a9d25 Mon Sep 17 00:00:00 2001 From: Joseph <162703152+josephnef@users.noreply.github.com> Date: Sat, 27 Jun 2026 17:24:51 +0300 Subject: [PATCH] =?UTF-8?q?fec:=20fused=20FEC=20=E2=80=94=20sub-block=20in?= =?UTF-8?q?tegrity=20+=20Reed-Solomon=20+=20per-layer=20UEP?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cross-layer FEC for the video downlink: keep FCS-failed frames (DEVOURER_RX_KEEP_CORRUPTED) and salvage their CRC-valid sub-blocks instead of discarding the whole frame as one erasure. A frame's all-or-nothing FCS becomes fine-grained per-symbol erasures, multiplying the outer code's recovery when corruption is localized. tools/precoder/: - fec_subblock.py — sub-block-integrity (SBI) layer: pack/unpack fixed-size, individually CRC-guarded sub-blocks (one sub-block == one outer-code symbol); stream_id multiplexing; numpy-free so it imports into the GNU Radio env. - stream_fec_rs.py — Reed-Solomon outer scheme (systematic GF(2^8) Vandermonde, Rizzo/zfec construction, poly 0x11d), registered as a third scheme alongside RaptorQ/RLC in stream_fec.py. - svc_uep_fec.py — per-SVC-layer FEC-rate UEP (HEVC NAL -> temporal layer -> RS redundancy); the application-FEC half of cross-layer UEP that complements the MCS ladder in svc_tx.h. - fused_fec_link.py + fused_fec_tx.py/fused_fec_rx.py — chip-path sender/receiver + CLIs; the RX runs baseline and SBI decoders in lockstep and reports the gain. - fec_fusion_sim.py — offline simulation quantifying the SBI gain and sizing the sub-blocks, no hardware. - 22 new unit tests (suite 130/130 green). tests/: - fused_fec_onair.sh — chip<->chip on-air gain harness (8812 TX -> 8821 RX). - sdr_interferer.py — USRP B210 calibrated co-channel AWGN/CW source, a reproducible SNR knob when no attenuator is available. txdemo/stream_tx_demo/main.cpp — DEVOURER_TX_PWR_OVERRIDE (absolute per-rate TXAGC index 0..63) for the marginal-SNR bench setups that exercise the salvage. On-air, real silicon (8812->8821, MCS7, 15 s): +129 RS blocks recovered that the drop-whole-frame baseline lost. Full architecture + results: docs/fused-fec.md. Co-Authored-By: Claude Opus 4.8 --- CLAUDE.md | 15 + docs/fused-fec.md | 236 ++++++++++++++++ tests/fused_fec_onair.sh | 114 ++++++++ tests/sdr_interferer.py | 133 +++++++++ tools/precoder/README.md | 30 ++ tools/precoder/fec_fusion_sim.py | 245 ++++++++++++++++ tools/precoder/fec_subblock.py | 241 ++++++++++++++++ tools/precoder/fused_fec_link.py | 130 +++++++++ tools/precoder/fused_fec_rx.py | 80 ++++++ tools/precoder/fused_fec_tx.py | 74 +++++ tools/precoder/pyproject.toml | 4 +- tools/precoder/stream_fec.py | 11 +- tools/precoder/stream_fec_rs.py | 393 ++++++++++++++++++++++++++ tools/precoder/svc_uep_fec.py | 188 ++++++++++++ tools/precoder/test_fec_fusion_sim.py | 73 +++++ tools/precoder/test_fec_subblock.py | 190 +++++++++++++ tools/precoder/test_fused_fec_link.py | 71 +++++ tools/precoder/test_stream_fec_rs.py | 147 ++++++++++ tools/precoder/test_svc_uep_fec.py | 109 +++++++ txdemo/stream_tx_demo/main.cpp | 14 + 20 files changed, 2495 insertions(+), 3 deletions(-) create mode 100644 docs/fused-fec.md create mode 100644 tests/fused_fec_onair.sh create mode 100644 tests/sdr_interferer.py create mode 100644 tools/precoder/fec_fusion_sim.py create mode 100644 tools/precoder/fec_subblock.py create mode 100644 tools/precoder/fused_fec_link.py create mode 100644 tools/precoder/fused_fec_rx.py create mode 100644 tools/precoder/fused_fec_tx.py create mode 100644 tools/precoder/stream_fec_rs.py create mode 100644 tools/precoder/svc_uep_fec.py create mode 100644 tools/precoder/test_fec_fusion_sim.py create mode 100644 tools/precoder/test_fec_subblock.py create mode 100644 tools/precoder/test_fused_fec_link.py create mode 100644 tools/precoder/test_stream_fec_rs.py create mode 100644 tools/precoder/test_svc_uep_fec.py diff --git a/CLAUDE.md b/CLAUDE.md index e4d001d..3b867db 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -110,6 +110,13 @@ Both `WiFiDriverDemo` and `WiFiDriverTxDemo` honour: - `DEVOURER_SKIP_TXPWR=1` — skip the per-rate TX-power loop during channel switch (runs by default on every chip; escape hatch for RX-only experiments). +- `DEVOURER_RX_KEEP_CORRUPTED=1` — pass frames that fail the 802.11 FCS (CRC32) + or decryption-ICV check up to the host instead of dropping them at the WMAC + filter (sets RCR `ACRC32|AICV`). They arrive with `crc_err`/`icv_err` set on + the RX descriptor; `WiFiDriverDemo` surfaces them in its `` + output. This is the entry point for the fused-FEC sub-block-salvage layer + (see `docs/fused-fec.md`); opt-in, since a body with a corrupt tail is the + worst-case input for an IP-stack consumer that didn't ask for it. - `DEVOURER_USB_DEBUG=1` — raise libusb log level from the default WARNING to DEBUG (produces ~7 MB per 15 s — has filled `/tmp` mid-capture and adds 0.5-0.8 s to init even with stderr discarded). `DEVOURER_USB_QUIET` is @@ -157,6 +164,14 @@ is `DEVOURER_SVC_LADDER="CRIT=;T0=;T1=;..."` where each ``) | +| `fec_fusion_sim.py` | offline simulation: quantify SBI gain, size sub-blocks, no hardware | +| `test_*.py` | unit tests for each module (130 in the suite) | + +## Running it + +### Offline simulation (no hardware) + +```sh +cd tools/precoder +uv run python fec_fusion_sim.py --scheme rs --model slope --ber 3e-4 --sweep +``` + +Quantifies the SBI gain under uniform/slope BER + frame-loss and sweeps the +sub-block size. In the post-PHY-FEC residual-BER regime (small symbols, BER +3e-5..3e-4) SBI lifts message success from 23–73 % → 97–100 %; the knee is +≈32 B sub-blocks (≈6 % framing overhead). + +### chip↔chip on-air + +```sh +sudo bash tests/fused_fec_onair.sh # SKIP_RAIL=1 after a fresh boot +``` + +8812 TX at a fragile high MCS (the robust BPSK preamble/SIG keep the frame +detected while the 64-QAM data fails the FCS → corrupt-but-received) → 8821 RX +with `KEEP_CORRUPTED`. Reports `received / corrupt / FUSED-FEC GAIN`. See +[Reproducible corruption](#reproducible-corruption) for the recipe details. + +### SDR-RX over real air (rung-3) + +In `~/git/sdr2wifi` (needs the gr-ieee802-11 fork + GNU Radio 3.10): + +```sh +sudo bash run_fused_rung3.sh # 8812 TX HT MCS7 → B210 RX → SBI salvage +``` + +## Results + +- **chip↔chip, real silicon, 15 s, MCS7, ch6:** 4086 frames, 1072 corrupt + surfaced, 6013 sub-blocks salvaged, **FUSED-FEC GAIN = 129 RS blocks** the + baseline lost (≈13 % lift in delivered blocks). +- **SDR-RX, over real air, HT MCS7:** the pipeline runs end-to-end — 4044 + devourer HT frames decoded over the air, corrupt frames surfaced, SBI bridge + runs. The gain is small because the fork's **hard-decision** Viterbi produces + frame-wide corruption (see below), not because of a code defect. + +## Reproducible corruption + +To exercise the salvage path reproducibly on a bench without an attenuator +(fixed antennas, adapters inches apart): + +- **Use normal TX power.** Lowering chip TX power kills *detection* (preamble + too weak → zero frames received), it does not make frames partially corrupt. + Power controls reception; MCS (and an interferer) control data corruption. +- **A high MCS corrupts naturally.** MCS7's fragile 64-QAM data fails the FCS on + ~13–26 % of frames at normal power while the robust preamble keeps them + detected — no interferer required. +- **`tests/sdr_interferer.py`** turns a USRP B210 into a calibrated co-channel + AWGN/CW source. A given `--tx-gain` reproduces the same interference (fixed + RNG seed), so the FCS-failure rate becomes a function of one number — the + reproducible knob when the natural rate is too low. +- **`DEVOURER_TX_PWR_OVERRIDE`** (StreamTxDemo) forces an absolute per-rate + TXAGC index 0–63 for fine control. + +Note: a `uhubctl` power-cycle of a **root-hub** port (e.g. the 8812 on root hub +9) drops the device but does not re-enumerate it on power restore — it wedges +until a full host reboot. `tests/fused_fec_onair.sh` supports `SKIP_RAIL=1` to +use the current rail as-is after a clean boot. + +## SDR receiver changes (sister repos) + +The SDR-RX path required two changes to the `~/git/gr-ieee802-11` fork, both +mirroring devourer's `KEEP_CORRUPTED`: + +- **`lib/decode_mac.cc`** — `GR_KEEP_CORRUPTED` surfaces FCS-failed *legacy* + PSDUs tagged `crc_ok=#f` instead of dropping them. +- **`lib/frame_equalizer_impl.cc`** — HT/VHT/MIMO frames are decoded here (not + in `decode_mac`) and previously only printed their CRC result. A new `pdu` + message port publishes them (with `crc_ok` + `GR_KEEP_CORRUPTED`); the hier + block forwards it to `mac_out`. + +`~/git/sdr2wifi` holds the over-air receiver (`fused_fec_rung3.py`), the +software-loopback capstone (`fused_fec_rung1.py`), and validation tools +(`keep_corrupted_check.py`, `ht_hier_check.py`). + +## Future work: soft-decision SDR + +The SDR over-air gain is gated by the fork's **hard-decision** Viterbi: on a +marginal frame it diverges and corrupts the whole frame, leaving SBI nothing to +salvage. The fix is a **soft-decision** receive path — a soft demapper +(per-coded-bit LLRs for BPSK/QPSK/16/64-QAM) feeding a soft-input Viterbi — +so marginal frames carry a few *localized* residual errors instead of +frame-wide garbage, restoring the structure SBI exploits. The fork already has a +soft min-sum LDPC decoder, but only for R=1/2 n=648 (the robust regime that does +not corrupt over air); the fragile BCC rates (MCS1-7) are where the soft path is +needed. + +## References + +- Jamieson & Balakrishnan, "PPR: Partial Packet Recovery for Wireless Networks", SIGCOMM 2007. +- Han et al., "All Bits Are Not Equal — A Study of IEEE 802.11 Communication Bit Errors", INFOCOM 2009. +- Han et al., "Maranello: Practical Partial Packet Recovery for 802.11", NSDI 2010. +- Abdel-Khalek & Heath, "A Cross-Layer Design for Perceptual Optimization of H.264/SVC with UEP", JSAC 2012. +- Shokrollahi, "Raptor Codes", IEEE Trans. IT 2006 (RaptorQ: RFC 6330). Roca et al., RLC: RFC 8681. +- Rizzo, "Effective Erasure Codes for Reliable Computer Communication Protocols", 1997 (the GF(2⁸) Vandermonde RS). diff --git a/tests/fused_fec_onair.sh b/tests/fused_fec_onair.sh new file mode 100644 index 0000000..7229818 --- /dev/null +++ b/tests/fused_fec_onair.sh @@ -0,0 +1,114 @@ +#!/usr/bin/env bash +# On-air chip↔chip fused-FEC gain on real Realtek silicon. +# +# devourer TX (8812) → devourer RX (8821, DEVOURER_RX_KEEP_CORRUPTED=1). The TX +# flies at a FRAGILE high MCS (64-QAM, no LDPC): the robust BPSK preamble/SIG +# keep each frame DETECTED and surfaced while the 64-QAM data fails the FCS, so +# a fraction of frames arrive "received-but-corrupt" — exactly the salvage +# regime. At MCS7, inches apart, ~10-15% of frames are corrupt-but-received with +# NO interferer at all; the RX salvages the CRC-valid sub-blocks and the SBI +# decoder recovers RS blocks the drop-whole-frame baseline throws away. +# +# IMPORTANT recipe notes (learned on the bench): +# * Use NORMAL TX power. Low power (DEVOURER_TX_POWER small) doesn't make frames +# "partially corrupt" — it kills DETECTION (preamble too weak), so the RX gets +# ZERO frames. Power controls reception; MCS (and the optional interferer) +# control data corruption. +# * Capture-then-offline-analyse (not a live pipe): WiFiDriverDemo → raw log, +# then fused_fec_rx reads the log. Robust + re-analysable with different FEC +# params. +# * Optional USRP B210 interferer (USE_INTERFERER=1, IGAIN dB) raises the noise +# floor to BOOST and make the corrupt rate reproducible. It needs ~8-10 s of +# FPGA-load warmup before it actually radiates. +# +# ch6 (2.4 GHz) avoids the USB Vbus-sag gotcha. SKIP_RAIL=1 after a fresh boot. +# Run: sudo bash tests/fused_fec_onair.sh +set -u +ROOT="$(cd "$(dirname "$0")/.." && pwd)" +FEC="$ROOT/tools/precoder" +PY="${PY:-python3}" + +TX_PID=${TX_PID:-0x8812}; TX_VID=${TX_VID:-0x0bda}; TX=${TX_SYSFS:-9-2} +RX_PID=${RX_PID:-0x0120}; RX_VID=${RX_VID:-0x2357}; RX=${RX_SYSFS:-9-1.4} +CH=${CH:-6} +HUBS=${HUBS:-"9 2;9-1 4"} # ';'-sep "hub port" for the rail cycle +MODULES=${MODULES:-"rtw88_8812au rtw88_8821au"} +TX_RATE=${TX_RATE:-MCS7} # fragile 64-QAM → natural corruption +TX_POWER=${TX_POWER:-} # EMPTY = chip default (normal). Do NOT lower. +TX_PWR_OVERRIDE=${TX_PWR_OVERRIDE:-} # absolute TXAGC 0..63 (rarely needed) +# One RS block per frame (N=10): a single FCS failure is a whole-block loss for +# the baseline but recoverable by SBI when ≤2 of its 10 sub-blocks are bad. +# Raise --overhead for more salvage headroom (recovers more corrupt frames). +FEC_ARGS=${FEC_ARGS:-"--symbol-size 32 --overhead 0.25 --blocks-per-body 10 --k 8"} +SECS=${SECS:-15} # TX window +USE_INTERFERER=${USE_INTERFERER:-} # set to 1 to add the B210 noise booster +IGAIN=${IGAIN:-80} # B210 TX gain dB when USE_INTERFERER=1 +IMODE=${IMODE:-noise} # noise | cw + +RAW=/tmp/fused_fec_rx_raw.log + +KILL(){ sudo pkill -9 StreamTxDem 2>/dev/null; sudo pkill -9 WiFiDriverDem 2>/dev/null + sudo pkill -9 -f fused_fec 2>/dev/null; sudo pkill -9 -f sdr_interferer 2>/dev/null; } +trap KILL EXIT + +# Fresh rail. SKIP_RAIL=1 bypasses the uhubctl power-cycle — REQUIRED on adapters +# on a root-hub port (e.g. 8812 on root hub 9), where `uhubctl off` drops the +# device but power-restore does NOT re-enumerate it (wedges until a host reboot). +if [ -z "${SKIP_RAIL:-}" ]; then + echo "=== fresh rail (power-cycle Wi-Fi hub tree; B210 port untouched) ===" + echo " WARNING: on a root-hub port this can wedge the adapter; use SKIP_RAIL=1 post-reboot" + sudo modprobe -r $MODULES 2>/dev/null + IFS=';' read -ra _hubs <<< "$HUBS" + for hp in "${_hubs[@]}"; do sudo uhubctl -a off -l ${hp% *} -p ${hp#* } >/dev/null 2>&1; done; sleep 16 + for hp in "${_hubs[@]}"; do sudo uhubctl -a on -l ${hp% *} -p ${hp#* } >/dev/null 2>&1; done; sleep 9 + sudo modprobe $MODULES 2>/dev/null; sleep 2 +else + echo "=== SKIP_RAIL set — using current rail as-is (assumes a recent clean boot) ===" +fi + +# free both Wi-Fi adapters from the kernel driver (the B210 is uhd-accessed) +for D in "$TX" "$RX"; do + for i in /sys/bus/usb/devices/$D/$D:*; do + ifc=$(basename "$i"); drv=$(readlink -f "$i/driver" 2>/dev/null) + [ -n "$drv" ] && echo "$ifc" | sudo tee "$drv/unbind" >/dev/null 2>&1 + done +done; sleep 1 + +head -c 400000 /dev/urandom > /tmp/fused_fec_src.bin + +# Optional interferer — start it FIRST and let the FPGA load before TX. +if [ -n "$USE_INTERFERER" ]; then + echo "=== B210 interferer: ch$CH gain=${IGAIN}dB mode=$IMODE (warming up FPGA) ===" + sudo $PY "$ROOT/tests/sdr_interferer.py" --channel $CH --tx-gain "$IGAIN" \ + --rate 20e6 --mode "$IMODE" --secs $((SECS + 40)) >/tmp/intf.log 2>&1 & + sleep 11 +fi + +# RX: capture the 8821's surfaced frames (background, raw — robust vs a live pipe). +: > "$RAW" +sudo env DEVOURER_VID=$RX_VID DEVOURER_PID=$RX_PID DEVOURER_CHANNEL=$CH \ + DEVOURER_STREAM_OUT=1 DEVOURER_RX_KEEP_CORRUPTED=1 \ + timeout $((SECS + 30)) "$ROOT/build/WiFiDriverDemo" >"$RAW" 2>/dev/null & +RXBG=$! +sleep 8 # RX init (open + reset + monitor up) + +# TX: 8812 at the fragile MCS, NORMAL power, for the window. +echo "=== TX (8812 $TX_RATE${TX_POWER:+ pwr=$TX_POWER}) for ${SECS}s ===" +pwr_env="" +[ -n "$TX_POWER" ] && pwr_env="$pwr_env DEVOURER_TX_POWER=$TX_POWER" +[ -n "$TX_PWR_OVERRIDE" ] && pwr_env="$pwr_env DEVOURER_TX_PWR_OVERRIDE=$TX_PWR_OVERRIDE" +$PY "$FEC/fused_fec_tx.py" --input /tmp/fused_fec_src.bin --repeat 4 $FEC_ARGS 2>/dev/null \ + | sudo env DEVOURER_VID=$TX_VID DEVOURER_PID=$TX_PID DEVOURER_CHANNEL=$CH \ + DEVOURER_TX_RATE=$TX_RATE $pwr_env \ + timeout "$SECS" "$ROOT/build/StreamTxDemo" >/dev/null 2>&1 + +sleep 3 +sudo pkill -9 -f sdr_interferer 2>/dev/null +sudo pkill -9 WiFiDriverDem 2>/dev/null +wait "$RXBG" 2>/dev/null + +# Offline analysis: same FEC params, baseline vs SBI. +echo "=== fused-FEC result (offline analysis of the on-air capture) ===" +grep "" "$RAW" | $PY "$FEC/fused_fec_rx.py" $FEC_ARGS >/dev/null 2>/tmp/fused_gain.log +grep "fused_fec_rx" /tmp/fused_gain.log || echo " no frames captured — check TX rate/power and that both chips are unbound" +echo "=== raw capture at $RAW (re-analyse with other FEC params if desired) ===" diff --git a/tests/sdr_interferer.py b/tests/sdr_interferer.py new file mode 100644 index 0000000..d2ec92a --- /dev/null +++ b/tests/sdr_interferer.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +"""USRP B210 calibrated co-channel interferer — a reproducible SNR knob. + +When a conducted/attenuator setup isn't available (fixed antennas, adapters +inches apart with tens of dB of link margin), lowering chip TX power alone +can't reach the "received-but-FCS-failed" salvage regime. This instead raises +the noise floor at the receiver by transmitting band-filling AWGN (or a CW +tone) on the Wi-Fi channel from the B210. The FCS-failure rate then becomes a +reproducible function of ONE number — the USRP TX gain — with nothing moved. + +Reproducible by construction: the noise is drawn from a fixed-seed NumPy RNG, +so a given --tx-gain reproduces the same interference run to run. Sweep +--tx-gain to dial the corruption rate; lock it for a rig setpoint. + + # ch6 (2.437 GHz) AWGN at 70 dB gain for 20 s: + sudo python3 sdr_interferer.py --channel 6 --tx-gain 70 --secs 20 + # run until killed (the on-air harness backgrounds it): + sudo python3 sdr_interferer.py --channel 6 --tx-gain 70 + +SAFETY: this radiates on a live Wi-Fi channel. Start at LOW gain and raise it — +too much and the receiver is fully jammed (0 frames) instead of partially +corrupted. Use only on a bench you control. +""" +from __future__ import annotations + +import argparse +import signal +import sys +import time + + +def chan_to_freq(channel: int) -> float: + """2.4 GHz: ch1..14 → 2407 + 5*ch MHz (ch6 = 2437). 5 GHz: 5000 + 5*ch + (ch36 = 5180).""" + if channel <= 14: + return (2407 + 5 * channel) * 1e6 + return (5000 + 5 * channel) * 1e6 + + +def main(argv=None) -> int: + ap = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + ap.add_argument("--args", default="", help="UHD device args (e.g. type=b200)") + ap.add_argument("--channel", type=int, default=None, + help="Wi-Fi channel (sets --freq); 2.4 GHz 1-14 / 5 GHz 36+") + ap.add_argument("--freq", type=float, default=None, help="center freq Hz") + ap.add_argument("--rate", type=float, default=20e6, + help="sample rate Hz = interference bandwidth (default 20e6)") + ap.add_argument("--tx-gain", type=float, default=60.0, + help="USRP TX gain dB — THE reproducible SNR knob") + ap.add_argument("--antenna", default="TX/RX") + ap.add_argument("--amplitude", type=float, default=0.3, + help="digital backoff 0..1 (keep fixed; use --tx-gain to vary power)") + ap.add_argument("--mode", default="noise", choices=("noise", "cw")) + ap.add_argument("--seed", type=int, default=1234, + help="RNG seed — fixes the noise so runs are reproducible") + ap.add_argument("--secs", type=float, default=0.0, + help="run time; 0 = until SIGINT/SIGTERM") + args = ap.parse_args(argv) + + if args.freq is None: + if args.channel is None: + ap.error("need --freq or --channel") + args.freq = chan_to_freq(args.channel) + + try: + import numpy as np + import uhd + except ImportError as e: + sys.stderr.write(f"sdr_interferer: import failed ({e}); UHD+NumPy " + "must be importable (system uhd, as in sdr_power_probe.py)\n") + return 2 + + usrp = uhd.usrp.MultiUSRP(args.args) + usrp.set_tx_rate(args.rate) + usrp.set_tx_freq(uhd.types.TuneRequest(args.freq)) + usrp.set_tx_gain(args.tx_gain) + try: + usrp.set_tx_antenna(args.antenna) + except Exception: + pass + + st = uhd.usrp.StreamArgs("fc32", "sc16") + st.channels = [0] + tx = usrp.get_tx_stream(st) + md = uhd.types.TXMetadata() + md.start_of_burst = True + md.end_of_burst = False + md.has_time_spec = False + + nsamps = 8192 + rng = np.random.default_rng(args.seed) + if args.mode == "cw": + # +2.5 MHz tone within the band — a deterministic, strong interferer. + t = np.arange(nsamps) / args.rate + tone = (args.amplitude * np.exp(2j * np.pi * 2.5e6 * t)).astype(np.complex64) + + sys.stderr.write( + f"[interferer] freq={args.freq/1e9:.4f} GHz rate={args.rate/1e6:g} MS/s " + f"gain={args.tx_gain:g} dB mode={args.mode} amp={args.amplitude} " + f"seed={args.seed}\n") + sys.stderr.flush() + + stop = {"flag": False} + signal.signal(signal.SIGINT, lambda *_: stop.update(flag=True)) + signal.signal(signal.SIGTERM, lambda *_: stop.update(flag=True)) + + t0 = time.monotonic() + sent = 0 + while not stop["flag"]: + if args.secs and (time.monotonic() - t0) >= args.secs: + break + if args.mode == "cw": + buf = tone + else: + buf = (args.amplitude * + (rng.standard_normal(nsamps) + 1j * rng.standard_normal(nsamps)) + / np.sqrt(2)).astype(np.complex64) + tx.send(buf.reshape(1, -1), md) + md.start_of_burst = False + sent += nsamps + + md.end_of_burst = True + try: + tx.send(np.zeros((1, 1), dtype=np.complex64), md) + except Exception: + pass + sys.stderr.write(f"[interferer] stopped ({sent} samples)\n") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/precoder/README.md b/tools/precoder/README.md index 8b0cf5b..0903a1f 100644 --- a/tools/precoder/README.md +++ b/tools/precoder/README.md @@ -36,9 +36,39 @@ Phase-B SDR extra (optional): `uv sync --extra sdr`. | `seed_probe.py` | discover / characterise the chip scrambler seed (rx + bruteforce) | | `fft_capture.py` | Phase-B per-subcarrier IQ verification (+ runnable `--self-test`) | | `test_pipeline.py` | pytest: scrambler/BCC/interleaver KATs + pipeline round-trips | +| `stream_fec*.py` | outer erasure codes: RaptorQ (`_raptorq`), RLC (`_rlc`), **Reed-Solomon (`_rs`)**; `stream_fec.py` dispatches by `FecConfig.scheme` | +| `fec_subblock.py` | **sub-block integrity (SBI)** — per-sub-block CRC so a kept-corrupt frame yields its surviving symbols as erasures, not a whole-frame loss | +| `svc_uep_fec.py` | per-SVC-layer FEC-rate UEP (heavy FEC on base/IDR, light on enhancement) — the app-FEC half of cross-layer UEP | +| `fec_fusion_sim.py` | offline sim quantifying the SBI gain + picking the sub-block size, no hardware | +| `fused_fec_link.py` + `fused_fec_tx/rx.py` | chip↔chip RS+SBI sender/receiver + CLIs (RX reports baseline-vs-SBI gain) | The repo-level end-to-end smoke is `tests/precoder_smoke.py` (skips without numpy). +## Fused FEC (sub-block integrity + RS + per-layer UEP) + +Three error-correction layers stacked into one concatenated code for the video +downlink: the **PHY MCS** code (inner, picked per-packet via radiotap/`TxMode`), +a **sub-block-integrity** layer (`fec_subblock.py`) that turns the chip's +all-or-nothing 802.11 FCS into per-symbol erasures, and an **outer erasure +code** (RaptorQ/RLC/**RS**). With `DEVOURER_RX_KEEP_CORRUPTED=1` a frame that +fails the FCS is kept; only its genuinely-corrupt sub-blocks are dropped, so the +outer code recovers from far fewer erasures than "whole frame lost". + +```sh +# Size the sub-blocks for your link's residual BER (no radio): +uv run python fec_fusion_sim.py --scheme rs --model slope --ber 3e-4 --sweep + +# On-air chip↔chip gain (8812 TX → 8821 RX), baseline vs SBI: +sudo bash ../../tests/fused_fec_onair.sh # reports FUSED-FEC GAIN + +# SDR-RX path (USRP B210, gr-ieee802-11 fork w/ GR_KEEP_CORRUPTED): see +# ~/git/sdr2wifi/fused_fec_rung1.py — end-to-end over the real OFDM PHY. +``` + +The PHY-MCS half of SVC unequal error protection lives in C++ +(`txdemo/svc_tx_demo/svc_tx.h`, `DEVOURER_SVC_LADDER`); `svc_uep_fec.py` adds the +matching FEC-rate ladder so base/IDR layers get robust MCS **and** heavy FEC. + ## End-to-end recipe ```sh diff --git a/tools/precoder/fec_fusion_sim.py b/tools/precoder/fec_fusion_sim.py new file mode 100644 index 0000000..d07a8b9 --- /dev/null +++ b/tools/precoder/fec_fusion_sim.py @@ -0,0 +1,245 @@ +#!/usr/bin/env python3 +"""Offline simulation of the fused-FEC link — quantifies the sub-block-integrity +(SBI) recovery gain and picks the sub-block size, with NO hardware. + +Runs the *real* pipeline end to end — `stream_fec.make_encoder` → +`fec_subblock.pack` → a byte-level channel → `fec_subblock.unpack` → +`stream_fec.make_decoder` — so the numbers reflect the exact code that runs on +air, not a model of it. For each radio body it compares two receivers: + + * baseline — the chip drops any FCS-failed frame, so a body with even one + bit error contributes zero symbols (today's behaviour); + * sbi — the body is kept (DEVOURER_RX_KEEP_CORRUPTED) and each sub-block + is CRC-checked independently; survivors feed the outer decoder. + +Channel models (per radio body): + * uniform — every bit flips with probability --ber. + * slope — "All Bits Are Not Equal" (INFOCOM 2009): per-bit error + probability rises linearly with byte position, 0 at the head to + 2×--ber at the tail (sync/phase drift accumulates). + * frameloss — independent whole-frame loss with probability --frame-loss + (stacks with the bit model; both receivers lose these equally). + +Usage: + uv run python fec_fusion_sim.py --scheme rs --k 8 --overhead 0.5 \ + --symbol-size 128 --model slope --ber 2e-3 --trials 200 --sweep +""" + +from __future__ import annotations + +import argparse +import os +import random +import sys +from dataclasses import dataclass + +_HERE = os.path.dirname(os.path.abspath(__file__)) +if _HERE not in sys.path: + sys.path.insert(0, _HERE) + +import fec_subblock # noqa: E402 +import stream_fec # noqa: E402 +from stream_fec import FecConfig # noqa: E402 + + +@dataclass +class Channel: + model: str # uniform | slope + ber: float # base bit-error rate + frame_loss: float # independent whole-frame loss prob + rng: random.Random + + def corrupt(self, body: bytes) -> "tuple[bytes, bool, bool]": + """Return (body_out, frame_lost, fcs_failed). + + frame_lost: the whole frame vanished (neither RX sees it). + fcs_failed: at least one bit flipped (the chip FCS would fail). + """ + if self.rng.random() < self.frame_loss: + return body, True, True + n = len(body) + out = bytearray(body) + flipped = False + for i in range(n): + if self.model == "slope": + # 0 at head → 2*ber at tail. + p = self.ber * 2.0 * (i / max(1, n - 1)) + else: + p = self.ber + if p <= 0: + continue + byte = out[i] + for bit in range(8): + if self.rng.random() < p: + byte ^= (1 << bit) + flipped = True + out[i] = byte + return bytes(out), False, flipped + + +@dataclass +class TrialResult: + baseline_ok: bool + sbi_ok: bool + frames: int + frames_lost: int + frames_fcs_failed: int + subblocks: int + subblocks_failed: int + + +def _make_message(n_packets: int, pkt_len: int, rng: random.Random) -> list[bytes]: + return [bytes(rng.randrange(256) for _ in range(pkt_len)) + for _ in range(n_packets)] + + +def run_trial(cfg: FecConfig, blocks_per_body: int, n_packets: int, + pkt_len: int, chan: Channel, crc_bytes: int, + rng: random.Random) -> TrialResult: + msg = _make_message(n_packets, pkt_len, rng) + + enc = stream_fec.make_encoder(cfg) + envs: list[bytes] = [] + for p in msg: + envs += enc.add_packet(p) + envs += enc.flush() + if not envs: + raise RuntimeError("encoder produced no symbols") + env_size = len(envs[0]) + if any(len(e) != env_size for e in envs): + raise RuntimeError("non-uniform envelope size — SBI needs fixed blocks") + + bodies = fec_subblock.pack(envs, block_payload=env_size, + blocks_per_body=blocks_per_body, + crc_bytes=crc_bytes) + + base_dec = stream_fec.make_decoder(cfg) + sbi_dec = stream_fec.make_decoder(cfg) + base_out: list[bytes] = [] + sbi_out: list[bytes] = [] + n_lost = n_fcs = sub_total = sub_failed = 0 + + for body in bodies: + corrupt, lost, fcs_failed = chan.corrupt(body) + if lost: + n_lost += 1 + if fcs_failed: + n_fcs += 1 + + # Baseline: a clean (no-bit-error, not-lost) frame passes all its + # sub-blocks; an FCS-failed or lost frame contributes nothing. + if not lost and not fcs_failed: + for env in fec_subblock.unpack(body, env_size, crc_bytes).survivors: + base_out += base_dec.add_symbol(env) + + # SBI: a lost frame is gone; otherwise salvage surviving sub-blocks. + if not lost: + res = fec_subblock.unpack(corrupt, env_size, crc_bytes) + sub_total += res.n_blocks + sub_failed += res.n_failed + for env in res.survivors: + sbi_out += sbi_dec.add_symbol(env) + + want = set(msg) + return TrialResult( + baseline_ok=set(base_out) >= want, + sbi_ok=set(sbi_out) >= want, + frames=len(bodies), + frames_lost=n_lost, + frames_fcs_failed=n_fcs, + subblocks=sub_total, + subblocks_failed=sub_failed, + ) + + +@dataclass +class Summary: + blocks_per_body: int + trials: int + baseline_success: float + sbi_success: float + mean_subblock_loss: float + overhead_pct: float + env_size: int + + +def run(cfg: FecConfig, blocks_per_body: int, n_packets: int, pkt_len: int, + chan_factory, crc_bytes: int, trials: int, + seed: int) -> Summary: + base_ok = sbi_ok = 0 + sub_total = sub_failed = 0 + env_size = 0 + for t in range(trials): + rng = random.Random(seed + t) + chan = chan_factory(rng) + r = run_trial(cfg, blocks_per_body, n_packets, pkt_len, chan, + crc_bytes, rng) + base_ok += int(r.baseline_ok) + sbi_ok += int(r.sbi_ok) + sub_total += r.subblocks + sub_failed += r.subblocks_failed + # Derive env_size from a throwaway encode for overhead accounting. + enc = stream_fec.make_encoder(cfg) + tmp: list[bytes] = [] + for p in _make_message(n_packets, pkt_len, random.Random(0)): + tmp += enc.add_packet(p) + tmp += enc.flush() + env_size = len(tmp[0]) + body_payload = blocks_per_body * env_size + overhead = fec_subblock.overhead_bytes(blocks_per_body, crc_bytes) + return Summary( + blocks_per_body=blocks_per_body, + trials=trials, + baseline_success=base_ok / trials, + sbi_success=sbi_ok / trials, + mean_subblock_loss=(sub_failed / sub_total) if sub_total else 0.0, + overhead_pct=100.0 * overhead / (overhead + body_payload), + env_size=env_size, + ) + + +def main(argv=None) -> int: + ap = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + ap.add_argument("--scheme", default="rs", choices=("raptorq", "rlc", "rs")) + ap.add_argument("--k", type=int, default=8) + ap.add_argument("--symbol-size", type=int, default=128) + ap.add_argument("--overhead", type=float, default=0.5) + ap.add_argument("--blocks-per-body", type=int, default=4) + ap.add_argument("--message-packets", type=int, default=8) + ap.add_argument("--packet-bytes", type=int, default=100) + ap.add_argument("--model", default="slope", choices=("uniform", "slope")) + ap.add_argument("--ber", type=float, default=2e-3) + ap.add_argument("--frame-loss", type=float, default=0.0) + ap.add_argument("--crc-bytes", type=int, default=2, choices=(2, 4)) + ap.add_argument("--trials", type=int, default=200) + ap.add_argument("--seed", type=int, default=1) + ap.add_argument("--sweep", action="store_true", + help="sweep blocks-per-body 1..N and tabulate the knee") + args = ap.parse_args(argv) + + cfg = FecConfig(k=args.k, symbol_size=args.symbol_size, + overhead=args.overhead, scheme=args.scheme) + + def chan_factory(rng): + return Channel(model=args.model, ber=args.ber, + frame_loss=args.frame_loss, rng=rng) + + bpb_list = ([1, 2, 4, 8, 16] if args.sweep else [args.blocks_per_body]) + print(f"# fused-FEC sim: scheme={args.scheme} k={args.k} " + f"symbol_size={args.symbol_size} overhead={args.overhead} " + f"model={args.model} ber={args.ber} frame_loss={args.frame_loss} " + f"crc={args.crc_bytes}B trials={args.trials}") + print(f"# {'blk/body':>8} {'env_B':>6} {'ovh%':>6} " + f"{'subloss%':>8} {'baseline':>9} {'sbi':>6}") + for bpb in bpb_list: + s = run(cfg, bpb, args.message_packets, args.packet_bytes, + chan_factory, args.crc_bytes, args.trials, args.seed) + print(f" {s.blocks_per_body:>8} {s.env_size:>6} " + f"{s.overhead_pct:>5.1f}% {100 * s.mean_subblock_loss:>7.2f}% " + f"{100 * s.baseline_success:>8.1f}% {100 * s.sbi_success:>5.1f}%") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/precoder/fec_subblock.py b/tools/precoder/fec_subblock.py new file mode 100644 index 0000000..d930277 --- /dev/null +++ b/tools/precoder/fec_subblock.py @@ -0,0 +1,241 @@ +"""Sub-Block Integrity (SBI) framing — the "fused FEC" middle layer. + +Packs several fixed-size outer-code symbols (FEC envelopes from +`stream_fec.make_encoder`) into one radio body, each guarded by its own CRC, +so a frame that fails the 802.11 FCS — surfaced via +`DEVOURER_RX_KEEP_CORRUPTED` — yields its *surviving* sub-blocks as good +symbols and erases only the corrupted ones, instead of the whole frame +becoming a single erasure. + +Why this matters +---------------- +Today (`tun_p2p.py`) one FEC envelope == one radio frame, so a CRC-failed +frame = one whole erased symbol. The outer erasure code (RaptorQ / RLC / RS) +then has to reconstruct that symbol from repair. Real 802.11 corruption is +*localized* ("All Bits Are Not Equal", INFOCOM 2009): a frame that fails the +FCS is usually mostly-correct. By splitting the body into independently +checksummed sub-blocks we convert one whole-frame erasure into a few +sub-block erasures — and the outer decoder is **unchanged**: a dropped +sub-block is simply a symbol that didn't arrive, which is exactly what FEC +recovers. This is the concatenation + PHY-MCS FEC (inner) ⊕ sub-block integrity ⊕ outer erasure code +that befinitiv sketched for wifibroadcast but never built. + +Wire format +----------- + body = SBI_HDR | block[0] | block[1] | … | block[n-1] + SBI_HDR = MAGIC(2 LE) | VER(1) | STREAM_ID(1) | BLOCK_PAYLOAD(2 LE) | N_BLOCKS(1) (7 B) + block = CRC16(2 LE over PAYLOAD) | PAYLOAD(BLOCK_PAYLOAD bytes) + +STREAM_ID multiplexes independent sub-block streams over one SBI framing — used +by the per-SVC-layer UEP encoder (`svc_uep_fec.py`) to tag which temporal layer +a body belongs to so the receiver routes it to that layer's FEC decoder. + +Fixed-size sub-blocks on purpose +-------------------------------- +The blocks are NOT self-delimiting. A self-delimiting (length-prefixed) +framing would desync every following sub-block the moment a length byte got +flipped — defeating the entire purpose. With a fixed `block_payload` the +receiver partitions the body at *known* offsets, so a hit in block i is +contained to block i. The receiver therefore trusts its own configured +`block_payload`, not the (possibly corrupt) header — the header is a sanity +tag for clean frames, never load-bearing under corruption. + +False-accept note +----------------- +A corrupted sub-block whose CRC16 happens to match (~1/65536) feeds a bad +symbol to the outer decoder. CRC32 (`crc_bytes=4`) drops that to ~1/2^32 at ++2 B/block; CRC16 is the default to match `stream.py`. The PoC reports the +overhead so the trade is explicit. +""" + +from __future__ import annotations + +import struct +from dataclasses import dataclass +from typing import Iterable + + +def crc16_ccitt(data: bytes, init: int = 0xFFFF) -> int: + """CRC-16-CCITT-FALSE (poly 0x1021, init 0xFFFF) — byte-identical to + `stream.crc16_ccitt`. Inlined (not imported) so this module stays a + dependency-light, numpy-free primitive usable inside the GNU Radio + Python env (the SDR→SBI bridge) as well as the precoder toolchain.""" + crc = init & 0xFFFF + for byte in data: + crc ^= (byte & 0xFF) << 8 + for _ in range(8): + crc = ((crc << 1) ^ 0x1021) & 0xFFFF if (crc & 0x8000) \ + else (crc << 1) & 0xFFFF + return crc + + +SBI_MAGIC = 0xF5B0 +SBI_VERSION = 0 +SBI_HDR_STRUCT = " int: + """CRC over `data`, `width` ∈ {2, 4}. CRC32 reuses Python's zlib so we + keep the strong option dependency-free; CRC16 mirrors stream.py.""" + if width == 2: + return crc16_ccitt(data) & 0xFFFF + if width == 4: + import zlib + return zlib.crc32(data) & 0xFFFFFFFF + raise ValueError(f"crc_bytes must be 2 or 4 (got {width})") + + +@dataclass(frozen=True) +class UnpackResult: + """Outcome of unpacking one radio body. + + `survivors` are the envelope payloads whose sub-block CRC validated, in + body order — feed each straight to the outer decoder's `add_symbol`. + `n_blocks` / `n_failed` quantify the salvage (n_failed == 0 on a clean + body; on a kept-corrupt body it's how many sub-blocks were lost, i.e. the + erasures the outer code now has to cover — far fewer than the whole frame). + """ + survivors: list[bytes] + n_blocks: int + n_failed: int + header_ok: bool + stream_id: int = 0 + + +class SubBlockPacker: + """Batch fixed-size FEC envelopes into SBI radio bodies. + + Stateful so a caller can stream envelopes in (mirrors the FEC encoder's + `add`/`flush` shape): `add(env)` returns a body once `blocks_per_body` + have accumulated, `flush()` emits a short final body. + + All envelopes in one packer must share `block_payload` (true for a single + FEC scheme/config — RLC and RaptorQ both pad symbols to a fixed size). + """ + + def __init__(self, block_payload: int, blocks_per_body: int, + crc_bytes: int = 2, stream_id: int = 0) -> None: + if block_payload <= 0 or block_payload > 0xFFFF: + raise ValueError(f"block_payload must be 1..65535 (got {block_payload})") + if blocks_per_body <= 0 or blocks_per_body > 0xFF: + raise ValueError(f"blocks_per_body must be 1..255 (got {blocks_per_body})") + if crc_bytes not in (2, 4): + raise ValueError("crc_bytes must be 2 or 4") + if not (0 <= stream_id <= 0xFF): + raise ValueError(f"stream_id must be 0..255 (got {stream_id})") + self.block_payload = block_payload + self.blocks_per_body = blocks_per_body + self.crc_bytes = crc_bytes + self.stream_id = stream_id + self._pending: list[bytes] = [] + self.bodies_out = 0 + self.blocks_out = 0 + + @property + def block_stride(self) -> int: + return self.crc_bytes + self.block_payload + + def body_capacity(self) -> int: + """Max body size this packer emits (full body).""" + return SBI_HDR_LEN + self.blocks_per_body * self.block_stride + + def add(self, envelope: bytes) -> list[bytes]: + if len(envelope) != self.block_payload: + raise ValueError( + f"envelope {len(envelope)}B != block_payload {self.block_payload}B " + "(all envelopes in one packer must be the same fixed size)") + self._pending.append(bytes(envelope)) + out: list[bytes] = [] + while len(self._pending) >= self.blocks_per_body: + batch = self._pending[:self.blocks_per_body] + self._pending = self._pending[self.blocks_per_body:] + out.append(self._build_body(batch)) + return out + + def flush(self) -> list[bytes]: + if not self._pending: + return [] + batch = self._pending + self._pending = [] + return [self._build_body(batch)] + + def _build_body(self, batch: list[bytes]) -> bytes: + out = bytearray(struct.pack(SBI_HDR_STRUCT, SBI_MAGIC, SBI_VERSION, + self.stream_id, self.block_payload, + len(batch))) + for env in batch: + crc = _crc(self.crc_bytes, env) + out += crc.to_bytes(self.crc_bytes, "little") + out += env + self.bodies_out += 1 + self.blocks_out += len(batch) + return bytes(out) + + +def pack(envelopes: Iterable[bytes], block_payload: int, blocks_per_body: int, + crc_bytes: int = 2, stream_id: int = 0) -> list[bytes]: + """Stateless convenience wrapper around SubBlockPacker.""" + p = SubBlockPacker(block_payload, blocks_per_body, crc_bytes, stream_id) + out: list[bytes] = [] + for env in envelopes: + out += p.add(env) + out += p.flush() + return out + + +def unpack(body: bytes, block_payload: int, crc_bytes: int = 2) -> UnpackResult: + """Split a radio body into sub-blocks and return the CRC-valid survivors. + + `block_payload` / `crc_bytes` come from the receiver's config and are + authoritative — the body's header is sanity-checked but never trusted to + drive partitioning, so a corrupted header can't desync the scan. Works + identically on a clean body (n_failed == 0) and a kept-corrupt one. + """ + stride = crc_bytes + block_payload + header_ok = False + stream_id = 0 + if len(body) >= SBI_HDR_LEN: + magic, ver, stream_id, hdr_bp, _hdr_n = struct.unpack_from( + SBI_HDR_STRUCT, body) + header_ok = (magic == SBI_MAGIC and ver == SBI_VERSION + and hdr_bp == block_payload) + + region = body[SBI_HDR_LEN:] + n_blocks = len(region) // stride + survivors: list[bytes] = [] + n_failed = 0 + for i in range(n_blocks): + off = i * stride + crc_field = int.from_bytes(region[off:off + crc_bytes], "little") + payload = region[off + crc_bytes:off + stride] + if _crc(crc_bytes, payload) == crc_field: + survivors.append(bytes(payload)) + else: + n_failed += 1 + return UnpackResult(survivors=survivors, n_blocks=n_blocks, + n_failed=n_failed, header_ok=header_ok, + stream_id=stream_id) + + +def peek_stream_id(body: bytes): + """Return the SBI STREAM_ID from a body's header, or None on a bad header. + + The header sits at a fixed 7-byte offset independent of block_payload, so a + multiplexing receiver can route a body to the right stream before it knows + that stream's envelope size. Under corruption the routing may be wrong, but + the wrong stream's decoder then rejects the mismatched sub-blocks — a + dropped body, never a mis-decode. + """ + if len(body) < SBI_HDR_LEN: + return None + magic, ver, stream_id, _bp, _n = struct.unpack_from(SBI_HDR_STRUCT, body) + if magic != SBI_MAGIC or ver != SBI_VERSION: + return None + return stream_id + + +def overhead_bytes(blocks_per_body: int, crc_bytes: int = 2) -> int: + """Per-body framing overhead (header + per-block CRCs), for sizing logs.""" + return SBI_HDR_LEN + blocks_per_body * crc_bytes diff --git a/tools/precoder/fused_fec_link.py b/tools/precoder/fused_fec_link.py new file mode 100644 index 0000000..b92c6da --- /dev/null +++ b/tools/precoder/fused_fec_link.py @@ -0,0 +1,130 @@ +"""Fused-FEC link core — RS + sub-block-integrity (SBI) sender/receiver that +runs over the devourer chip↔chip stream link (StreamTxDemo / WiFiDriverDemo). + +This is the chip-path (scenario 1) sibling of the SDR capstone +(`~/git/sdr2wifi/fused_fec_rung1.py`): the same SBI-over-RS framing, but the +receiver consumes `` lines (with DEVOURER_RX_KEEP_CORRUPTED) +instead of gr-ieee802-11 PDUs. The chip gives only hard corrupted bytes (no +LLRs), so erasure localization is purely the per-sub-block CRC. + +Both ends share a `FecConfig` (scheme=rs) and `blocks_per_body`; the receiver +derives the envelope size the same way the sender does, so a body partitions +deterministically even when the chip flagged it corrupt. + +The receiver runs TWO decoders in lockstep so an on-air run reports the gain +directly: + * baseline — drops any crc_err frame (today's behaviour), + * sbi — keeps it and salvages the CRC-valid sub-blocks. + +CLI wrappers: `fused_fec_tx.py` (bytes→bodies→StreamTxDemo) and +`fused_fec_rx.py` (``→recovered bytes + gain report). +""" + +from __future__ import annotations + +import os +import sys +from dataclasses import dataclass, field + +_HERE = os.path.dirname(os.path.abspath(__file__)) +if _HERE not in sys.path: + sys.path.insert(0, _HERE) + +import fec_subblock # noqa: E402 +import stream_fec # noqa: E402 +from stream_fec import FecConfig # noqa: E402 + + +def env_size(cfg: FecConfig) -> int: + """Outer-code envelope (symbol) size for a config — by a throwaway encode, + so no scheme header length is hard-coded. Identical on both ends.""" + enc = stream_fec.make_encoder(cfg) + envs: list[bytes] = [] + for _ in range(cfg.k): + envs += enc.add_packet(b"\x00" * min(8, cfg.max_packet_size)) + envs += enc.flush() + return len(envs[0]) + + +def _chunks(data: bytes, n: int): + for i in range(0, len(data), n): + yield data[i:i + n] + + +class FusedFecSender: + def __init__(self, cfg: FecConfig, blocks_per_body: int, + crc_bytes: int = 2, stream_id: int = 0) -> None: + self.cfg = cfg + self.enc = stream_fec.make_encoder(cfg) + self.env = env_size(cfg) + self.packer = fec_subblock.SubBlockPacker( + self.env, blocks_per_body, crc_bytes=crc_bytes, stream_id=stream_id) + self.mtu = cfg.max_packet_size + + def add_bytes(self, data: bytes) -> list[bytes]: + bodies: list[bytes] = [] + for chunk in _chunks(data, self.mtu): + for envelope in self.enc.add_packet(chunk): + bodies += self.packer.add(envelope) + return bodies + + def flush(self) -> list[bytes]: + bodies: list[bytes] = [] + for envelope in self.enc.flush(): + bodies += self.packer.add(envelope) + bodies += self.packer.flush() + return bodies + + +@dataclass +class RxReport: + frames_seen: int = 0 + frames_corrupt: int = 0 + subblocks_total: int = 0 + subblocks_salvaged: int = 0 # CRC-valid sub-blocks from corrupt frames + base_blocks: int = 0 + sbi_blocks: int = 0 + base_packets: int = 0 + sbi_packets: int = 0 + + +class FusedFecReceiver: + """Consumes (body, crc_err) frames; runs baseline + SBI decoders in + parallel. `add_frame` returns the packets the SBI decoder recovered from + this frame (the live output); the gain vs baseline is in `report()`.""" + + def __init__(self, cfg: FecConfig, blocks_per_body: int, + crc_bytes: int = 2) -> None: + self.cfg = cfg + self.env = env_size(cfg) + self.crc_bytes = crc_bytes + self.base_dec = stream_fec.make_decoder(cfg) + self.sbi_dec = stream_fec.make_decoder(cfg) + self.r = RxReport() + + def add_frame(self, body: bytes, crc_err: bool) -> list[bytes]: + self.r.frames_seen += 1 + if crc_err: + self.r.frames_corrupt += 1 + res = fec_subblock.unpack(body, self.env, self.crc_bytes) + self.r.subblocks_total += res.n_blocks + if crc_err: + self.r.subblocks_salvaged += len(res.survivors) + + # Baseline: a crc_err frame is dropped wholesale. + if not crc_err: + for env in res.survivors: + got = self.base_dec.add_symbol(env) + self.r.base_packets += len(got) + # SBI: salvage the surviving sub-blocks regardless. + out: list[bytes] = [] + for env in res.survivors: + got = self.sbi_dec.add_symbol(env) + out += got + self.r.sbi_packets += len(got) + return out + + def report(self) -> RxReport: + self.r.base_blocks = self.base_dec.blocks_decoded + self.r.sbi_blocks = self.sbi_dec.blocks_decoded + return self.r diff --git a/tools/precoder/fused_fec_rx.py b/tools/precoder/fused_fec_rx.py new file mode 100644 index 0000000..2c46531 --- /dev/null +++ b/tools/precoder/fused_fec_rx.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +"""Fused-FEC RX driver — → SBI salvage → RS decode. + +Reads WiFiDriverDemo's stdout (run it with DEVOURER_STREAM_OUT=1 AND +DEVOURER_RX_KEEP_CORRUPTED=1 so FCS-failed bodies are surfaced), salvages each +frame's CRC-valid sub-blocks, and Reed-Solomon-decodes the recovered symbols. +Recovered packet bytes go to stdout; the baseline-vs-SBI gain report to stderr. + + DEVOURER_PID=0x8821 DEVOURER_STREAM_OUT=1 DEVOURER_RX_KEEP_CORRUPTED=1 \ + ./build/WiFiDriverDemo | python3 fused_fec_rx.py > recovered.bin + +FEC parameters MUST match fused_fec_tx.py. +""" +from __future__ import annotations + +import argparse +import os +import re +import sys +import time + +_HERE = os.path.dirname(os.path.abspath(__file__)) +if _HERE not in sys.path: + sys.path.insert(0, _HERE) + +from fused_fec_link import FusedFecReceiver # noqa: E402 +from fused_fec_tx import add_fec_args # noqa: E402 (shared FEC arg set) +from stream_fec import FecConfig # noqa: E402 + +_STREAM_RE = re.compile( + r"rate=(?P\d+)\s+len=(?P\d+)" + r"(?:\s+crc_err=(?P\d+))?" + r"(?:\s+icv_err=(?P\d+))?" + r".*?\s+body=(?P[0-9a-fA-F]*)") + + +def main(argv=None) -> int: + ap = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + add_fec_args(ap) + ap.add_argument("--idle-timeout", type=float, default=0.0, + help="exit when no new frame for this many seconds (0=off)") + args = ap.parse_args(argv) + + cfg = FecConfig(k=args.k, symbol_size=args.symbol_size, + overhead=args.overhead, scheme=args.scheme) + rcv = FusedFecReceiver(cfg, args.blocks_per_body, crc_bytes=args.crc_bytes) + + out = sys.stdout.buffer + last = time.monotonic() + for line in sys.stdin: + m = _STREAM_RE.search(line) + if not m: + if args.idle_timeout and (time.monotonic() - last) > args.idle_timeout: + break + continue + body = bytes.fromhex(m.group("hex")) + crc_err = bool(int(m.group("crc_err") or 0)) or \ + bool(int(m.group("icv_err") or 0)) + for pkt in rcv.add_frame(body, crc_err): + out.write(pkt) + out.flush() + last = time.monotonic() + if args.idle_timeout and (time.monotonic() - last) > args.idle_timeout: + break + + r = rcv.report() + gain = r.sbi_blocks - r.base_blocks + sys.stderr.write( + f"fused_fec_rx: frames={r.frames_seen} corrupt={r.frames_corrupt} " + f"sub-blocks={r.subblocks_total} salvaged={r.subblocks_salvaged}\n" + f"fused_fec_rx: baseline blocks={r.base_blocks} pkts={r.base_packets}\n" + f"fused_fec_rx: sbi blocks={r.sbi_blocks} pkts={r.sbi_packets}\n" + f"fused_fec_rx: FUSED-FEC GAIN = {gain} block(s) recovered that the " + f"drop-whole-frame baseline lost\n") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/precoder/fused_fec_tx.py b/tools/precoder/fused_fec_tx.py new file mode 100644 index 0000000..dd7c415 --- /dev/null +++ b/tools/precoder/fused_fec_tx.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +"""Fused-FEC TX driver — bytes → RS+SBI bodies → StreamTxDemo. + +Reads a byte stream (stdin or --input), Reed-Solomon-encodes it and packs the +outer-code symbols into sub-block-integrity (SBI) bodies, then writes +length-prefixed PSDU bodies to stdout for the C++ StreamTxDemo: + + python3 fused_fec_tx.py --input data.bin | DEVOURER_PID=0x8812 ./build/StreamTxDemo + +Wire format on stdout: + +The FEC parameters (--k/--symbol-size/--overhead/--blocks-per-body/--scheme) +MUST match fused_fec_rx.py on the receiver. +""" +from __future__ import annotations + +import argparse +import os +import struct +import sys + +_HERE = os.path.dirname(os.path.abspath(__file__)) +if _HERE not in sys.path: + sys.path.insert(0, _HERE) + +from fused_fec_link import FusedFecSender # noqa: E402 +from stream_fec import FecConfig # noqa: E402 + + +def add_fec_args(ap: argparse.ArgumentParser) -> None: + ap.add_argument("--k", type=int, default=8) + ap.add_argument("--symbol-size", type=int, default=64) + ap.add_argument("--overhead", type=float, default=0.5) + ap.add_argument("--blocks-per-body", type=int, default=4) + ap.add_argument("--scheme", default="rs", choices=("rs", "raptorq", "rlc")) + ap.add_argument("--crc-bytes", type=int, default=2, choices=(2, 4)) + + +def main(argv=None) -> int: + ap = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + ap.add_argument("--input", default=None, help="read bytes from file (else stdin)") + ap.add_argument("--repeat", type=int, default=1, + help="emit each body this many times (combats RX warmup loss)") + add_fec_args(ap) + args = ap.parse_args(argv) + + cfg = FecConfig(k=args.k, symbol_size=args.symbol_size, + overhead=args.overhead, scheme=args.scheme) + snd = FusedFecSender(cfg, args.blocks_per_body, crc_bytes=args.crc_bytes) + + src = sys.stdin.buffer if args.input is None else open(args.input, "rb") + data = src.read() + if args.input is not None: + src.close() + + bodies = snd.add_bytes(data) + snd.flush() + out = sys.stdout.buffer + nbytes = 0 + for body in bodies: + chunk = struct.pack(" None: - if self.scheme not in ("raptorq", "rlc"): + if self.scheme not in ("raptorq", "rlc", "rs"): raise ValueError( - f"scheme must be 'raptorq' or 'rlc' (got {self.scheme!r})") + f"scheme must be 'raptorq', 'rlc' or 'rs' (got {self.scheme!r})") if self.k <= 0 or self.k > 255: raise ValueError(f"k must be in 1..255 (got {self.k})") if self.symbol_size <= PACKET_LEN_PREFIX: @@ -132,6 +133,9 @@ def make_encoder(cfg: FecConfig): if cfg.scheme == "rlc": from stream_fec_rlc import RlcEncoder return RlcEncoder(cfg) + if cfg.scheme == "rs": + from stream_fec_rs import RsEncoder + return RsEncoder(cfg) raise ValueError(f"unknown FEC scheme {cfg.scheme!r}") @@ -144,6 +148,9 @@ def make_decoder(cfg: FecConfig): if cfg.scheme == "rlc": from stream_fec_rlc import RlcDecoder return RlcDecoder(cfg) + if cfg.scheme == "rs": + from stream_fec_rs import RsDecoder + return RsDecoder(cfg) raise ValueError(f"unknown FEC scheme {cfg.scheme!r}") diff --git a/tools/precoder/stream_fec_rs.py b/tools/precoder/stream_fec_rs.py new file mode 100644 index 0000000..db359e5 --- /dev/null +++ b/tools/precoder/stream_fec_rs.py @@ -0,0 +1,393 @@ +"""Reed-Solomon block erasure FEC for the stream link. + +The third outer scheme alongside RaptorQ (`stream_fec_raptorq.py`) and RLC +(`stream_fec_rlc.py`). RS is **MDS** — any K of the N = K + repair symbols +reconstruct the block exactly, with zero overhead — which makes it the best +choice for the small, low-latency blocks a video downlink wants (where +RaptorQ's probabilistic overhead is worst). This mirrors what +wfb-ng/wifibroadcast use: a systematic Reed-Solomon over GF(2^8) built from a +Vandermonde matrix (poly 0x11d, the zfec/Rizzo construction). + +Wire format (inside the outer `StreamFrame.payload`): + + RS_MAGIC (2) = 0xF540 little-endian + VERSION/FLAGS (1) = 0 + K (1) = source symbols per block + KREAL (1) = real source symbols in THIS block (1..K); the + trailing (K - KREAL) are zero pads (flush) + SYMBOL_SIZE (2) = LE u16 + BLOCK_ID (2) = LE u16 wraps + ESI (1) = symbol index in the block, 0..N-1 (0..K-1 source, + systematic; K..N-1 repair) + N (1) = total symbols in the block (K + repair) + PAYLOAD (var) = `symbol_size` bytes + +Source symbols are concatenation-packed IP packets — the SAME length-prefix +scheme as the other two schemes (`stream_fec.PACKET_LEN_PREFIX`). + +Decode is erasure-only: the systematic source symbols pass through untouched, +and once any K distinct ESIs of a block have arrived the K×K submatrix of the +encoding matrix is inverted over GF(2^8) to recover every source symbol. +Pure-Python GF math — fine for the research toolchain's modest rates. +""" + +from __future__ import annotations + +import struct +import time +from dataclasses import dataclass, field +from typing import Optional + +RS_MAGIC = 0xF540 +RS_HEADER_LEN = 11 +RS_HEADER_STRUCT = " None: + x = 1 + for i in range(255): + _GF_EXP[i] = x + _GF_LOG[x] = i + x <<= 1 + if x & 0x100: + x ^= _GF_POLY + for i in range(255, 512): + _GF_EXP[i] = _GF_EXP[i - 255] + + +_init_gf() + + +def _mul(a: int, b: int) -> int: + if a == 0 or b == 0: + return 0 + return _GF_EXP[_GF_LOG[a] + _GF_LOG[b]] + + +def _inv(a: int) -> int: + if a == 0: + raise ZeroDivisionError("GF inverse of 0") + return _GF_EXP[255 - _GF_LOG[a]] + + +def _pow(a: int, e: int) -> int: + if e == 0: + return 1 + if a == 0: + return 0 + return _GF_EXP[(_GF_LOG[a] * e) % 255] + + +def _mat_mul(A: list[list[int]], B: list[list[int]]) -> list[list[int]]: + n, m, p = len(A), len(B), len(B[0]) + out = [[0] * p for _ in range(n)] + for i in range(n): + Ai = A[i] + Oi = out[i] + for k in range(m): + a = Ai[k] + if a == 0: + continue + la = _GF_LOG[a] + Bk = B[k] + for j in range(p): + b = Bk[j] + if b: + Oi[j] ^= _GF_EXP[la + _GF_LOG[b]] + return out + + +def _mat_inv(M: list[list[int]]) -> list[list[int]]: + """Gauss-Jordan inverse over GF(2^8). Raises if singular.""" + n = len(M) + a = [row[:] + [1 if i == j else 0 for j in range(n)] + for i, row in enumerate(M)] + for col in range(n): + piv = next((r for r in range(col, n) if a[r][col] != 0), None) + if piv is None: + raise ValueError("singular matrix (should not happen for K rows)") + if piv != col: + a[col], a[piv] = a[piv], a[col] + inv_p = _inv(a[col][col]) + a[col] = [_mul(v, inv_p) for v in a[col]] + for r in range(n): + if r != col and a[r][col]: + f = a[r][col] + lf = _GF_LOG[f] + ac = a[col] + ar = a[r] + for j in range(2 * n): + v = ac[j] + if v: + ar[j] ^= _GF_EXP[lf + _GF_LOG[v]] + return [row[n:] for row in a] + + +_MATRIX_CACHE: dict[tuple[int, int], list[list[int]]] = {} + + +def _encoding_matrix(k: int, n: int) -> list[list[int]]: + """N×K systematic MDS matrix: top K rows = I, any K rows invertible. + + Build a Vandermonde V[i][j] = x_i^j (x_i = i, distinct for i<256), then + right-multiply by (V[:K])^-1 so the top block becomes identity — the + Rizzo/zfec construction. Right-multiplying by an invertible matrix + preserves the any-K-rows-invertible property of a Vandermonde. + """ + key = (k, n) + cached = _MATRIX_CACHE.get(key) + if cached is not None: + return cached + if n > 256: + raise ValueError(f"RS needs K+repair = N <= 256 (got N={n})") + V = [[_pow(i, j) for j in range(k)] for i in range(n)] + top_inv = _mat_inv([row[:] for row in V[:k]]) + A = _mat_mul(V, top_inv) + _MATRIX_CACHE[key] = A + return A + + +def _lincomb(coeffs: list[int], symbols: list[bytes], size: int) -> bytes: + """GF(2^8) linear combination of `symbols` with `coeffs`, byte-wise.""" + acc = bytearray(size) + for c, sym in zip(coeffs, symbols): + if c == 0: + continue + lc = _GF_LOG[c] + for idx in range(size): + s = sym[idx] + if s: + acc[idx] ^= _GF_EXP[lc + _GF_LOG[s]] + return bytes(acc) + + +# --------------------------------------------------------------------------- # +# Header helpers +# --------------------------------------------------------------------------- # +def _pack_header(k: int, kreal: int, symbol_size: int, block_id: int, + esi: int, n: int) -> bytes: + return struct.pack(RS_HEADER_STRUCT, RS_MAGIC, 0, k, kreal, symbol_size, + block_id & 0xFFFF, esi, n) + + +def _unpack_header(env: bytes) -> Optional[tuple[int, int, int, int, int, int]]: + if len(env) < RS_HEADER_LEN: + return None + magic, ver, k, kreal, ss, bid, esi, n = struct.unpack_from( + RS_HEADER_STRUCT, env) + if magic != RS_MAGIC or ver != 0: + return None + return k, kreal, ss, bid, esi, n + + +# --------------------------------------------------------------------------- # +# Encoder +# --------------------------------------------------------------------------- # +class RsEncoder: + """Concatenation-packs IP packets, then Reed-Solomon-encodes K source + symbols into N = K + repair_count systematic + parity symbols.""" + + def __init__(self, cfg) -> None: + self.cfg = cfg + self._n = cfg.k + cfg.repair_count + if self._n > 256: + raise ValueError( + f"RS block N = K({cfg.k}) + repair({cfg.repair_count}) = " + f"{self._n} exceeds GF(2^8) limit of 256") + self._pending_symbols: list[bytes] = [] + self._current_symbol = bytearray() + self._block_id = 0 + self.blocks_encoded = 0 + self.packets_in = 0 + self.symbols_out = 0 + self.bytes_in = 0 + + def add_packet(self, pkt: bytes) -> list[bytes]: + n = len(pkt) + if n > self.cfg.max_packet_size: + raise ValueError( + f"packet {n}B exceeds max_packet_size {self.cfg.max_packet_size}B " + f"(symbol_size {self.cfg.symbol_size} - {PACKET_LEN_PREFIX} prefix)" + ) + self.packets_in += 1 + self.bytes_in += n + needed = PACKET_LEN_PREFIX + n + if needed > self.cfg.symbol_size - len(self._current_symbol): + self._seal_current_symbol() + ready = self._maybe_encode_full_block() + if ready: + self._append_to_current(pkt) + return ready + self._append_to_current(pkt) + return [] + + def _append_to_current(self, pkt: bytes) -> None: + self._current_symbol += struct.pack(" None: + if not self._current_symbol: + return + pad = self.cfg.symbol_size - len(self._current_symbol) + if pad: + self._current_symbol += b"\x00" * pad + self._pending_symbols.append(bytes(self._current_symbol)) + self._current_symbol = bytearray() + + def _maybe_encode_full_block(self) -> list[bytes]: + if len(self._pending_symbols) >= self.cfg.k: + return self._encode_block(kreal=self.cfg.k) + return [] + + def flush(self) -> list[bytes]: + self._seal_current_symbol() + if not self._pending_symbols: + return [] + kreal = len(self._pending_symbols) + while len(self._pending_symbols) < self.cfg.k: + self._pending_symbols.append(b"\x00" * self.cfg.symbol_size) + return self._encode_block(kreal=kreal) + + def _encode_block(self, kreal: int) -> list[bytes]: + src = self._pending_symbols[:self.cfg.k] + self._pending_symbols = self._pending_symbols[self.cfg.k:] + k, n, ss = self.cfg.k, self._n, self.cfg.symbol_size + A = _encoding_matrix(k, n) + bid = self._block_id + self._block_id = (self._block_id + 1) & 0xFFFF + + out: list[bytes] = [] + for esi in range(n): + if esi < k: + payload = src[esi] # systematic passthrough + else: + payload = _lincomb(A[esi], src, ss) + out.append(_pack_header(k, kreal, ss, bid, esi, n) + payload) + self.blocks_encoded += 1 + self.symbols_out += len(out) + return out + + @property + def pending_packets(self) -> int: + if self._current_symbol: + return 1 + return len(self._pending_symbols) + + +# --------------------------------------------------------------------------- # +# Decoder +# --------------------------------------------------------------------------- # +@dataclass +class _RsBlock: + k: int + n: int + kreal: int + first_seen: float + symbols: dict[int, bytes] = field(default_factory=dict) # esi -> payload + decoded: bool = False + + +class RsDecoder: + def __init__(self, cfg) -> None: + self.cfg = cfg + self._blocks: dict[int, _RsBlock] = {} + self.blocks_decoded = 0 + self.blocks_unrecoverable = 0 + self.symbols_in = 0 + self.symbols_dropped_bad_cfg = 0 + self.symbols_dropped_stale_block = 0 + self.packets_out = 0 + self.bytes_out = 0 + + def add_symbol(self, envelope: bytes) -> list[bytes]: + header = _unpack_header(envelope) + if header is None: + return [] + k, kreal, ss, block_id, esi, n = header + if k != self.cfg.k or ss != self.cfg.symbol_size: + self.symbols_dropped_bad_cfg += 1 + return [] + if not (1 <= kreal <= k) or not (0 <= esi < n) or n < k: + self.symbols_dropped_bad_cfg += 1 + return [] + payload = envelope[RS_HEADER_LEN:] + if len(payload) != ss: + self.symbols_dropped_bad_cfg += 1 + return [] + self.symbols_in += 1 + + st = self._blocks.get(block_id) + if st is None: + st = _RsBlock(k=k, n=n, kreal=kreal, first_seen=time.monotonic()) + self._blocks[block_id] = st + elif st.decoded: + self.symbols_dropped_stale_block += 1 + return [] + st.symbols.setdefault(esi, bytes(payload)) + + if len(st.symbols) < k: + return [] + return self._solve(st) + + def _solve(self, st: _RsBlock) -> list[bytes]: + k, n, ss = st.k, st.n, self.cfg.symbol_size + esis = sorted(st.symbols)[:k] + # Fast path: all K systematic source symbols present -> no inverse. + if all(e < k for e in esis) and len(esis) == k: + source = [st.symbols[e] for e in range(k)] + else: + A = _encoding_matrix(k, n) + sub = [A[e][:] for e in esis] + inv = _mat_inv(sub) + recv = [st.symbols[e] for e in esis] + source = [_lincomb(inv[j], recv, ss) for j in range(k)] + st.decoded = True + self.blocks_decoded += 1 + ip_pkts = self._unpack(source, st.kreal) + self.packets_out += len(ip_pkts) + self.bytes_out += sum(len(p) for p in ip_pkts) + return ip_pkts + + def _unpack(self, source: list[bytes], kreal: int) -> list[bytes]: + ss = self.cfg.symbol_size + out: list[bytes] = [] + for i in range(kreal): + symbol = source[i] + pos = 0 + while pos + PACKET_LEN_PREFIX <= len(symbol): + ln = int.from_bytes(symbol[pos:pos + PACKET_LEN_PREFIX], "little") + if ln == 0: + break + end = pos + PACKET_LEN_PREFIX + ln + if end > len(symbol): + break + out.append(bytes(symbol[pos + PACKET_LEN_PREFIX:end])) + pos = end + return out + + def expire_blocks_older_than(self, max_age_s: float) -> int: + if not self._blocks: + return 0 + now = time.monotonic() + expired = [bid for bid, st in self._blocks.items() + if (now - st.first_seen) > max_age_s] + unrecoverable = 0 + for bid in expired: + if not self._blocks[bid].decoded: + unrecoverable += 1 + del self._blocks[bid] + self.blocks_unrecoverable += unrecoverable + return unrecoverable + + @property + def in_flight_blocks(self) -> int: + return len(self._blocks) diff --git a/tools/precoder/svc_uep_fec.py b/tools/precoder/svc_uep_fec.py new file mode 100644 index 0000000..e8e9d76 --- /dev/null +++ b/tools/precoder/svc_uep_fec.py @@ -0,0 +1,188 @@ +"""Per-SVC-layer FEC-rate UEP — the application-FEC half of cross-layer UEP. + +`txdemo/svc_tx_demo/svc_tx.h` already maps each HEVC temporal layer to a PHY +MCS (the PHY-rate half of unequal error protection). This adds the *outer-FEC- +rate* half: each temporal layer gets its own Reed-Solomon redundancy, so the +base/IDR layers — which `svc_tx.h` already flies at the most robust MCS — also +carry the heaviest erasure protection, and the enhancement layers carry the +least. That is the joint MCS+FEC UEP of Abdel-Khalek & Heath (JSAC 2012): +protection tracks perceptual importance on BOTH knobs at once, giving a +graceful-degradation staircase instead of one cliff. + +Each layer is an independent FEC stream tagged with an SBI `stream_id`, so +losing all of T2 never stalls T0's decode, and a corrupted frame's surviving +sub-blocks (via `fec_subblock`) still feed the correct layer's decoder. The +receiver routes a body by the stream_id in its SBI header and unpacks with +that layer's *configured* envelope size (never the corruptible header field), +so a mangled stream_id merely drops the body rather than mis-decoding it. + +This is the application-FEC sibling of the C++ `svc::LayerPolicy`; the two are +meant to be configured together (robust MCS + heavy FEC on the same layers). +""" + +from __future__ import annotations + +import os +import sys +from dataclasses import dataclass, field + +_HERE = os.path.dirname(os.path.abspath(__file__)) +if _HERE not in sys.path: + sys.path.insert(0, _HERE) + +import fec_subblock # noqa: E402 +import stream_fec # noqa: E402 +from stream_fec import FecConfig # noqa: E402 + + +# --------------------------------------------------------------------------- # +# HEVC NAL classification — Python mirror of svc_tx.h:parse_hevc_nal +# --------------------------------------------------------------------------- # +@dataclass(frozen=True) +class NalInfo: + tid: int = 0 + critical: bool = False + type: int = 0 + + +def parse_hevc_nal(nal: bytes) -> NalInfo: + """HEVC 2-byte NAL header: type=(b0>>1)&0x3F, tid=(b1&7)-1. Critical = + IRAP slices (16..23) or parameter sets (VPS=32/SPS=33/PPS=34).""" + if len(nal) < 2: + return NalInfo() + ntype = (nal[0] >> 1) & 0x3F + tid = (nal[1] & 0x07) - 1 + if tid < 0: + tid = 0 + critical = (16 <= ntype <= 23) or (32 <= ntype <= 34) + return NalInfo(tid=tid, critical=critical, type=ntype) + + +# --------------------------------------------------------------------------- # +# Per-layer policy +# --------------------------------------------------------------------------- # +@dataclass(frozen=True) +class UepLayer: + fec: FecConfig + blocks_per_body: int = 4 + + +def _rs(k: int, overhead: float, symbol_size: int = 64) -> FecConfig: + return FecConfig(k=k, symbol_size=symbol_size, overhead=overhead, scheme="rs") + + +@dataclass +class UepPolicy: + """stream_id 0 = critical; 1.. = temporal layers T0, T1, … in order. + `by_tid[i]` is the layer for temporal id i (clamped to the last entry).""" + critical: UepLayer + by_tid: list[UepLayer] + + def stream_for(self, info: NalInfo) -> int: + if info.critical or not self.by_tid: + return 0 + return 1 + min(info.tid, len(self.by_tid) - 1) + + def layer(self, stream_id: int) -> UepLayer: + if stream_id == 0: + return self.critical + return self.by_tid[min(stream_id - 1, len(self.by_tid) - 1)] + + def stream_ids(self) -> list[int]: + return [0] + [1 + i for i in range(len(self.by_tid))] + + +def default_uep_policy() -> UepPolicy: + """FEC-rate ladder complementing svc_tx.h's default MCS ladder. Heaviest + erasure protection on the layers that also fly at the most robust MCS. + critical (IDR / VPS/SPS/PPS) : RS overhead 1.00 (N=16, tolerates 8/16 loss) + T0 base : RS overhead 0.75 (N=14) + T1 : RS overhead 0.50 (N=12) + T2 : RS overhead 0.25 (N=10) — lightest + """ + return UepPolicy( + critical=UepLayer(_rs(8, 1.00)), + by_tid=[ + UepLayer(_rs(8, 0.75)), + UepLayer(_rs(8, 0.50)), + UepLayer(_rs(8, 0.25)), + ], + ) + + +def _env_size(cfg: FecConfig) -> int: + """Envelope (outer-code symbol) size for a config — derived by a throwaway + encode so we never hard-code a scheme's header length.""" + enc = stream_fec.make_encoder(cfg) + envs: list[bytes] = [] + for i in range(cfg.k): + envs += enc.add_packet(b"\x00" * min(8, cfg.max_packet_size)) + envs += enc.flush() + return len(envs[0]) + + +# --------------------------------------------------------------------------- # +# Encoder / decoder +# --------------------------------------------------------------------------- # +class SvcUepEncoder: + """Routes each NAL to its temporal layer's FEC stream and SBI-packs it. + `add_nal` returns a list of (stream_id, body) ready for the radio.""" + + def __init__(self, policy: UepPolicy) -> None: + self.policy = policy + self._enc = {sid: stream_fec.make_encoder(policy.layer(sid).fec) + for sid in policy.stream_ids()} + self._env = {sid: _env_size(policy.layer(sid).fec) + for sid in policy.stream_ids()} + self._packer = { + sid: fec_subblock.SubBlockPacker( + self._env[sid], policy.layer(sid).blocks_per_body, stream_id=sid) + for sid in policy.stream_ids()} + + def add_nal(self, nal: bytes) -> list[tuple[int, bytes]]: + sid = self.policy.stream_for(parse_hevc_nal(nal)) + out: list[tuple[int, bytes]] = [] + for env in self._enc[sid].add_packet(nal): + for body in self._packer[sid].add(env): + out.append((sid, body)) + return out + + def flush(self) -> list[tuple[int, bytes]]: + out: list[tuple[int, bytes]] = [] + for sid in self.policy.stream_ids(): + for env in self._enc[sid].flush(): + for body in self._packer[sid].add(env): + out.append((sid, body)) + for body in self._packer[sid].flush(): + out.append((sid, body)) + return out + + +class SvcUepDecoder: + """Routes a received body by its SBI stream_id to that layer's FEC decoder, + unpacking with the layer's *configured* envelope size.""" + + def __init__(self, policy: UepPolicy) -> None: + self.policy = policy + self._dec = {sid: stream_fec.make_decoder(policy.layer(sid).fec) + for sid in policy.stream_ids()} + self._env = {sid: _env_size(policy.layer(sid).fec) + for sid in policy.stream_ids()} + self.bodies_routed = 0 + self.bodies_misrouted = 0 + + def add_body(self, body: bytes) -> list[tuple[int, bytes]]: + sid = fec_subblock.peek_stream_id(body) + if sid is None or sid not in self._dec: + self.bodies_misrouted += 1 + return [] + self.bodies_routed += 1 + res = fec_subblock.unpack(body, self._env[sid]) + out: list[tuple[int, bytes]] = [] + for env in res.survivors: + for pkt in self._dec[sid].add_symbol(env): + out.append((sid, pkt)) + return out + + def blocks_decoded(self, stream_id: int) -> int: + return self._dec[stream_id].blocks_decoded diff --git a/tools/precoder/test_fec_fusion_sim.py b/tools/precoder/test_fec_fusion_sim.py new file mode 100644 index 0000000..2f22d1b --- /dev/null +++ b/tools/precoder/test_fec_fusion_sim.py @@ -0,0 +1,73 @@ +"""Tests for the fused-FEC simulation harness (`fec_fusion_sim.py`). + +Locks in the qualitative result the sim produced: in the realistic +post-PHY-FEC residual-BER regime (small symbols, low BER) sub-block salvage +turns a marginal drop-whole-frame link into a reliable one, and SBI never +does worse than the baseline. +""" + +from __future__ import annotations + +import random + +import fec_fusion_sim +from fec_fusion_sim import Channel, run +from stream_fec import FecConfig + + +def _factory(model, ber, frame_loss): + return lambda rng: Channel(model=model, ber=ber, frame_loss=frame_loss, + rng=rng) + + +def test_slope_model_tail_heavier_than_head(): + rng = random.Random(0) + ch = Channel("slope", ber=5e-3, frame_loss=0.0, rng=rng) + head_flips = tail_flips = 0 + body = bytes(200) + for _ in range(200): + out, lost, _ = ch.corrupt(body) + assert not lost + head_flips += sum(bin(out[i]).count("1") for i in range(20)) + tail_flips += sum(bin(out[i]).count("1") for i in range(180, 200)) + # Slope-line: errors concentrate at the frame tail. + assert tail_flips > head_flips * 3 + + +def test_sbi_beats_baseline_in_residual_ber_regime(): + cfg = FecConfig(k=8, symbol_size=32, overhead=0.5, scheme="rs") + s = run(cfg, blocks_per_body=8, n_packets=16, pkt_len=8, + chan_factory=_factory("slope", 3e-4, 0.0), + crc_bytes=2, trials=120, seed=3) + # Baseline is genuinely marginal here; SBI is near-perfect. + assert s.baseline_success < 0.8 + assert s.sbi_success > 0.95 + assert s.sbi_success >= s.baseline_success + + +def test_sbi_never_worse_than_baseline_clean_channel(): + cfg = FecConfig(k=6, symbol_size=64, overhead=0.5, scheme="rs") + s = run(cfg, blocks_per_body=4, n_packets=12, pkt_len=40, + chan_factory=_factory("uniform", 0.0, 0.0), + crc_bytes=2, trials=20, seed=1) + assert s.baseline_success == 1.0 and s.sbi_success == 1.0 + + +def test_frame_loss_hits_both_equally(): + # Pure whole-frame loss (no bit errors): SBI has no advantage, and at a + # loss rate within the erasure budget both still recover. + cfg = FecConfig(k=8, symbol_size=64, overhead=1.0, scheme="rs") # N=16 + s = run(cfg, blocks_per_body=2, n_packets=10, pkt_len=40, + chan_factory=_factory("uniform", 0.0, 0.15), + crc_bytes=2, trials=60, seed=5) + assert s.sbi_success >= s.baseline_success + + +def test_summary_overhead_accounting(): + cfg = FecConfig(k=8, symbol_size=64, overhead=0.5, scheme="rs") + s = run(cfg, blocks_per_body=4, n_packets=8, pkt_len=40, + chan_factory=_factory("uniform", 0.0, 0.0), + crc_bytes=2, trials=5, seed=0) + # env = 11 (RS header) + 64 = 75; overhead = hdr(6)+4*2=14 over 14+4*75=314. + assert s.env_size == 75 + assert 4.0 < s.overhead_pct < 5.0 diff --git a/tools/precoder/test_fec_subblock.py b/tools/precoder/test_fec_subblock.py new file mode 100644 index 0000000..c2272f0 --- /dev/null +++ b/tools/precoder/test_fec_subblock.py @@ -0,0 +1,190 @@ +"""Tests for the Sub-Block Integrity (SBI) fused-FEC layer (`fec_subblock.py`). + +Covers: + * Clean round-trip — pack N envelopes, unpack, get them back byte-exact. + * Localized corruption — flipping bytes in sub-block j drops *only* j; the + other sub-blocks in the same body survive (the whole point). + * Header corruption can't desync — a mangled SBI header still partitions + correctly off the receiver's configured block_payload. + * CRC32 option. + * The headline fusion result — a low-but-ubiquitous BER that fails the + chip FCS on *every* frame destroys the drop-whole-frame baseline (0 + symbols delivered) but SBI delivers the uncorrupted sub-blocks, so the + outer RaptorQ code reconstructs the message. +""" + +from __future__ import annotations + +import random + +import pytest + +import fec_subblock +from fec_subblock import (SBI_HDR_LEN, SubBlockPacker, overhead_bytes, pack, + unpack) + + +def _envs(n: int, size: int, seed: int = 0) -> list[bytes]: + rng = random.Random(seed) + return [bytes(rng.randrange(256) for _ in range(size)) for _ in range(n)] + + +def test_inline_crc_matches_stream(): + """fec_subblock inlines crc16 (to stay numpy-free for the SDR bridge); it + must stay byte-identical to stream.crc16_ccitt or the two ends disagree.""" + import stream + rng = random.Random(0) + for _ in range(500): + d = bytes(rng.randrange(256) for _ in range(rng.randrange(1, 80))) + assert fec_subblock.crc16_ccitt(d) == stream.crc16_ccitt(d) + + +# --------------------------------------------------------------------------- # +# Clean round-trip +# --------------------------------------------------------------------------- # +@pytest.mark.parametrize("crc_bytes", [2, 4]) +def test_clean_round_trip(crc_bytes): + envs = _envs(10, 64) + bodies = pack(envs, block_payload=64, blocks_per_body=4, crc_bytes=crc_bytes) + # 10 envelopes, 4/body -> 3 bodies (4,4,2). + assert len(bodies) == 3 + got: list[bytes] = [] + for b in bodies: + res = unpack(b, block_payload=64, crc_bytes=crc_bytes) + assert res.header_ok + assert res.n_failed == 0 + got += res.survivors + assert got == envs + + +def test_packer_rejects_wrong_size(): + p = SubBlockPacker(block_payload=64, blocks_per_body=4) + with pytest.raises(ValueError): + p.add(b"\x00" * 63) + + +def test_partial_final_body(): + envs = _envs(5, 32) + bodies = pack(envs, block_payload=32, blocks_per_body=4) + assert len(bodies) == 2 + # Last body holds the single leftover block. + last = unpack(bodies[1], block_payload=32) + assert last.n_blocks == 1 and last.n_failed == 0 + assert last.survivors == [envs[4]] + + +# --------------------------------------------------------------------------- # +# Localized corruption — only the hit sub-block is lost +# --------------------------------------------------------------------------- # +def test_corrupting_one_subblock_drops_only_it(): + envs = _envs(4, 64, seed=1) + body = bytearray(pack(envs, block_payload=64, blocks_per_body=4)[0]) + stride = 2 + 64 # crc16 + payload + hit = 2 # corrupt sub-block index 2 + # Flip a byte inside block `hit`'s payload (past its 2-byte CRC). + off = SBI_HDR_LEN + hit * stride + 2 + 30 + body[off] ^= 0xFF + res = unpack(bytes(body), block_payload=64) + assert res.n_blocks == 4 + assert res.n_failed == 1 + # Survivors are blocks 0,1,3 in order — block 2 dropped, others byte-exact. + assert res.survivors == [envs[0], envs[1], envs[3]] + + +def test_header_corruption_does_not_desync(): + envs = _envs(4, 48, seed=2) + body = bytearray(pack(envs, block_payload=48, blocks_per_body=4)[0]) + body[0] ^= 0xFF # smash the magic + body[1] ^= 0xAA + res = unpack(bytes(body), block_payload=48) + assert not res.header_ok # header flagged bad … + assert res.n_failed == 0 # … but every sub-block still recovered + assert res.survivors == envs + + +def test_stream_id_round_trips(): + envs = _envs(6, 40, seed=5) + bodies = pack(envs, block_payload=40, blocks_per_body=3, stream_id=7) + got = [] + for b in bodies: + res = unpack(b, block_payload=40) + assert res.header_ok and res.stream_id == 7 and res.n_failed == 0 + got += res.survivors + assert got == envs + + +def test_overhead_accounting(): + # 4 blocks, CRC16 -> header(6) + 4*2 = 14 bytes of framing per body. + assert overhead_bytes(4, crc_bytes=2) == SBI_HDR_LEN + 8 + assert overhead_bytes(4, crc_bytes=4) == SBI_HDR_LEN + 16 + + +# --------------------------------------------------------------------------- # +# Headline fusion result — ubiquitous localized BER +# --------------------------------------------------------------------------- # +def _corrupt_one_block_per_body(body: bytes, block_payload: int, rng, + crc_bytes: int = 2) -> bytes: + """Flip a couple of bytes inside ONE random sub-block of `body` — models a + low BER whose few bit-flips happen to land in a single OFDM-symbol region, + failing the frame FCS but leaving the rest of the body intact.""" + stride = crc_bytes + block_payload + n = (len(body) - SBI_HDR_LEN) // stride + j = rng.randrange(n) + b = bytearray(body) + base = SBI_HDR_LEN + j * stride + crc_bytes + for _ in range(2): + b[base + rng.randrange(block_payload)] ^= (1 << rng.randrange(8)) + return bytes(b) + + +def test_sbi_beats_drop_whole_frame_under_ubiquitous_ber(): + raptorq = pytest.importorskip("raptorq") # outer code dep + import stream_fec + from stream_fec import FecConfig + + # Small symbols so several ride in one radio body. overhead=1.0 -> the + # RaptorQ block tolerates ~50% symbol loss. + cfg = FecConfig(k=8, symbol_size=64, overhead=1.0, scheme="raptorq") + payloads = [bytes(f"packet-{i:03d} ".encode()) * 3 for i in range(8)] + + enc = stream_fec.make_encoder(cfg) + envelopes: list[bytes] = [] + for p in payloads: + envelopes += enc.add_packet(p) + envelopes += enc.flush() + assert envelopes, "encoder produced no symbols" + + env_size = len(envelopes[0]) + assert all(len(e) == env_size for e in envelopes), "non-uniform envelope size" + + blocks_per_body = 4 + bodies = pack(envelopes, block_payload=env_size, + blocks_per_body=blocks_per_body) + + rng = random.Random(1234) + # Channel: EVERY radio frame takes a localized hit (fails its FCS). + corrupted = [_corrupt_one_block_per_body(b, env_size, rng) for b in bodies] + + # --- Baseline: chip drops any FCS-failed frame -> no symbols at all. --- + base_dec = stream_fec.make_decoder(cfg) + base_out: list[bytes] = [] + for _b in corrupted: + pass # every frame corrupted -> every frame dropped -> nothing decoded + assert base_out == [], "baseline somehow recovered without SBI" + + # --- SBI: keep the corrupt frames, salvage the surviving sub-blocks. --- + sbi_dec = stream_fec.make_decoder(cfg) + sbi_out: list[bytes] = [] + total_blocks = total_failed = 0 + for b in corrupted: + res = unpack(b, block_payload=env_size) + total_blocks += res.n_blocks + total_failed += res.n_failed + for env in res.survivors: + sbi_out += sbi_dec.add_symbol(env) + + # ~1 of 4 sub-blocks lost per body — the rest survive and reconstruct. + assert 0 < total_failed < total_blocks + assert set(sbi_out) == set(payloads), ( + f"SBI failed to recover all packets: got {len(set(sbi_out))}/" + f"{len(payloads)} (lost {total_failed}/{total_blocks} sub-blocks)") diff --git a/tools/precoder/test_fused_fec_link.py b/tools/precoder/test_fused_fec_link.py new file mode 100644 index 0000000..58700e3 --- /dev/null +++ b/tools/precoder/test_fused_fec_link.py @@ -0,0 +1,71 @@ +"""Tests for the chip-path fused-FEC link core (`fused_fec_link.py`). + +Software loopback: sender → (optional localized corruption) → receiver. Proves +the clean round-trip and that SBI salvage beats the drop-whole-frame baseline +under the localized corruption a real noisy chip link produces. +""" + +from __future__ import annotations + +import random + +from fused_fec_link import FusedFecReceiver, FusedFecSender, env_size +from stream_fec import FecConfig + + +def test_clean_round_trip_reconstructs_stream(): + cfg = FecConfig(k=8, symbol_size=64, overhead=0.5, scheme="rs") # N=12 + snd = FusedFecSender(cfg, blocks_per_body=4) # 12 env / 4 = 3 bodies/block + msg = bytes((i * 7) & 0xFF for i in range(8 * 62 * 3)) # whole packets + bodies = snd.add_bytes(msg) + snd.flush() + + rcv = FusedFecReceiver(cfg, blocks_per_body=4) + out = b"".join(pkt for b in bodies for pkt in rcv.add_frame(b, crc_err=False)) + assert out == msg + rep = rcv.report() + assert rep.frames_corrupt == 0 and rep.sbi_blocks == rep.base_blocks > 0 + + +def _corrupt_one_subblock(body: bytes, env: int, rng, crc_bytes: int = 2) -> bytes: + import fec_subblock + stride = crc_bytes + env + n = (len(body) - fec_subblock.SBI_HDR_LEN) // stride + j = rng.randrange(n) + b = bytearray(body) + base = fec_subblock.SBI_HDR_LEN + j * stride + crc_bytes + b[base + rng.randrange(env)] ^= 0xFF + return bytes(b) + + +def test_sbi_beats_baseline_one_block_per_frame(): + # One whole RS block per radio frame (N=10, R=2). A single FCS-failed frame + # is a total block loss for the baseline but recoverable by SBI (only the + # one corrupt sub-block is lost, 9 >= K=8 survive). + cfg = FecConfig(k=8, symbol_size=32, overhead=0.25, scheme="rs") # N=10 + snd = FusedFecSender(cfg, blocks_per_body=10) + env = env_size(cfg) + rng = random.Random(3) + + NB = 40 + frames: list[tuple[bytes, bool]] = [] + for blk in range(NB): + msg = bytes(((blk * 17 + i) & 0xFF) for i in range(8 * 30)) + bodies = snd.add_bytes(msg) + snd.flush() + # Each block = exactly one body (10 env / 10 per body). Corrupt ~25%. + for body in bodies: + if rng.random() < 0.25: + frames.append((_corrupt_one_subblock(body, env, rng), True)) + else: + frames.append((body, False)) + + rcv = FusedFecReceiver(cfg, blocks_per_body=10) + for body, crc_err in frames: + rcv.add_frame(body, crc_err) + rep = rcv.report() + + assert rep.frames_corrupt > 0 + # SBI recovers the corrupt frames' blocks; baseline drops them. + assert rep.sbi_blocks > rep.base_blocks + assert rep.sbi_blocks == NB # SBI gets every block + assert rep.base_blocks == NB - rep.frames_corrupt + assert rep.subblocks_salvaged > 0 diff --git a/tools/precoder/test_stream_fec_rs.py b/tools/precoder/test_stream_fec_rs.py new file mode 100644 index 0000000..2d54480 --- /dev/null +++ b/tools/precoder/test_stream_fec_rs.py @@ -0,0 +1,147 @@ +"""Tests for the Reed-Solomon outer scheme (`stream_fec_rs.py`). + +Covers MDS recovery (any K of N reconstruct), the erasure limit (exactly R +recoverable, R+1 not), systematic passthrough, partial-block flush, block-id +wrap, and the RS+SBI fusion (RS as the outer code under sub-block salvage). +""" + +from __future__ import annotations + +import random + +import pytest + +import fec_subblock +import stream_fec +import stream_fec_rs +from stream_fec import FecConfig, make_decoder, make_encoder + + +def _encode(packets: list[bytes], cfg: FecConfig) -> list[bytes]: + enc = make_encoder(cfg) + envs: list[bytes] = [] + for p in packets: + envs += enc.add_packet(p) + envs += enc.flush() + return envs + + +def _decode(envs: list[bytes], cfg: FecConfig) -> list[bytes]: + dec = make_decoder(cfg) + out: list[bytes] = [] + for e in envs: + out += dec.add_symbol(e) + return out + + +# --------------------------------------------------------------------------- # +# GF(2^8) sanity +# --------------------------------------------------------------------------- # +def test_gf_field_axioms(): + for a in range(1, 256): + assert stream_fec_rs._mul(a, stream_fec_rs._inv(a)) == 1 + # distributivity spot-check + assert stream_fec_rs._mul(0x53, 0xCA) == stream_fec_rs._mul(0xCA, 0x53) + + +def test_encoding_matrix_is_systematic(): + A = stream_fec_rs._encoding_matrix(8, 12) + for i in range(8): + assert A[i] == [1 if j == i else 0 for j in range(8)] + + +# --------------------------------------------------------------------------- # +# MDS recovery +# --------------------------------------------------------------------------- # +def test_round_trip_no_loss(): + cfg = FecConfig(k=8, symbol_size=64, overhead=0.5, scheme="rs") # N=12 + pkts = [f"msg-{i:02d}-".encode() * 4 for i in range(8)] + assert _decode(_encode(pkts, cfg), cfg) == pkts + + +def test_any_k_of_n_reconstruct(): + """MDS: every K-subset of the N symbols reconstructs the block.""" + import itertools + cfg = FecConfig(k=4, symbol_size=48, overhead=0.5, scheme="rs") # N=6 + pkts = [f"p{i}".encode() * 10 for i in range(4)] + envs = _encode(pkts, cfg) + assert len(envs) == 6 + for keep in itertools.combinations(range(6), 4): + subset = [envs[i] for i in keep] + assert _decode(subset, cfg) == pkts, f"subset {keep} failed" + + +def test_erasure_limit_R_ok_Rplus1_fails(): + cfg = FecConfig(k=6, symbol_size=64, overhead=0.5, scheme="rs") # N=9, R=3 + pkts = [f"data-{i}".encode() * 6 for i in range(6)] + envs = _encode(pkts, cfg) + assert len(envs) == 9 + rng = random.Random(7) + + # Drop exactly R=3 -> recovers. + e = envs[:] + rng.shuffle(e) + assert _decode(e[3:], cfg) == pkts + + # Drop R+1=4 -> only 5 < K=6 symbols -> cannot recover. + assert _decode(e[4:], cfg) == [] + + +def test_partial_flush_kreal(): + cfg = FecConfig(k=8, symbol_size=64, overhead=0.5, scheme="rs") + pkts = [f"only-{i}".encode() * 3 for i in range(3)] # < 1 symbol each + # 3 tiny packets pack into 1 source symbol -> kreal=1. + assert _decode(_encode(pkts, cfg), cfg) == pkts + + +def test_block_id_wrap(): + cfg = FecConfig(k=2, symbol_size=32, overhead=0.5, scheme="rs") + enc = make_encoder(cfg) + enc._block_id = 0xFFFF + dec = make_decoder(cfg) + out: list[bytes] = [] + for i in range(6): # several blocks across the wrap + for e in enc.add_packet(f"x{i}".encode() * 8): + out += dec.add_symbol(e) + for e in enc.flush(): + out += dec.add_symbol(e) + assert out == [f"x{i}".encode() * 8 for i in range(6)] + + +def test_rejects_oversized_n(): + with pytest.raises(ValueError): + make_encoder(FecConfig(k=200, symbol_size=32, overhead=1.0, scheme="rs")) + + +# --------------------------------------------------------------------------- # +# RS + SBI fusion +# --------------------------------------------------------------------------- # +def test_rs_with_sbi_under_ubiquitous_ber(): + cfg = FecConfig(k=8, symbol_size=64, overhead=0.5, scheme="rs") # N=12, R=4 + pkts = [f"frame-{i:02d} payload".encode() for i in range(8)] + envs = _encode(pkts, cfg) + env_size = len(envs[0]) + assert all(len(e) == env_size for e in envs) + + bodies = fec_subblock.pack(envs, block_payload=env_size, blocks_per_body=3) + rng = random.Random(99) + dec = make_decoder(cfg) + out: list[bytes] = [] + failed = blocks = 0 + for body in bodies: + # Localized hit: corrupt one sub-block per frame (fails its FCS). + b = bytearray(body) + stride = 2 + env_size + n = (len(b) - fec_subblock.SBI_HDR_LEN) // stride + j = rng.randrange(n) + base = fec_subblock.SBI_HDR_LEN + j * stride + 2 + b[base + rng.randrange(env_size)] ^= 0xFF + res = fec_subblock.unpack(bytes(b), block_payload=env_size) + blocks += res.n_blocks + failed += res.n_failed + for env in res.survivors: + out += dec.add_symbol(env) + + # ~1/3 sub-blocks lost (4 of 12 here) — within the R=4 erasure budget. + assert 0 < failed <= 4 + assert out == pkts diff --git a/tools/precoder/test_svc_uep_fec.py b/tools/precoder/test_svc_uep_fec.py new file mode 100644 index 0000000..b3687ce --- /dev/null +++ b/tools/precoder/test_svc_uep_fec.py @@ -0,0 +1,109 @@ +"""Tests for per-SVC-layer FEC-rate UEP (`svc_uep_fec.py`). + +The headline is the graceful-degradation staircase: under the SAME channel +loss, the heavily-protected base/critical layers fully recover while the +lightly-protected enhancement layers degrade — protection tracking importance. +""" + +from __future__ import annotations + +import random + +import pytest + +import svc_uep_fec +from svc_uep_fec import (NalInfo, SvcUepDecoder, SvcUepEncoder, + default_uep_policy, parse_hevc_nal) + +IDR = 19 # IRAP -> critical +TRAIL = 1 # trailing picture +SPS = 33 # parameter set -> critical + + +def _nal(ntype: int, tid: int, payload: int = 60) -> bytes: + # 62-byte NAL = 2 header + 60 payload -> exactly one 64-byte RS symbol. + return bytes([(ntype << 1) & 0xFF, (tid + 1) & 0x07]) + bytes(payload) + + +# --------------------------------------------------------------------------- # +# NAL classification (mirror of svc_tx.h) +# --------------------------------------------------------------------------- # +def test_parse_hevc_nal(): + assert parse_hevc_nal(_nal(IDR, 0)) == NalInfo(tid=0, critical=True, type=IDR) + assert parse_hevc_nal(_nal(SPS, 0)).critical + n = parse_hevc_nal(_nal(TRAIL, 2)) + assert n.tid == 2 and not n.critical and n.type == TRAIL + assert parse_hevc_nal(b"\x00") == NalInfo() # malformed -> base + + +def test_stream_routing(): + p = default_uep_policy() + assert p.stream_for(parse_hevc_nal(_nal(IDR, 2))) == 0 # critical regardless of tid + assert p.stream_for(parse_hevc_nal(_nal(TRAIL, 0))) == 1 + assert p.stream_for(parse_hevc_nal(_nal(TRAIL, 1))) == 2 + assert p.stream_for(parse_hevc_nal(_nal(TRAIL, 6))) == 3 # tid>layers clamps to last + + +# --------------------------------------------------------------------------- # +# Clean round-trip +# --------------------------------------------------------------------------- # +def test_clean_round_trip_routes_per_layer(): + enc = SvcUepEncoder(default_uep_policy()) + nals = ([_nal(IDR, 0)] + [_nal(TRAIL, 0)] * 8 + [_nal(TRAIL, 1)] * 8 + + [_nal(TRAIL, 2)] * 8) * 2 + bodies = [] + for n in nals: + bodies += enc.add_nal(n) + bodies += enc.flush() + + dec = SvcUepDecoder(default_uep_policy()) + recovered = {0: set(), 1: set(), 2: set(), 3: set()} + for sid, body in bodies: + for rsid, pkt in dec.add_body(body): + recovered[rsid].add(pkt) + # Every layer's NALs come back on their own stream, no loss. + assert recovered[1] == {_nal(TRAIL, 0)} + assert recovered[2] == {_nal(TRAIL, 1)} + assert recovered[3] == {_nal(TRAIL, 2)} + assert _nal(IDR, 0) in recovered[0] + assert dec.bodies_misrouted == 0 + + +# --------------------------------------------------------------------------- # +# The headline: graceful-degradation staircase under loss +# --------------------------------------------------------------------------- # +def test_uep_staircase_under_body_loss(): + policy = default_uep_policy() + enc = SvcUepEncoder(policy) + NB = 20 # RS blocks per layer + bodies = [] + for _ in range(NB): + for _ in range(8): + bodies += enc.add_nal(_nal(IDR, 0)) # critical (sid 0) + for _ in range(8): + bodies += enc.add_nal(_nal(TRAIL, 0)) # T0 (sid 1) + for _ in range(8): + bodies += enc.add_nal(_nal(TRAIL, 1)) # T1 (sid 2) + for _ in range(8): + bodies += enc.add_nal(_nal(TRAIL, 2)) # T2 (sid 3) + bodies += enc.flush() + + rng = random.Random(7) + dec = SvcUepDecoder(policy) + p = 0.30 # whole-body loss probability, same for every layer + for sid, body in bodies: + if rng.random() < p: + continue + dec.add_body(body) + + crit = dec.blocks_decoded(0) + t0 = dec.blocks_decoded(1) + t1 = dec.blocks_decoded(2) + t2 = dec.blocks_decoded(3) + + # Monotone staircase: heavier FEC -> more blocks survive the same loss. + assert crit >= t0 >= t1 >= t2 + # Heavy layers ride through; the lightest visibly degrades. + assert crit >= int(0.8 * NB) + assert t2 <= int(0.6 * NB) + assert crit > t2 # the UEP point — base protected, enhancement sacrificed diff --git a/txdemo/stream_tx_demo/main.cpp b/txdemo/stream_tx_demo/main.cpp index ca1e5b0..61e8e24 100644 --- a/txdemo/stream_tx_demo/main.cpp +++ b/txdemo/stream_tx_demo/main.cpp @@ -199,6 +199,20 @@ int main(int argc, char **argv) { rtlDevice->InitWrite(SelectedChannel{.Channel = static_cast(channel), .ChannelOffset = 0, .ChannelWidth = CHANNEL_WIDTH_20}); + + /* DEVOURER_TX_PWR_OVERRIDE: force an absolute per-rate TXAGC index (0..63), + * bypassing the EFUSE/SetTxPower table — the finest-grained, lowest TX-power + * knob for pushing the link into the marginal-SNR regime where the RX's + * corrupted-frame salvage path gets exercised (pairs with the B210 interferer + * in tests/fused_fec_onair.sh). Applied once and held, unlike + * WiFiDriverTxDemo's DEVOURER_TX_PWR_START ramp. Must follow InitWrite so the + * channel-set has run; ApplyTxPower re-pushes the index to the registers. */ + if (const char *o = std::getenv("DEVOURER_TX_PWR_OVERRIDE")) { + int idx = std::atoi(o); + rtlDevice->SetTxPowerOverride(idx); + rtlDevice->ApplyTxPower(); + logger->info("DEVOURER_TX_PWR_OVERRIDE — forced absolute TXAGC index {}", idx); + } sleep(2); auto dot11 = build_dot11_probe_req();