#!/usr/bin/env python3
"""Simulate IR_DecoderRaw::tick() over a CSV of logic levels.

Input is a level capture such as Analyzer/raw/*_raw.txt. The logic is meant to
be cross-checked against IR_DecoderRaw.cpp / IR_config.h: no Arduino involved,
decoder logic only.
"""
from __future__ import annotations

import sys
from dataclasses import dataclass, field
from pathlib import Path

# IR_config.h
BIT_ACTIVE_TAKTS = 25
BIT_PAUSE_TAKTS = 12
CARRIER_HZ = 38000
CARRIER_PERIOD_US = 1_000_000 / CARRIER_HZ
BIT_TIME_US = int((BIT_ACTIVE_TAKTS + BIT_PAUSE_TAKTS) * CARRIER_PERIOD_US)
TOLERANCE_US = 300
SYNC_BITS = 3
BIT_PER_BYTE = 8
MSG_BYTES = 1
CRC_BYTES = 2
POLY1 = 0x31
POLY2 = 0x8C
DATA_BYTE_SIZE_MAX = 1 + 2 + 2 + 31 + 2
IR_MASK_MSG_INFO = 0x1F
PREAMB_FRONTS = 6  # preambPulse*2
RISE_MIN = BIT_TIME_US - TOLERANCE_US
RISE_MAX = BIT_TIME_US + TOLERANCE_US
IR_TIMEOUT = RISE_MAX * (8 + SYNC_BITS + 1)

# IR_config.h: IR_SHORT_LOW_GLITCH_REJECT (default 1)
SHORT_LOW_GLITCH_REJECT = True
GLITCH_REJECT_PHASE_NUDGE = True
MICRO_GAP_RISE_REJECT = True
# Same as the IR_config default; True gives fewer err_other and does not
# affect bit parsing.
IN_MARK_DOUBLE_FALL_IGNORE = False
# IR_RISE_INCLUSIVE_AROUND, IR_RISE_GRAY_SINGLE_BIT_FALLBACK
RISE_INCLUSIVE_AROUND = True
# Same as the IR_config default; True is for experiments only.
RISE_GRAY_SINGLE_BIT_FALLBACK = False


def around_rise_period(rise_period: int, rise_sync: int) -> bool:
    """Return True when *rise_period* is within TOLERANCE_US of *rise_sync*.

    The bounds are inclusive when RISE_INCLUSIVE_AROUND is set and exclusive
    otherwise; the lower bound is clamped at 0.
    """
    lo = max(0, rise_sync - TOLERANCE_US)
    hi = rise_sync + TOLERANCE_US
    if RISE_INCLUSIVE_AROUND:
        return lo <= rise_period <= hi
    return lo < rise_period < hi


def rise_gray_single_bit_fallback(rise_period: int, rise_sync: int) -> bool:
    """Return True when *rise_period* is above the tolerance window but not
    longer than two nominal periods."""
    hi = rise_sync + TOLERANCE_US
    return hi < rise_period <= 2 * rise_sync


def glitch_phase_nudge(edge_us: float, rise_sync: int, prev_rise: float) -> float:
    """Return the value to store in ``prev_rise`` after a rejected rising edge.

    The candidate is one nominal period before the rejected edge. It is used
    only when it moves the reference forward and stays before the edge itself;
    otherwise *prev_rise* is returned unchanged. Disabled entirely when
    GLITCH_REJECT_PHASE_NUDGE is False.
    """
    if not GLITCH_REJECT_PHASE_NUDGE or edge_us <= rise_sync:
        return prev_rise
    nudged = edge_us - rise_sync
    if prev_rise < nudged < edge_us:
        return nudged
    return prev_rise


