Analyzer plug

This commit is contained in:
2026-04-07 13:25:55 +03:00
parent e7d7c0e1c1
commit 7651f07e0a
37 changed files with 3040 additions and 0 deletions

View File

@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""Сравнение трасс decode по carraw3 vs pointraw_3 для кадра pkt[33] (car: CRC fail)."""
from __future__ import annotations
import sys
from pathlib import Path
_TOOLS = Path(__file__).resolve().parent
if str(_TOOLS) not in sys.path:
sys.path.insert(0, str(_TOOLS))
from ir_decoder_raw_sim import (
BIT_TIME_US,
RISE_MAX,
RISE_MIN,
SimState,
first_rx,
parse_raw_csv,
segments_to_edges,
tick,
)
def run_traced(path: Path) -> tuple[SimState, list[dict]]:
rows = parse_raw_csv(path)
assert rows is not None
edges = segments_to_edges(rows)
st = SimState()
tr: list[dict] = []
for t_us, rising in edges:
tick(st, t_us, rising, BIT_TIME_US, trace_rise=tr)
return st, tr
def main() -> None:
root = Path(__file__).resolve().parents[1] / "Analyzer" / "raw"
car_p = root / "carraw3.txt"
pt_p = root / "pointraw_3.txt"
st_c, tr_c = run_traced(car_p)
st_p, tr_p = run_traced(pt_p)
bad_i = next(i for i, x in enumerate(st_c.packets) if not x[0])
print(f"carraw3: bad packet index {bad_i}, bytes {st_c.packets[bad_i][2].hex()}")
print(f"carraw3 packets={len(st_c.packets)} ok={sum(1 for x in st_c.packets if x[0])}")
print(f"pointraw_3 packets={len(st_p.packets)} ok={sum(1 for x in st_p.packets if x[0])}")
print(f"rise window us: min={RISE_MIN} max={RISE_MAX} bit={BIT_TIME_US}")
print()
# Пока кадр №bad_i собирается, в списке уже bad_i завершённых пакетов → len(packets)==bad_i.
build_i = bad_i
def frame_trace(tr: list[dict]) -> list[dict]:
return [r for r in tr if r["n_pkt"] == build_i]
fc = frame_trace(tr_c)
fp = frame_trace(tr_p)
print(
f"decode rises while building packet[{bad_i}] (trace n_pkt=={build_i}): "
f"car {len(fc)} vs point {len(fp)} lines"
)
print("--- car (first 40 decode steps of this frame) ---")
for i, r in enumerate(fc[:40]):
br = r["branch"]
ex = ""
if br == "ceil":
ex = f" hc={r['hc']} lc={r['lc']} ac={r['ac']}"
print(
f" {i:2} t={r['t_us'] / 1e6:.6f}s rp={r['rp']} ht={r['ht']} lt={r['lt']} "
f"{br} bits_out={r['bits_out']} i_buf={r['i_buf']}{ex}"
)
print("--- point (first 40) ---")
for i, r in enumerate(fp[:40]):
br = r["branch"]
ex = ""
if br == "ceil":
ex = f" hc={r['hc']} lc={r['lc']} ac={r['ac']}"
print(
f" {i:2} t={r['t_us'] / 1e6:.6f}s rp={r['rp']} ht={r['ht']} lt={r['lt']} "
f"{br} bits_out={r['bits_out']} i_buf={r['i_buf']}{ex}"
)
print()
# Первое расхождение по (rp, ht, lt, branch, bits_out)
for i, (a, b) in enumerate(zip(fc, fp)):
ka = (a["rp"], a["ht"], a["lt"], a["branch"], a["bits_out"])
kb = (b["rp"], b["ht"], b["lt"], b["branch"], b["bits_out"])
if ka != kb:
print(f"First decode diff at step {i} within frame:")
print(f" car {a}")
print(f" point {b}")
break
else:
if len(fc) != len(fp):
print(f"Same tuples for min(len)={min(len(fc), len(fp))}; length car={len(fc)} point={len(fp)}")
else:
print("Identical decode trace for full frame (unexpected if CRC differs)")
if __name__ == "__main__":
main()

507
tools/ir_decoder_raw_sim.py Normal file
View File

@ -0,0 +1,507 @@
#!/usr/bin/env python3
"""
Симуляция IR_DecoderRaw::tick() по CSV уровней (как Analyzer/raw/*_raw.txt).
Сверка с IR_DecoderRaw.cpp / IR_config.h — без Arduino, только логика декодера.
"""
from __future__ import annotations
import sys
from dataclasses import dataclass, field
from pathlib import Path
# IR_config.h
BIT_ACTIVE_TAKTS = 25
BIT_PAUSE_TAKTS = 12
CARRIER_HZ = 38000
CARRIER_PERIOD_US = 1_000_000 / CARRIER_HZ
BIT_TIME_US = int((BIT_ACTIVE_TAKTS + BIT_PAUSE_TAKTS) * CARRIER_PERIOD_US)
TOLERANCE_US = 300
SYNC_BITS = 3
BIT_PER_BYTE = 8
MSG_BYTES = 1
CRC_BYTES = 2
POLY1 = 0x31
POLY2 = 0x8C
DATA_BYTE_SIZE_MAX = 1 + 2 + 2 + 31 + 2
IR_MASK_MSG_INFO = 0x1F
PREAMB_FRONTS = 6 # preambPulse*2
RISE_MIN = BIT_TIME_US - TOLERANCE_US
RISE_MAX = BIT_TIME_US + TOLERANCE_US
IR_TIMEOUT = RISE_MAX * (8 + SYNC_BITS + 1)
# IR_config.h: IR_SHORT_LOW_GLITCH_REJECT (по умолчанию 1)
SHORT_LOW_GLITCH_REJECT = True
GLITCH_REJECT_PHASE_NUDGE = True
MICRO_GAP_RISE_REJECT = True
IN_MARK_DOUBLE_FALL_IGNORE = False # как IR_config по умолчанию; True — меньше err_other, на разбор битов не влияет
# IR_RISE_INCLUSIVE_AROUND, IR_RISE_GRAY_SINGLE_BIT_FALLBACK
RISE_INCLUSIVE_AROUND = True
RISE_GRAY_SINGLE_BIT_FALLBACK = False # как IR_config по умолчанию; True — только для экспериментов
def around_rise_period(rise_period: int, rise_sync: int) -> bool:
lo = max(0, rise_sync - TOLERANCE_US)
hi = rise_sync + TOLERANCE_US
if RISE_INCLUSIVE_AROUND:
return lo <= rise_period <= hi
return lo < rise_period < hi
def rise_gray_single_bit_fallback(rise_period: int, rise_sync: int) -> bool:
hi = rise_sync + TOLERANCE_US
return rise_period > hi and rise_period <= 2 * rise_sync
def glitch_phase_nudge(edge_us: float, rise_sync: int, prev_rise: float) -> float:
if not GLITCH_REJECT_PHASE_NUDGE:
return prev_rise
if edge_us <= rise_sync:
return prev_rise
nudged = edge_us - rise_sync
if nudged > prev_rise and nudged < edge_us:
return nudged
return prev_rise
def ceil_div(val: int, div: int) -> int:
ret = val // div
if ((val << 4) // div - (ret << 4)) >= 8:
ret += 1
return ret
def crc8(data: bytes, start: int, end: int, poly: int) -> int:
crc = 0xFF
for i in range(start, end):
crc ^= data[i]
for _ in range(8):
if crc & 0x80:
crc = ((crc << 1) ^ poly) & 0xFF
else:
crc = (crc << 1) & 0xFF
return crc
def crc_check(buf: bytearray, pack_size: int) -> bool:
ln = pack_size - CRC_BYTES
c1 = crc8(bytes(buf), 0, ln, POLY1)
c2 = crc8(bytes(buf), 0, ln + 1, POLY2)
crc = ((c1 << 8) & 0xFF00) | (c2 & 0xFF)
return buf[ln] == ((crc >> 8) & 0xFF) and buf[ln + 1] == (crc & 0xFF)
def parse_raw_csv(path: Path) -> list[tuple[float, str, float]] | None:
"""None — файл не в формате уровней (например экспорт битов декодера)."""
rows = []
with path.open(encoding="utf-8", errors="replace") as f:
header = f.readline()
h = header.lower()
if "duration" not in h and "level" not in h:
if "bit_idx" in h or (len(header.split(",")) >= 3 and header.split(",")[1].strip() == "Type"):
return None
for line in f:
line = line.strip()
if not line:
continue
parts = line.split(",")
if len(parts) < 4:
continue
t_s = float(parts[0])
level = parts[1].strip()
if level.upper() not in ("HIGH", "LOW"):
return None
dur_us = float(parts[3].replace(" us", "").strip())
rows.append((t_s, level, dur_us))
return rows
def segments_to_edges(rows: list[tuple[float, str, float]]) -> list[tuple[float, bool]]:
"""Фронты: (t_us, rising)."""
edges: list[tuple[float, bool]] = []
prev = None
for t_s, level, _dur in rows:
high = level.upper() == "HIGH"
if prev is not None:
rising = high
edges.append((t_s * 1e6, rising))
prev = high
return edges
@dataclass
class SimState:
prev_rise: float = 0.0
prev_fall: float = 0.0
rise_period: int = 0
high_time: int = 0
low_time: int = 0
last_edge: float = 0.0
preamb_front_counter: int = 0
is_preamb: bool = False
is_recive_raw: bool = False
is_recive: bool = False
is_wrong_pack: bool = False
is_buffer_overflow: bool = False
buf_bit_pos: int = 0
is_data: bool = True
i_data_buffer: int = 0
next_control_bit: int = BIT_PER_BYTE
i_sync_bit: int = 0
err_sync_bit: int = 0
pack_size: int = 0
data_buffer: bytearray = field(default_factory=lambda: bytearray(DATA_BYTE_SIZE_MAX))
err_low: int = 0
err_high: int = 0
err_other: int = 0
high_count: int = 0
low_count: int = 0
all_count: int = 0
packets: list[tuple[bool, int, bytes]] = field(default_factory=list) # crc_ok, pack_size, raw bytes
bits_log: list[tuple[float, str, int]] = field(default_factory=list) # t_us, kind, bit
def first_rx(st: SimState) -> None:
"""IR_DecoderRaw::firstRX — сброс буфера; isRecive/isReciveRaw в прошивке здесь не меняются."""
st.is_preamb = True
st.is_wrong_pack = False
st.is_buffer_overflow = False
st.buf_bit_pos = 0
st.is_data = True
st.i_data_buffer = 0
st.next_control_bit = BIT_PER_BYTE
st.i_sync_bit = 0
st.err_sync_bit = 0
st.pack_size = 0
st.data_buffer = bytearray(DATA_BYTE_SIZE_MAX)
def tick(
st: SimState,
t_us: float,
rising: bool,
rise_sync_time: int,
*,
trace_rise: list[dict] | None = None,
) -> None:
"""Один вызов tick — один фронт (как IR_DecoderRaw после pop)."""
rise_min = max(0, rise_sync_time - TOLERANCE_US)
rise_max = rise_sync_time + TOLERANCE_US
irmax = IR_TIMEOUT # упрощ.: без подстройки riseSyncTime в timeout
# listenStart: обрыв незавершённого приёма
if st.is_recive_raw and (t_us - st.prev_rise) > irmax * 2:
st.is_recive_raw = False
first_rx(st)
st.last_edge = t_us
skip_rest = False
if rising:
cand_rp = int(t_us - st.prev_rise)
cand_ht = int(t_us - st.prev_fall)
cand_lt = int(st.prev_fall - st.prev_rise)
if SHORT_LOW_GLITCH_REJECT:
short_low_glitch = (
st.is_recive
and not st.is_preamb
and cand_ht < (rise_min // 8)
and cand_lt >= rise_min
and cand_rp >= rise_min
and cand_rp <= IR_TIMEOUT
)
if short_low_glitch:
st.err_other += 1
if GLITCH_REJECT_PHASE_NUDGE:
st.prev_rise = glitch_phase_nudge(t_us, rise_sync_time, st.prev_rise)
skip_rest = True
if not skip_rest and MICRO_GAP_RISE_REJECT:
lt_ok = (cand_lt >= rise_min) or (cand_lt >= (rise_min // 4) and cand_lt < rise_min)
micro_gap = (
st.is_recive
and not st.is_preamb
and cand_ht < (rise_min // 8)
and lt_ok
and cand_rp >= (rise_min // 4)
and cand_rp < rise_min
and cand_rp <= IR_TIMEOUT
)
if micro_gap:
st.err_other += 1
if GLITCH_REJECT_PHASE_NUDGE:
st.prev_rise = glitch_phase_nudge(t_us, rise_sync_time, st.prev_rise)
skip_rest = True
if not skip_rest and cand_rp <= rise_max / 4 and not st.high_count and not st.low_count:
st.err_other += 1
skip_rest = True
if not skip_rest:
# Как IR_DecoderRaw::tick: до обновления prev_rise, иначе (t_us - prev_rise) == 0 на подъёме.
if cand_rp > irmax * 2 and not st.is_recive_raw:
first_rx(st)
st.preamb_front_counter = PREAMB_FRONTS - 1
st.is_preamb = True
st.is_recive = True
st.is_recive_raw = True
st.is_wrong_pack = False
st.rise_period = cand_rp
st.high_time = cand_ht
st.low_time = cand_lt
st.prev_rise = t_us
else:
if t_us - st.prev_fall > rise_min / 4:
st.prev_fall = t_us
else:
skip_err = False
if IN_MARK_DOUBLE_FALL_IGNORE:
hi_since_rise = t_us - st.prev_rise
mark_end_min = (rise_min * BIT_ACTIVE_TAKTS) // (
BIT_ACTIVE_TAKTS + BIT_PAUSE_TAKTS
)
skip_err = (
st.is_recive
and not st.is_preamb
and hi_since_rise < mark_end_min
)
if not skip_err:
st.err_other += 1
if skip_rest:
return
# Старт нового кадра после длинной паузы (в прошивке буфер обнуляется через available/таймаут;
# для мульти-пакета в симуляции явно first_rx, иначе i_data_buffer залипает).
if t_us > st.prev_rise and (t_us - st.prev_rise) > irmax * 2 and not st.is_recive_raw:
first_rx(st)
st.preamb_front_counter = PREAMB_FRONTS - 1
st.is_preamb = True
st.is_recive = True
st.is_recive_raw = True
st.is_wrong_pack = False
if st.preamb_front_counter:
if rising and st.rise_period < irmax:
if st.rise_period < rise_min // 2:
st.preamb_front_counter += 2
st.err_other += 1
st.preamb_front_counter -= 1
else:
if st.is_preamb:
st.is_preamb = False
st.prev_rise += st.rise_period / 2.0
return
if st.is_preamb:
return
if st.rise_period > irmax or st.is_buffer_overflow or st.rise_period < rise_min or st.is_wrong_pack:
return
if not rising:
return
st.high_count = st.low_count = st.all_count = 0
invert_err = False
def write_to_buffer(bit: bool, invert_fix: bool) -> None:
if st.i_data_buffer > DATA_BYTE_SIZE_MAX * 8:
st.is_buffer_overflow = True
if st.is_buffer_overflow or st.is_preamb or st.is_wrong_pack:
st.is_recive = False
st.is_recive_raw = False
return
if st.buf_bit_pos == st.next_control_bit:
st.next_control_bit += SYNC_BITS if st.is_data else BIT_PER_BYTE
st.is_data = not st.is_data
st.i_sync_bit = 0
st.err_sync_bit = 0
if st.is_data:
bi = st.i_data_buffer
st.data_buffer[bi // 8] |= (1 if bit else 0) << (7 - (bi % 8))
st.i_data_buffer += 1
st.buf_bit_pos += 1
st.bits_log.append((t_us, "D", 1 if bit else 0))
else:
if st.i_sync_bit == 0:
last_b = (st.data_buffer[((st.i_data_buffer - 1) // 8)] >> (7 - ((st.i_data_buffer - 1) % 8))) & 1
if bit != bool(last_b):
st.buf_bit_pos += 1
st.i_sync_bit += 1
st.bits_log.append((t_us, "S", 1 if bit else 0))
else:
st.i_sync_bit = 0
st.err_other += 1
st.err_sync_bit += 1
if st.err_sync_bit >= SYNC_BITS:
st.is_wrong_pack = True
else:
st.buf_bit_pos += 1
st.i_sync_bit += 1
st.bits_log.append((t_us, "S", 1 if bit else 0))
st.is_wrong_pack = st.err_sync_bit >= SYNC_BITS
if st.is_data and not st.is_wrong_pack:
if st.i_data_buffer == 8 * MSG_BYTES:
st.pack_size = st.data_buffer[0] & IR_MASK_MSG_INFO
if st.pack_size and st.i_data_buffer == st.pack_size * BIT_PER_BYTE:
ok = crc_check(st.data_buffer, st.pack_size)
st.packets.append((ok, st.pack_size, bytes(st.data_buffer[: st.pack_size])))
st.is_recive = False
st.is_recive_raw = False
# буфер не чистят здесь — как в IR_DecoderRaw; firstRX по listenStart
if around_rise_period(st.rise_period, rise_sync_time):
if st.high_time > st.low_time:
write_to_buffer(True, False)
else:
write_to_buffer(False, False)
elif RISE_GRAY_SINGLE_BIT_FALLBACK and rise_gray_single_bit_fallback(st.rise_period, rise_sync_time):
st.err_other += 1
if st.high_time > st.low_time:
write_to_buffer(True, False)
else:
write_to_buffer(False, False)
else:
hc = ceil_div(min(st.high_time, 0xFFFF), rise_sync_time)
lc = ceil_div(min(st.low_time, 0xFFFF), rise_sync_time)
ac = ceil_div(min(st.rise_period, 0xFFFF), rise_sync_time)
st.high_count = min(hc, 127)
st.low_count = min(lc, 127)
st.all_count = min(ac, 127)
if st.high_count == 0 and st.high_time > rise_sync_time // 3:
st.high_count += 1
st.err_other += 1
if st.low_count + st.high_count > st.all_count:
if st.low_count > st.high_count:
st.low_count = st.all_count - st.high_count
st.err_low += st.low_count
elif st.low_count < st.high_count:
st.high_count = st.all_count - st.low_count
st.err_high += st.high_count
elif st.low_count == st.high_count:
invert_err = True
st.err_other += st.all_count
if st.low_count < st.high_count:
st.err_high += st.high_count
else:
st.err_low += st.low_count
# Как IR_DecoderRaw.cpp / IrFoxDecoder: не более 8 LOW и 8 HIGH за один подъём (i < n && 8 - i).
i = 0
while i < st.low_count and (8 - i):
if i == st.low_count - 1 and invert_err:
invert_err = False
write_to_buffer(True, True)
else:
write_to_buffer(False, False)
i += 1
i = 0
while i < st.high_count and (8 - i):
if i == st.high_count - 1 and invert_err:
invert_err = False
write_to_buffer(False, True)
else:
write_to_buffer(True, False)
i += 1
if trace_rise is not None and rising and not skip_rest:
# После decode: залогировать только реальные записи битов (не преамбула, не ранний return выше)
if (
not st.is_preamb
and st.rise_period <= irmax
and not st.is_buffer_overflow
and st.rise_period >= rise_min
):
rec: dict = {
"t_us": t_us,
"rp": st.rise_period,
"ht": st.high_time,
"lt": st.low_time,
"i_buf": st.i_data_buffer,
"n_pkt": len(st.packets),
}
if around_rise_period(st.rise_period, rise_sync_time):
rec["branch"] = "around"
rec["bits_out"] = 1
elif RISE_GRAY_SINGLE_BIT_FALLBACK and rise_gray_single_bit_fallback(
st.rise_period, rise_sync_time
):
rec["branch"] = "gray"
rec["bits_out"] = 1
else:
rec["branch"] = "ceil"
rec["hc"] = st.high_count
rec["lc"] = st.low_count
rec["ac"] = st.all_count
rec["bits_out"] = st.low_count + st.high_count
trace_rise.append(rec)
def run_file(path: Path, max_packets: int = 0) -> SimState | None:
"""max_packets=0 — обработать весь файл (для регрессии по всем пакетам)."""
rows = parse_raw_csv(path)
if rows is None:
return None
edges = segments_to_edges(rows)
st = SimState()
rise_sync = BIT_TIME_US
for t_us, rising in edges:
tick(st, t_us, rising, rise_sync)
if max_packets and len(st.packets) >= max_packets:
break
return st
def main() -> None:
root = Path(__file__).resolve().parents[1]
default_set = [
("CAR", root / "Analyzer" / "raw" / "car_raw.txt"),
("POINT", root / "Analyzer" / "raw" / "point_raw.txt"),
("CARRAW3", root / "Analyzer" / "raw" / "carraw3.txt"),
("POINTRA3", root / "Analyzer" / "raw" / "pointraw_3.txt"),
]
files = default_set
if len(sys.argv) >= 2:
files = [(Path(p).name, Path(p)) for p in sys.argv[1:]]
for label, p in files:
if not p.exists():
print(f"skip {label}: {p} not found")
continue
st = run_file(p, max_packets=0)
print(f"=== {label} {p.name} ===")
if st is None:
print(
" (skip: decoder trace CSV Type/bit_idx, or missing Level+Duration; need Saleae level like car_raw.txt)"
)
continue
n_ok = sum(1 for x in st.packets if x[0])
n_bad = len(st.packets) - n_ok
print(
f" packets={len(st.packets)} crc_ok={n_ok} crc_bad={n_bad} "
f"err_other={st.err_other} err_low={st.err_low} err_high={st.err_high}"
)
for i, (ok, psz, raw) in enumerate(st.packets[:12]):
hx = raw.hex(" ")
print(f" pkt[{i}] crc_ok={ok} len={psz} {hx}")
if len(st.packets) > 12:
print(f" ... ({len(st.packets) - 12} more packets)")
print(f" first 24 bits: {st.bits_log[:24]}")
car = root / "Analyzer" / "raw" / "car_raw.txt"
point = root / "Analyzer" / "raw" / "point_raw.txt"
if car.exists() and point.exists():
sc = run_file(car, max_packets=1)
sp = run_file(point, max_packets=1)
if sc is None or sp is None:
return
print("\n=== First packet bit diff (D/S sequence) CAR vs POINT ===")
for i, (a, b) in enumerate(zip(sc.bits_log, sp.bits_log)):
if a != b:
print(f" idx {i}: CAR {a} vs POINT {b}")
break
else:
if len(sc.bits_log) != len(sp.bits_log):
print(f" len CAR={len(sc.bits_log)} POINT={len(sp.bits_log)}")
else:
print(" identical bit logs for min(len)")
if __name__ == "__main__":
main()