Files
IR-protocol/tools/ir_decoder_raw_sim.py
2026-04-07 13:25:55 +03:00

508 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Simulation of IR_DecoderRaw::tick() driven by a CSV of signal levels (such as Analyzer/raw/*_raw.txt).
Cross-checked against IR_DecoderRaw.cpp / IR_config.h: no Arduino, decoder logic only.
"""
from __future__ import annotations
import sys
from dataclasses import dataclass, field
from pathlib import Path
# Values mirrored from IR_config.h
BIT_ACTIVE_TAKTS = 25
BIT_PAUSE_TAKTS = 12
CARRIER_HZ = 38000
CARRIER_PERIOD_US = 1_000_000 / CARRIER_HZ
# Duration of one bit (active + pause carrier periods), truncated to whole microseconds.
BIT_TIME_US = int((BIT_ACTIVE_TAKTS + BIT_PAUSE_TAKTS) * CARRIER_PERIOD_US)
TOLERANCE_US = 300
SYNC_BITS = 3
BIT_PER_BYTE = 8
MSG_BYTES = 1
CRC_BYTES = 2
POLY1 = 0x31
POLY2 = 0x8C
DATA_BYTE_SIZE_MAX = 1 + 2 + 2 + 31 + 2
IR_MASK_MSG_INFO = 0x1F
PREAMB_FRONTS = 6 # preambPulse*2
RISE_MIN = BIT_TIME_US - TOLERANCE_US
RISE_MAX = BIT_TIME_US + TOLERANCE_US
IR_TIMEOUT = RISE_MAX * (8 + SYNC_BITS + 1)
# IR_config.h: IR_SHORT_LOW_GLITCH_REJECT (default 1)
SHORT_LOW_GLITCH_REJECT = True
GLITCH_REJECT_PHASE_NUDGE = True
MICRO_GAP_RISE_REJECT = True
IN_MARK_DOUBLE_FALL_IGNORE = False # same as the IR_config default; True gives fewer err_other and does not affect bit parsing
# IR_RISE_INCLUSIVE_AROUND, IR_RISE_GRAY_SINGLE_BIT_FALLBACK
RISE_INCLUSIVE_AROUND = True
RISE_GRAY_SINGLE_BIT_FALLBACK = False # same as the IR_config default; True is for experiments only
def around_rise_period(rise_period: int, rise_sync: int) -> bool:
    """Return True when rise_period is within TOLERANCE_US of rise_sync.

    The lower bound is clamped at zero. The comparison includes both
    bounds when RISE_INCLUSIVE_AROUND is set and excludes them otherwise.
    """
    lower_bound = rise_sync - TOLERANCE_US
    if lower_bound < 0:
        lower_bound = 0
    upper_bound = rise_sync + TOLERANCE_US
    if not RISE_INCLUSIVE_AROUND:
        return lower_bound < rise_period and rise_period < upper_bound
    return lower_bound <= rise_period and rise_period <= upper_bound
def rise_gray_single_bit_fallback(rise_period: int, rise_sync: int) -> bool:
    """Return True for the "gray zone" between one and two bit periods.

    That is, rise_period is strictly above rise_sync + TOLERANCE_US and
    at most 2 * rise_sync.
    """
    return rise_sync + TOLERANCE_US < rise_period <= 2 * rise_sync
def glitch_phase_nudge(edge_us: float, rise_sync: int, prev_rise: float) -> float:
    """Return the rise reference to keep after a rejected glitch edge.

    When GLITCH_REJECT_PHASE_NUDGE is enabled and the edge lies more than
    one sync period from the time origin, the candidate reference is
    edge_us - rise_sync. It is used only when it falls strictly between
    prev_rise and edge_us; in every other case prev_rise is returned
    unchanged.
    """
    if GLITCH_REJECT_PHASE_NUDGE and edge_us > rise_sync:
        candidate = edge_us - rise_sync
        if prev_rise < candidate < edge_us:
            return candidate
    return prev_rise
def ceil_div(val: int, div: int) -> int:
    """Divide val by div, rounding to nearest in 1/16 steps.

    Despite the name this is not a ceiling: the floor quotient is bumped
    by one only when the fractional part, measured in whole sixteenths,
    is at least 8/16.
    """
    quotient, remainder = divmod(val, div)
    # (val << 4) // div - (quotient << 4) equals (remainder << 4) // div.
    sixteenths = (remainder << 4) // div
    return quotient + 1 if sixteenths >= 8 else quotient
def crc8(data: bytes, start: int, end: int, poly: int) -> int:
    """Compute an MSB-first CRC-8 over data[start:end) with initial value 0xFF.

    Bytes are read by index (not by slice), so an empty or reversed range
    leaves the initial value 0xFF untouched.
    """
    register = 0xFF
    for index in range(start, end):
        register ^= data[index]
        for _shift in range(8):
            shifted = register << 1
            if register & 0x80:
                # Top bit was set: fold the polynomial in before masking.
                shifted ^= poly
            register = shifted & 0xFF
    return register
def crc_check(buf: bytearray, pack_size: int) -> bool:
    """Verify the two trailing CRC bytes of a pack_size-byte packet in buf.

    The first CRC byte is crc8 with POLY1 over the payload (everything
    before the CRC_BYTES trailer). The second is crc8 with POLY2 over the
    payload plus the first CRC byte.
    """
    payload_len = pack_size - CRC_BYTES
    snapshot = bytes(buf)
    first_crc = crc8(snapshot, 0, payload_len, POLY1)
    second_crc = crc8(snapshot, 0, payload_len + 1, POLY2)
    if buf[payload_len] != (first_crc & 0xFF):
        return False
    return buf[payload_len + 1] == (second_crc & 0xFF)
def parse_raw_csv(path: Path) -> list[tuple[float, str, float]] | None:
    """Parse a level-export CSV into (time_s, level, duration_us) rows.

    The first line is always consumed as a header. Data rows need at
    least four comma-separated fields: field 0 is the time in seconds,
    field 1 the level ("HIGH"/"LOW", any case), field 3 the duration with
    an optional " us" suffix. Blank lines and rows with fewer than four
    fields are skipped.

    Returns:
        The parsed rows, or None when the file is not in level format
        (for example a decoder bit-trace export): the header looks like a
        bit trace, a level field is neither HIGH nor LOW, or a time or
        duration field is not a number.
    """
    rows: list[tuple[float, str, float]] = []
    with path.open(encoding="utf-8", errors="replace") as f:
        header = f.readline()
        h = header.lower()
        if "duration" not in h and "level" not in h:
            columns = header.split(",")
            if "bit_idx" in h or (len(columns) >= 3 and columns[1].strip() == "Type"):
                return None
        for line in f:
            line = line.strip()
            if not line:
                continue
            parts = line.split(",")
            if len(parts) < 4:
                continue
            level = parts[1].strip()
            if level.upper() not in ("HIGH", "LOW"):
                return None
            try:
                t_s = float(parts[0])
                dur_us = float(parts[3].replace(" us", "").strip())
            except ValueError:
                # Non-numeric time/duration: not a level export, report it
                # the same way as any other unrecognised format.
                return None
            rows.append((t_s, level, dur_us))
    return rows
def segments_to_edges(rows: list[tuple[float, str, float]]) -> list[tuple[float, bool]]:
    """Convert level segments into edges as (t_us, rising) tuples.

    Each row is (time_s, level, duration_us). The first row only
    establishes the initial level. After that an edge is emitted at the
    start of every row whose level differs from the previous row, with
    the time converted from seconds to microseconds. Consecutive rows
    with the same level produce no edge, because no transition happened.
    """
    edges: list[tuple[float, bool]] = []
    prev_high: bool | None = None
    for t_s, level, _dur in rows:
        high = level.upper() == "HIGH"
        if prev_high is not None and high != prev_high:
            edges.append((t_s * 1e6, high))
        prev_high = high
    return edges
@dataclass
class SimState:
    """Mutable decoder state carried between successive tick() calls."""

    # --- Edge timing (microseconds) ---
    prev_rise: float = 0.0  # reference time of the last accepted rising edge (tick may shift it)
    prev_fall: float = 0.0  # time of the last accepted falling edge
    rise_period: int = 0  # rising-edge time minus prev_rise, captured on the last accepted rise
    high_time: int = 0  # rising-edge time minus prev_fall, captured on the last accepted rise
    low_time: int = 0  # prev_fall minus prev_rise, captured on the last accepted rise
    last_edge: float = 0.0  # time of the most recent edge passed to tick
    # --- Frame / preamble flags ---
    preamb_front_counter: int = 0  # preamble edges still to be counted down; non-zero while counting
    is_preamb: bool = False  # True from frame start until the first edge after the preamble countdown
    is_recive_raw: bool = False  # set at frame start; cleared on packet completion, abort or timeout
    is_recive: bool = False  # set at frame start; cleared on packet completion or abort
    is_wrong_pack: bool = False  # set once sync errors in one group reach SYNC_BITS
    is_buffer_overflow: bool = False  # set when the data bit index exceeds the buffer capacity
    # --- Bit-stream position ---
    buf_bit_pos: int = 0  # position in the combined data+sync bit stream
    is_data: bool = True  # True while the next bits are data bits, False for sync bits
    i_data_buffer: int = 0  # number of data bits stored so far (index of the next data bit)
    next_control_bit: int = BIT_PER_BYTE  # buf_bit_pos at which data/sync mode toggles next
    i_sync_bit: int = 0  # index inside the current sync group
    err_sync_bit: int = 0  # sync errors counted inside the current sync group
    pack_size: int = 0  # packet length in bytes, taken from the first byte & IR_MASK_MSG_INFO
    data_buffer: bytearray = field(default_factory=lambda: bytearray(DATA_BYTE_SIZE_MAX))
    # --- Error counters ---
    err_low: int = 0
    err_high: int = 0
    err_other: int = 0
    # --- Bit counts computed on the multi-bit decode path of tick ---
    high_count: int = 0
    low_count: int = 0
    all_count: int = 0
    # --- Results ---
    packets: list[tuple[bool, int, bytes]] = field(default_factory=list) # crc_ok, pack_size, raw bytes
    bits_log: list[tuple[float, str, int]] = field(default_factory=list) # t_us, kind, bit
def first_rx(st: SimState) -> None:
    """Reset per-frame receive state (IR_DecoderRaw::firstRX).

    Clears the buffer and bit cursors and re-arms the preamble flag.
    As in the firmware, is_recive / is_recive_raw are not changed here.
    """
    # Fresh zeroed buffer and unknown packet length.
    st.data_buffer = bytearray(DATA_BYTE_SIZE_MAX)
    st.pack_size = 0
    # Bit-stream cursors back to the start of the first data byte.
    st.i_data_buffer = st.buf_bit_pos = 0
    st.next_control_bit = BIT_PER_BYTE
    st.is_data = True
    st.i_sync_bit = st.err_sync_bit = 0
    # Flags: preamble pending, no errors recorded yet.
    st.is_preamb = True
    st.is_wrong_pack = st.is_buffer_overflow = False
def tick(
    st: SimState,
    t_us: float,
    rising: bool,
    rise_sync_time: int,
    *,
    trace_rise: list[dict] | None = None,
) -> None:
    """Process one edge, mirroring one IR_DecoderRaw::tick() call after pop.

    Args:
        st: Decoder state, updated in place. Completed packets are appended
            to st.packets and decoded bits to st.bits_log.
        t_us: Edge timestamp in microseconds.
        rising: True for a rising edge, False for a falling edge.
        rise_sync_time: Nominal rise-to-rise period of one bit, in microseconds.
        trace_rise: Optional list. One dict is appended for every rising edge
            that reaches the bit-decoding stage.
    """
    rise_min = max(0, rise_sync_time - TOLERANCE_US)
    rise_max = rise_sync_time + TOLERANCE_US
    irmax = IR_TIMEOUT  # simplified: the timeout is not adjusted to riseSyncTime
    # listenStart: abort an unfinished reception
    if st.is_recive_raw and (t_us - st.prev_rise) > irmax * 2:
        st.is_recive_raw = False
        first_rx(st)
    st.last_edge = t_us
    skip_rest = False
    if rising:
        # Candidate timings; committed to the state only if the edge is accepted.
        cand_rp = int(t_us - st.prev_rise)
        cand_ht = int(t_us - st.prev_fall)
        cand_lt = int(st.prev_fall - st.prev_rise)
        if SHORT_LOW_GLITCH_REJECT:
            short_low_glitch = (
                st.is_recive
                and not st.is_preamb
                and cand_ht < (rise_min // 8)
                and cand_lt >= rise_min
                and cand_rp >= rise_min
                and cand_rp <= IR_TIMEOUT
            )
            if short_low_glitch:
                st.err_other += 1
                if GLITCH_REJECT_PHASE_NUDGE:
                    st.prev_rise = glitch_phase_nudge(t_us, rise_sync_time, st.prev_rise)
                skip_rest = True
        if not skip_rest and MICRO_GAP_RISE_REJECT:
            lt_ok = (cand_lt >= rise_min) or (cand_lt >= (rise_min // 4) and cand_lt < rise_min)
            micro_gap = (
                st.is_recive
                and not st.is_preamb
                and cand_ht < (rise_min // 8)
                and lt_ok
                and cand_rp >= (rise_min // 4)
                and cand_rp < rise_min
                and cand_rp <= IR_TIMEOUT
            )
            if micro_gap:
                st.err_other += 1
                if GLITCH_REJECT_PHASE_NUDGE:
                    st.prev_rise = glitch_phase_nudge(t_us, rise_sync_time, st.prev_rise)
                skip_rest = True
        if not skip_rest and cand_rp <= rise_max / 4 and not st.high_count and not st.low_count:
            # Rise too close to the previous one: counted as an error and ignored.
            st.err_other += 1
            skip_rest = True
        if not skip_rest:
            # As in IR_DecoderRaw::tick: test before prev_rise is updated,
            # otherwise (t_us - prev_rise) == 0 on a rise.
            if cand_rp > irmax * 2 and not st.is_recive_raw:
                first_rx(st)
                st.preamb_front_counter = PREAMB_FRONTS - 1
                st.is_preamb = True
                st.is_recive = True
                st.is_recive_raw = True
                st.is_wrong_pack = False
            st.rise_period = cand_rp
            st.high_time = cand_ht
            st.low_time = cand_lt
            st.prev_rise = t_us
    else:
        if t_us - st.prev_fall > rise_min / 4:
            st.prev_fall = t_us
        else:
            skip_err = False
            if IN_MARK_DOUBLE_FALL_IGNORE:
                hi_since_rise = t_us - st.prev_rise
                mark_end_min = (rise_min * BIT_ACTIVE_TAKTS) // (
                    BIT_ACTIVE_TAKTS + BIT_PAUSE_TAKTS
                )
                skip_err = (
                    st.is_recive
                    and not st.is_preamb
                    and hi_since_rise < mark_end_min
                )
            if not skip_err:
                st.err_other += 1
    if skip_rest:
        return
    # Start of a new frame after a long pause (in the firmware the buffer is
    # cleared via available/timeout; for multi-packet captures the simulation
    # calls first_rx explicitly, otherwise i_data_buffer gets stuck).
    if t_us > st.prev_rise and (t_us - st.prev_rise) > irmax * 2 and not st.is_recive_raw:
        first_rx(st)
        st.preamb_front_counter = PREAMB_FRONTS - 1
        st.is_preamb = True
        st.is_recive = True
        st.is_recive_raw = True
        st.is_wrong_pack = False
    if st.preamb_front_counter:
        if rising and st.rise_period < irmax:
            if st.rise_period < rise_min // 2:
                # Torn preamble pulse: compensate for the two extra edges it produced.
                st.preamb_front_counter += 2
                st.err_other += 1
        st.preamb_front_counter -= 1
    else:
        if st.is_preamb:
            # First edge after the preamble countdown: leave preamble mode and
            # shift the rise reference by half a period.
            st.is_preamb = False
            st.prev_rise += st.rise_period / 2.0
            return
    if st.is_preamb:
        return
    if st.rise_period > irmax or st.is_buffer_overflow or st.rise_period < rise_min or st.is_wrong_pack:
        return
    if not rising:
        return
    st.high_count = st.low_count = st.all_count = 0
    invert_err = False

    def write_to_buffer(bit: bool, invert_fix: bool) -> None:
        """Append one decoded bit to the data/sync stream.

        invert_fix is accepted at every call site but is not used here.
        """
        # `>=`: bit index DATA_BYTE_SIZE_MAX * 8 is already one past the end of
        # data_buffer, so letting it through would raise IndexError below.
        if st.i_data_buffer >= DATA_BYTE_SIZE_MAX * 8:
            st.is_buffer_overflow = True
        if st.is_buffer_overflow or st.is_preamb or st.is_wrong_pack:
            st.is_recive = False
            st.is_recive_raw = False
            return
        if st.buf_bit_pos == st.next_control_bit:
            # Boundary reached: toggle between data bits and sync bits.
            st.next_control_bit += SYNC_BITS if st.is_data else BIT_PER_BYTE
            st.is_data = not st.is_data
            st.i_sync_bit = 0
            st.err_sync_bit = 0
        if st.is_data:
            bi = st.i_data_buffer
            # Bits are stored MSB-first inside each byte.
            st.data_buffer[bi // 8] |= (1 if bit else 0) << (7 - (bi % 8))
            st.i_data_buffer += 1
            st.buf_bit_pos += 1
            st.bits_log.append((t_us, "D", 1 if bit else 0))
        else:
            if st.i_sync_bit == 0:
                # The first sync bit must differ from the last stored data bit.
                last_b = (st.data_buffer[((st.i_data_buffer - 1) // 8)] >> (7 - ((st.i_data_buffer - 1) % 8))) & 1
                if bit != bool(last_b):
                    st.buf_bit_pos += 1
                    st.i_sync_bit += 1
                    st.bits_log.append((t_us, "S", 1 if bit else 0))
                else:
                    st.i_sync_bit = 0
                    st.err_other += 1
                    st.err_sync_bit += 1
                    if st.err_sync_bit >= SYNC_BITS:
                        st.is_wrong_pack = True
            else:
                st.buf_bit_pos += 1
                st.i_sync_bit += 1
                st.bits_log.append((t_us, "S", 1 if bit else 0))
            st.is_wrong_pack = st.err_sync_bit >= SYNC_BITS
        if st.is_data and not st.is_wrong_pack:
            if st.i_data_buffer == 8 * MSG_BYTES:
                st.pack_size = st.data_buffer[0] & IR_MASK_MSG_INFO
            if st.pack_size and st.i_data_buffer == st.pack_size * BIT_PER_BYTE:
                ok = crc_check(st.data_buffer, st.pack_size)
                st.packets.append((ok, st.pack_size, bytes(st.data_buffer[: st.pack_size])))
                st.is_recive = False
                st.is_recive_raw = False
                # The buffer is not cleared here, as in IR_DecoderRaw;
                # firstRX happens via listenStart.

    if around_rise_period(st.rise_period, rise_sync_time):
        # Exactly one bit period: the longer part decides the bit value.
        if st.high_time > st.low_time:
            write_to_buffer(True, False)
        else:
            write_to_buffer(False, False)
    elif RISE_GRAY_SINGLE_BIT_FALLBACK and rise_gray_single_bit_fallback(st.rise_period, rise_sync_time):
        st.err_other += 1
        if st.high_time > st.low_time:
            write_to_buffer(True, False)
        else:
            write_to_buffer(False, False)
    else:
        # Several bit periods between rises: estimate how many LOW and HIGH bits fit.
        hc = ceil_div(min(st.high_time, 0xFFFF), rise_sync_time)
        lc = ceil_div(min(st.low_time, 0xFFFF), rise_sync_time)
        ac = ceil_div(min(st.rise_period, 0xFFFF), rise_sync_time)
        st.high_count = min(hc, 127)
        st.low_count = min(lc, 127)
        st.all_count = min(ac, 127)
        if st.high_count == 0 and st.high_time > rise_sync_time // 3:
            st.high_count += 1
            st.err_other += 1
        if st.low_count + st.high_count > st.all_count:
            if st.low_count > st.high_count:
                st.low_count = st.all_count - st.high_count
                st.err_low += st.low_count
            elif st.low_count < st.high_count:
                st.high_count = st.all_count - st.low_count
                st.err_high += st.high_count
            elif st.low_count == st.high_count:
                invert_err = True
                st.err_other += st.all_count
        if st.low_count < st.high_count:
            st.err_high += st.high_count
        else:
            st.err_low += st.low_count
        # As in IR_DecoderRaw.cpp / IrFoxDecoder: at most 8 LOW and 8 HIGH bits
        # per rise (i < n && 8 - i).
        i = 0
        while i < st.low_count and (8 - i):
            if i == st.low_count - 1 and invert_err:
                invert_err = False
                write_to_buffer(True, True)
            else:
                write_to_buffer(False, False)
            i += 1
        i = 0
        while i < st.high_count and (8 - i):
            if i == st.high_count - 1 and invert_err:
                invert_err = False
                write_to_buffer(False, True)
            else:
                write_to_buffer(True, False)
            i += 1
    if trace_rise is not None and rising and not skip_rest:
        # After decoding: log only edges that really wrote bits (not the
        # preamble, not one of the early returns above).
        if (
            not st.is_preamb
            and st.rise_period <= irmax
            and not st.is_buffer_overflow
            and st.rise_period >= rise_min
        ):
            rec: dict = {
                "t_us": t_us,
                "rp": st.rise_period,
                "ht": st.high_time,
                "lt": st.low_time,
                "i_buf": st.i_data_buffer,
                "n_pkt": len(st.packets),
            }
            if around_rise_period(st.rise_period, rise_sync_time):
                rec["branch"] = "around"
                rec["bits_out"] = 1
            elif RISE_GRAY_SINGLE_BIT_FALLBACK and rise_gray_single_bit_fallback(
                st.rise_period, rise_sync_time
            ):
                rec["branch"] = "gray"
                rec["bits_out"] = 1
            else:
                rec["branch"] = "ceil"
                rec["hc"] = st.high_count
                rec["lc"] = st.low_count
                rec["ac"] = st.all_count
                rec["bits_out"] = st.low_count + st.high_count
            trace_rise.append(rec)
def run_file(path: Path, max_packets: int = 0) -> SimState | None:
    """Simulate the decoder over every edge of one level-export file.

    max_packets=0 processes the whole file (used for regression over all
    packets); otherwise processing stops once that many packets have been
    collected. Returns None when the file is not in level format.
    """
    levels = parse_raw_csv(path)
    if levels is None:
        return None
    state = SimState()
    for edge_time_us, is_rising in segments_to_edges(levels):
        tick(state, edge_time_us, is_rising, BIT_TIME_US)
        limit_reached = max_packets and len(state.packets) >= max_packets
        if limit_reached:
            break
    return state
def main() -> None:
    """Run the simulation over capture files and print a per-file summary.

    Files come from the command line when given, otherwise from the default
    set under <repo>/Analyzer/raw. When both car_raw.txt and point_raw.txt
    exist, the decoded bit sequences of their first packets are compared.
    """
    root = Path(__file__).resolve().parents[1]
    default_set = [
        ("CAR", root / "Analyzer" / "raw" / "car_raw.txt"),
        ("POINT", root / "Analyzer" / "raw" / "point_raw.txt"),
        ("CARRAW3", root / "Analyzer" / "raw" / "carraw3.txt"),
        ("POINTRA3", root / "Analyzer" / "raw" / "pointraw_3.txt"),
    ]
    files = default_set
    if len(sys.argv) >= 2:
        files = [(Path(p).name, Path(p)) for p in sys.argv[1:]]
    for label, p in files:
        if not p.exists():
            print(f"skip {label}: {p} not found")
            continue
        st = run_file(p, max_packets=0)
        print(f"=== {label} {p.name} ===")
        if st is None:
            print(
                " (skip: decoder trace CSV Type/bit_idx, or missing Level+Duration; need Saleae level like car_raw.txt)"
            )
            continue
        n_ok = sum(1 for x in st.packets if x[0])
        n_bad = len(st.packets) - n_ok
        print(
            f" packets={len(st.packets)} crc_ok={n_ok} crc_bad={n_bad} "
            f"err_other={st.err_other} err_low={st.err_low} err_high={st.err_high}"
        )
        for i, (ok, psz, raw) in enumerate(st.packets[:12]):
            hx = raw.hex(" ")
            print(f" pkt[{i}] crc_ok={ok} len={psz} {hx}")
        if len(st.packets) > 12:
            print(f" ... ({len(st.packets) - 12} more packets)")
        print(f" first 24 bits: {st.bits_log[:24]}")
    car = root / "Analyzer" / "raw" / "car_raw.txt"
    point = root / "Analyzer" / "raw" / "point_raw.txt"
    if car.exists() and point.exists():
        sc = run_file(car, max_packets=1)
        sp = run_file(point, max_packets=1)
        if sc is None or sp is None:
            return
        print("\n=== First packet bit diff (D/S sequence) CAR vs POINT ===")
        for i, (a, b) in enumerate(zip(sc.bits_log, sp.bits_log)):
            # Compare only (kind, bit). Element 0 is the capture timestamp,
            # which differs between two independent recordings and would
            # make every comparison report a mismatch at index 0.
            if a[1:] != b[1:]:
                print(f" idx {i}: CAR {a} vs POINT {b}")
                break
        else:
            # for/else: reached only when no differing entry was found.
            if len(sc.bits_log) != len(sp.bits_log):
                print(f" len CAR={len(sc.bits_log)} POINT={len(sp.bits_log)}")
            else:
                print(" identical bit logs for min(len)")
# Run the command-line report only when executed as a script, not on import.
if __name__ == "__main__":
    main()