def ceil_div(val: int, div: int) -> int:
    """Divide *val* by *div*, rounding up once the remainder reaches half.

    The remainder is examined in sixteenths of *div* (4 extra fractional
    bits), so the result is effectively "round to nearest" computed with
    integer arithmetic only.
    """
    ret = val // div
    if ((val << 4) // div - (ret << 4)) >= 8:
        ret += 1
    return ret


def crc8(data: bytes, start: int, end: int, poly: int) -> int:
    """Return the MSB-first CRC-8 of ``data[start:end]``.

    The register starts at 0xFF, no reflection and no final XOR are applied.
    """
    crc = 0xFF
    for i in range(start, end):
        crc ^= data[i]
        for _ in range(8):
            if crc & 0x80:
                crc = ((crc << 1) ^ poly) & 0xFF
            else:
                crc = (crc << 1) & 0xFF
    return crc


def crc_check(buf: bytearray, pack_size: int) -> bool:
    """Check the two trailing CRC bytes of a packet of *pack_size* bytes.

    The first CRC byte is the POLY1 CRC of the payload; the second is the
    POLY2 CRC of the payload plus the first CRC byte.

    Returns False (instead of indexing from the end of the buffer or raising)
    when *pack_size* cannot hold the CRC bytes or exceeds the buffer.
    """
    ln = pack_size - CRC_BYTES
    if ln < 0 or pack_size > len(buf):
        # A negative ln would silently wrap around to the end of the buffer.
        return False
    data = bytes(buf)
    c1 = crc8(data, 0, ln, POLY1)
    c2 = crc8(data, 0, ln + 1, POLY2)
    return buf[ln] == c1 and buf[ln + 1] == c2


def parse_raw_csv(path: Path) -> list[tuple[float, str, float]] | None:
    """Read a level capture CSV into ``(time_s, level, duration_us)`` rows.

    The first line is treated as a header and never parsed as data. Columns
    used from each data row: 0 (time), 1 (level, HIGH/LOW in any case) and
    3 (duration, with an optional " us" suffix). Blank lines and rows with
    fewer than four columns are skipped.

    Returns None when the file is not in level format (for example an export
    of decoder bits): either the header looks like a decoder trace, or a row
    carries a level other than HIGH/LOW.

    Raises ValueError if a time or duration field is not a number.
    """
    rows: list[tuple[float, str, float]] = []
    with path.open(encoding="utf-8", errors="replace") as f:
        header = f.readline()
        h = header.lower()
        if "duration" not in h and "level" not in h:
            cols = header.split(",")
            if "bit_idx" in h or (len(cols) >= 3 and cols[1].strip() == "Type"):
                return None
        for line in f:
            line = line.strip()
            if not line:
                continue
            parts = line.split(",")
            if len(parts) < 4:
                continue
            t_s = float(parts[0])
            level = parts[1].strip()
            if level.upper() not in ("HIGH", "LOW"):
                return None
            dur_us = float(parts[3].replace(" us", "").strip())
            rows.append((t_s, level, dur_us))
    return rows


def segments_to_edges(rows: list[tuple[float, str, float]]) -> list[tuple[float, bool]]:
    """Convert level segments into edges ``(t_us, rising)``.

    An edge is emitted at the start of every segment whose level differs from
    the previous segment. The first segment produces no edge because the level
    before it is unknown. Consecutive segments with the same level are not a
    transition and therefore produce no edge either.
    """
    edges: list[tuple[float, bool]] = []
    prev: bool | None = None
    for t_s, level, _dur in rows:
        high = level.upper() == "HIGH"
        if prev is not None and high != prev:
            edges.append((t_s * 1e6, high))
        prev = high
    return edges


@dataclass
class SimState:
    """Mutable decoder state carried across successive tick() calls."""

    # Edge timing, in microseconds.
    prev_rise: float = 0.0  # reference time of the last accepted rising edge
    prev_fall: float = 0.0  # time of the last accepted falling edge
    rise_period: int = 0  # last accepted rise: time since prev_rise
    high_time: int = 0  # last accepted rise: time since prev_fall
    low_time: int = 0  # last accepted rise: prev_fall - prev_rise
    last_edge: float = 0.0  # time of the most recent edge of any kind

    # Frame / preamble flags.
    preamb_front_counter: int = 0  # preamble edges still expected
    is_preamb: bool = False
    is_recive_raw: bool = False
    is_recive: bool = False
    is_wrong_pack: bool = False
    is_buffer_overflow: bool = False

    # Bit-stream position.
    buf_bit_pos: int = 0  # accepted bits (data + sync) in the current frame
    is_data: bool = True  # True while in a data byte, False while in sync bits
    i_data_buffer: int = 0  # data bits written to data_buffer
    next_control_bit: int = BIT_PER_BYTE  # buf_bit_pos of the next data/sync switch
    i_sync_bit: int = 0
    err_sync_bit: int = 0
    pack_size: int = 0  # packet length in bytes, taken from the first byte
    data_buffer: bytearray = field(default_factory=lambda: bytearray(DATA_BYTE_SIZE_MAX))

    # Error counters.
    err_low: int = 0
    err_high: int = 0
    err_other: int = 0

    # Bit counts computed for the last rise decoded through the "ceil" branch.
    high_count: int = 0
    low_count: int = 0
    all_count: int = 0

    packets: list[tuple[bool, int, bytes]] = field(default_factory=list)  # crc_ok, pack_size, raw bytes
    bits_log: list[tuple[float, str, int]] = field(default_factory=list)  # t_us, kind, bit


def first_rx(st: SimState) -> None:
    """IR_DecoderRaw::firstRX: reset the receive buffer and bit position.

    isRecive / isReciveRaw are not changed here in the firmware, so
    ``is_recive`` / ``is_recive_raw`` are left untouched as well.
    """
    st.is_preamb = True
    st.is_wrong_pack = False
    st.is_buffer_overflow = False
    st.buf_bit_pos = 0
    st.is_data = True
    st.i_data_buffer = 0
    st.next_control_bit = BIT_PER_BYTE
    st.i_sync_bit = 0
    st.err_sync_bit = 0
    st.pack_size = 0
    st.data_buffer = bytearray(DATA_BYTE_SIZE_MAX)


def _begin_frame(st: SimState) -> None:
    """Reset the buffer and arm the preamble counter for a new frame."""
    first_rx(st)
    st.preamb_front_counter = PREAMB_FRONTS - 1
    st.is_preamb = True
    st.is_recive = True
    st.is_recive_raw = True
    st.is_wrong_pack = False


def _write_bit(st: SimState, t_us: float, bit: bool) -> None:
    """Feed one decoded bit into the frame as a data bit or a sync bit.

    Data bits are OR-ed MSB-first into ``st.data_buffer``. After every data
    byte SYNC_BITS sync bits follow; the first of them must differ from the
    last data bit, otherwise a sync error is counted. Once the packet length
    is known and reached, the packet (with its CRC verdict) is appended to
    ``st.packets``. Every accepted bit is appended to ``st.bits_log``.
    """
    # `>=`, not `>`: bit index DATA_BYTE_SIZE_MAX * 8 already lies past the
    # last byte of data_buffer and would raise IndexError on write.
    if st.i_data_buffer >= DATA_BYTE_SIZE_MAX * 8:
        st.is_buffer_overflow = True
    if st.is_buffer_overflow or st.is_preamb or st.is_wrong_pack:
        st.is_recive = False
        st.is_recive_raw = False
        return

    # Switch between data bits and sync bits at the control position.
    if st.buf_bit_pos == st.next_control_bit:
        st.next_control_bit += SYNC_BITS if st.is_data else BIT_PER_BYTE
        st.is_data = not st.is_data
        st.i_sync_bit = 0
        st.err_sync_bit = 0

    bit_val = 1 if bit else 0
    if st.is_data:
        pos = st.i_data_buffer
        st.data_buffer[pos // 8] |= bit_val << (7 - (pos % 8))
        st.i_data_buffer += 1
        st.buf_bit_pos += 1
        st.bits_log.append((t_us, "D", bit_val))
    else:
        if st.i_sync_bit == 0:
            # First sync bit: must be the inverse of the last data bit.
            last_pos = st.i_data_buffer - 1
            last_b = (st.data_buffer[last_pos // 8] >> (7 - (last_pos % 8))) & 1
            if bit != bool(last_b):
                st.buf_bit_pos += 1
                st.i_sync_bit += 1
                st.bits_log.append((t_us, "S", bit_val))
            else:
                st.i_sync_bit = 0
                st.err_other += 1
                st.err_sync_bit += 1
        else:
            st.buf_bit_pos += 1
            st.i_sync_bit += 1
            st.bits_log.append((t_us, "S", bit_val))
        st.is_wrong_pack = st.err_sync_bit >= SYNC_BITS

    if st.is_data and not st.is_wrong_pack:
        if st.i_data_buffer == 8 * MSG_BYTES:
            st.pack_size = st.data_buffer[0] & IR_MASK_MSG_INFO
        if st.pack_size and st.i_data_buffer == st.pack_size * BIT_PER_BYTE:
            ok = crc_check(st.data_buffer, st.pack_size)
            st.packets.append((ok, st.pack_size, bytes(st.data_buffer[: st.pack_size])))
            st.is_recive = False
            st.is_recive_raw = False
            # The buffer is not cleared here, as in IR_DecoderRaw; firstRX
            # happens on listenStart.


def tick(
    st: SimState,
    t_us: float,
    rising: bool,
    rise_sync_time: int,
    *,
    trace_rise: list[dict] | None = None,
) -> None:
    """Process one edge: one tick call per edge (as IR_DecoderRaw after pop).

    Args:
        st: decoder state, updated in place.
        t_us: edge timestamp in microseconds.
        rising: True for a rising edge, False for a falling edge.
        rise_sync_time: nominal bit period in microseconds.
        trace_rise: optional list; when given, a dict describing each rising
            edge that reached the bit decoder is appended to it.
    """
    rise_min = max(0, rise_sync_time - TOLERANCE_US)
    rise_max = rise_sync_time + TOLERANCE_US
    irmax = IR_TIMEOUT  # simplified: no riseSyncTime adjustment in the timeout

    # listenStart: abort an unfinished reception.
    if st.is_recive_raw and (t_us - st.prev_rise) > irmax * 2:
        st.is_recive_raw = False
        first_rx(st)

    st.last_edge = t_us
    skip_rest = False

    if rising:
        cand_rp = int(t_us - st.prev_rise)
        cand_ht = int(t_us - st.prev_fall)
        cand_lt = int(st.prev_fall - st.prev_rise)

        if SHORT_LOW_GLITCH_REJECT:
            short_low_glitch = (
                st.is_recive
                and not st.is_preamb
                and cand_ht < (rise_min // 8)
                and cand_lt >= rise_min
                and cand_rp >= rise_min
                and cand_rp <= IR_TIMEOUT
            )
            if short_low_glitch:
                st.err_other += 1
                # glitch_phase_nudge() is a no-op when the nudge is disabled.
                st.prev_rise = glitch_phase_nudge(t_us, rise_sync_time, st.prev_rise)
                skip_rest = True

        if not skip_rest and MICRO_GAP_RISE_REJECT:
            lt_ok = (cand_lt >= rise_min) or (cand_lt >= (rise_min // 4) and cand_lt < rise_min)
            micro_gap = (
                st.is_recive
                and not st.is_preamb
                and cand_ht < (rise_min // 8)
                and lt_ok
                and cand_rp >= (rise_min // 4)
                and cand_rp < rise_min
                and cand_rp <= IR_TIMEOUT
            )
            if micro_gap:
                st.err_other += 1
                st.prev_rise = glitch_phase_nudge(t_us, rise_sync_time, st.prev_rise)
                skip_rest = True

        if not skip_rest and cand_rp <= rise_max / 4 and not st.high_count and not st.low_count:
            st.err_other += 1
            skip_rest = True

        if not skip_rest:
            # As in IR_DecoderRaw::tick: test before prev_rise is updated,
            # otherwise (t_us - prev_rise) == 0 on a rise.
            if cand_rp > irmax * 2 and not st.is_recive_raw:
                _begin_frame(st)
            st.rise_period = cand_rp
            st.high_time = cand_ht
            st.low_time = cand_lt
            st.prev_rise = t_us
    else:
        if t_us - st.prev_fall > rise_min / 4:
            st.prev_fall = t_us
        else:
            skip_err = False
            if IN_MARK_DOUBLE_FALL_IGNORE:
                hi_since_rise = t_us - st.prev_rise
                mark_end_min = (rise_min * BIT_ACTIVE_TAKTS) // (
                    BIT_ACTIVE_TAKTS + BIT_PAUSE_TAKTS
                )
                skip_err = (
                    st.is_recive
                    and not st.is_preamb
                    and hi_since_rise < mark_end_min
                )
            if not skip_err:
                st.err_other += 1

    if skip_rest:
        return

    # Start of a new frame after a long pause. In the firmware the buffer is
    # cleared through available/timeout; for multi-packet captures the
    # simulation calls first_rx explicitly, otherwise i_data_buffer gets stuck.
    if t_us > st.prev_rise and (t_us - st.prev_rise) > irmax * 2 and not st.is_recive_raw:
        _begin_frame(st)

    if st.preamb_front_counter:
        if rising and st.rise_period < irmax:
            if st.rise_period < rise_min // 2:
                st.preamb_front_counter += 2
                st.err_other += 1
        # Every edge inside the preamble, rising or falling, is counted.
        st.preamb_front_counter -= 1
    elif st.is_preamb:
        # First edge after the preamble: shift the reference by half a period.
        st.is_preamb = False
        st.prev_rise += st.rise_period / 2.0
        return

    if st.is_preamb:
        return

    if st.rise_period > irmax or st.is_buffer_overflow or st.rise_period < rise_min or st.is_wrong_pack:
        return

    if not rising:
        return

    st.high_count = st.low_count = st.all_count = 0
    invert_err = False

    if around_rise_period(st.rise_period, rise_sync_time):
        _write_bit(st, t_us, st.high_time > st.low_time)
    elif RISE_GRAY_SINGLE_BIT_FALLBACK and rise_gray_single_bit_fallback(st.rise_period, rise_sync_time):
        st.err_other += 1
        _write_bit(st, t_us, st.high_time > st.low_time)
    else:
        hc = ceil_div(min(st.high_time, 0xFFFF), rise_sync_time)
        lc = ceil_div(min(st.low_time, 0xFFFF), rise_sync_time)
        ac = ceil_div(min(st.rise_period, 0xFFFF), rise_sync_time)
        st.high_count = min(hc, 127)
        st.low_count = min(lc, 127)
        st.all_count = min(ac, 127)

        if st.high_count == 0 and st.high_time > rise_sync_time // 3:
            st.high_count += 1
            st.err_other += 1

        if st.low_count + st.high_count > st.all_count:
            if st.low_count > st.high_count:
                st.low_count = st.all_count - st.high_count
                st.err_low += st.low_count
            elif st.low_count < st.high_count:
                st.high_count = st.all_count - st.low_count
                st.err_high += st.high_count
            else:
                invert_err = True
                st.err_other += st.all_count

        if st.low_count < st.high_count:
            st.err_high += st.high_count
        else:
            st.err_low += st.low_count

        # As in IR_DecoderRaw.cpp / IrFoxDecoder: at most 8 LOW and 8 HIGH
        # bits per rise (i < n && 8 - i).
        for i in range(min(st.low_count, 8)):
            if i == st.low_count - 1 and invert_err:
                invert_err = False
                _write_bit(st, t_us, True)
            else:
                _write_bit(st, t_us, False)
        for i in range(min(st.high_count, 8)):
            if i == st.high_count - 1 and invert_err:
                invert_err = False
                _write_bit(st, t_us, False)
            else:
                _write_bit(st, t_us, True)

    if trace_rise is not None and rising and not skip_rest:
        # After decode: log only real bit writes (not the preamble, not an
        # early return above).
        if (
            not st.is_preamb
            and st.rise_period <= irmax
            and not st.is_buffer_overflow
            and st.rise_period >= rise_min
        ):
            rec: dict = {
                "t_us": t_us,
                "rp": st.rise_period,
                "ht": st.high_time,
                "lt": st.low_time,
                "i_buf": st.i_data_buffer,
                "n_pkt": len(st.packets),
            }
            if around_rise_period(st.rise_period, rise_sync_time):
                rec["branch"] = "around"
                rec["bits_out"] = 1
            elif RISE_GRAY_SINGLE_BIT_FALLBACK and rise_gray_single_bit_fallback(
                st.rise_period, rise_sync_time
            ):
                rec["branch"] = "gray"
                rec["bits_out"] = 1
            else:
                rec["branch"] = "ceil"
                rec["hc"] = st.high_count
                rec["lc"] = st.low_count
                rec["ac"] = st.all_count
                rec["bits_out"] = st.low_count + st.high_count
            trace_rise.append(rec)


def run_file(path: Path, max_packets: int = 0) -> SimState | None:
    """Decode one capture file and return the final decoder state.

    max_packets=0 processes the whole file (for regression over all packets);
    a positive value stops once that many packets have been collected.
    Returns None when the file is not a level capture.
    """
    rows = parse_raw_csv(path)
    if rows is None:
        return None
    st = SimState()
    for t_us, rising in segments_to_edges(rows):
        tick(st, t_us, rising, BIT_TIME_US)
        if max_packets and len(st.packets) >= max_packets:
            break
    return st


def main() -> None:
    """Decode the given files (or the default capture set) and print a summary.

    Afterwards, when both default CAR and POINT captures exist, the bit logs of
    their first packets are compared.
    """
    root = Path(__file__).resolve().parents[1]
    raw_dir = root / "Analyzer" / "raw"
    default_set = [
        ("CAR", raw_dir / "car_raw.txt"),
        ("POINT", raw_dir / "point_raw.txt"),
        ("CARRAW3", raw_dir / "carraw3.txt"),
        ("POINTRA3", raw_dir / "pointraw_3.txt"),
    ]
    files = default_set
    if len(sys.argv) >= 2:
        files = [(Path(p).name, Path(p)) for p in sys.argv[1:]]

    for label, p in files:
        if not p.exists():
            print(f"skip {label}: {p} not found")
            continue
        st = run_file(p, max_packets=0)
        print(f"=== {label} {p.name} ===")
        if st is None:
            print(
                " (skip: decoder trace CSV Type/bit_idx, or missing Level+Duration; need Saleae level like car_raw.txt)"
            )
            continue
        n_ok = sum(1 for x in st.packets if x[0])
        n_bad = len(st.packets) - n_ok
        print(
            f" packets={len(st.packets)} crc_ok={n_ok} crc_bad={n_bad} "
            f"err_other={st.err_other} err_low={st.err_low} err_high={st.err_high}"
        )
        for i, (ok, psz, raw) in enumerate(st.packets[:12]):
            hx = raw.hex(" ")
            print(f" pkt[{i}] crc_ok={ok} len={psz} {hx}")
        if len(st.packets) > 12:
            print(f" ... ({len(st.packets) - 12} more packets)")
        print(f" first 24 bits: {st.bits_log[:24]}")

    car = raw_dir / "car_raw.txt"
    point = raw_dir / "point_raw.txt"
    if car.exists() and point.exists():
        sc = run_file(car, max_packets=1)
        sp = run_file(point, max_packets=1)
        if sc is None or sp is None:
            return
        print("\n=== First packet bit diff (D/S sequence) CAR vs POINT ===")
        for i, (a, b) in enumerate(zip(sc.bits_log, sp.bits_log)):
            # Compare (kind, bit) only: timestamps of two separate captures
            # differ and would otherwise report a mismatch at index 0.
            if a[1:] != b[1:]:
                print(f" idx {i}: CAR {a} vs POINT {b}")
                break
        else:
            if len(sc.bits_log) != len(sp.bits_log):
                print(f" len CAR={len(sc.bits_log)} POINT={len(sp.bits_log)}")
            else:
                print(" identical bit logs for min(len)")


if __name__ == "__main__":
    main()