#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Офлайн-симуляция IR_DecoderRaw::tick + writeToBuffer по журналу @IRF1v1: См. ref/IR_EDGE_TRACE_FORMAT.md При FRAME_END: сводка (чистые биты / burst / отброшенные фронты), список точечных исправлений (преамбула, пропуск такта, поджатие low/high, sync), при crc_ok=False — разбор несовпадения CRC; в первой строке пакет как hex= и bin= (8 бит на байт MSB-first, байты через пробел); отдельная строка «синхро» — счётчик ошибок 1-го бита тройки (как err_syncBit в прошивке) и принятые тройки sync-битов между байтами (2-й и 3-й в прошивке не сверяются с шаблоном, только пишутся в поток — их значения для наглядности сдвига). WRONG_PACK_SYNC — отдельное событие с причиной и тем же блоком «синхро». Флаг -v/--verbose: дополнительно каждая строка для «чистого» бита (aroundRise) и отброшенных фронтов; без флага эти события только в счётчиках сводки. Не моделирует IRDEBUG_SERIAL_SOFT_REJECT (жёсткий Wrong sync). Таймауты: как IR_DecoderRaw::tick — listenStart и checkTimeout в начале каждого тика (не только при пустых очередях), пауза > IR_timeout*2 по lastEdgeTime; при checkTimeout: isReciveRaw=0, firstRX(), lastEdgeTime=now. """ from __future__ import annotations import argparse import binascii import os import re import sys from dataclasses import dataclass, field from typing import List, Tuple # --- IR_config.h (как в прошивке, целочисленно) --- CARRIER_FREC = 38000 CARRIER_PERIOD = 1000000 // CARRIER_FREC BIT_ACTIVE_TAKTS = 25 BIT_PAUSE_TAKTS = 12 BIT_TAKTS = BIT_ACTIVE_TAKTS + BIT_PAUSE_TAKTS BIT_TIME = BIT_TAKTS * CARRIER_PERIOD TOLERANCE = 300 SYNCBITS = 3 BIT_PER_BYTE = 8 MSGBYTES = 1 CRC_BYTES = 2 POLY1 = 0x31 POLY2 = 0x8C IR_MASK_MSG_INFO = 0x1F PREAMB_PULSE = 3 PREAMB_FRONTS = PREAMB_PULSE * 2 BYTE_PER_PACK = 31 DATA_BYTE_SIZE_MAX = MSGBYTES + 2 + 2 + BYTE_PER_PACK + CRC_BYTES FREE_FREC = False SKIP_DECODE_FLAG = 0x01 # Myagkie tsveta dlya terminala (256-color), ne iarkie default 31/32 _ANSI_GREEN = "\033[38;5;107m" # priglushennyj zelenyj (ne iarkij 32) _ANSI_RED = "\033[38;5;174m" # pylno-rozovyj / myagkij krasnyj (ne iarkij 31) _ANSI_RESET = "\033[0m" # Zolotistyj / birjuzovyj + zhirnyj dlya hex i bin paketa v stroke FRAME_END _HEX_PACKET_FG = "\033[1;38;5;222m" _BIN_PACKET_FG = "\033[1;38;5;109m" def _bytes_bin_msb(data: bytes) -> str: """8 bit na bajt (MSB pervyj, kak v writeToBuffer), bajty cherez probel.""" return " ".join(f"{b:08b}" for b in data) def _use_terminal_color() -> bool: if os.environ.get("NO_COLOR"): return False try: return sys.stdout.isatty() except Exception: return False def _colorize_block(text: str, ok: bool, enabled: bool) -> str: if not enabled: return text code = _ANSI_GREEN if ok else _ANSI_RED return f"{code}{text}{_ANSI_RESET}" def _highlight_hex_bin_in_frame_line(line: str, ok: bool, enabled: bool) -> str: """Odna stroka s FRAME_END: vydeljaet hex= i bin=.""" if not enabled or "FRAME_END" not in line: return line parent = _ANSI_GREEN if ok else _ANSI_RED after = "\033[22m" + parent spans: list[tuple[int, int, str]] = [] mhx = re.search(r"hex=([0-9a-fA-F]+)", line) if mhx: spans.append((mhx.start(1), mhx.end(1), _HEX_PACKET_FG)) mbn = re.search(r"bin=([01 ]+)$", line) if mbn: spans.append((mbn.start(1), mbn.end(1), _BIN_PACKET_FG)) if not spans: return line spans.sort(key=lambda t: t[0]) out: list[str] = [] last = 0 for s, e, col in spans: out.append(line[last:s]) out.append(col + line[s:e] + after) last = e out.append(line[last:]) return "".join(out) def _highlight_frame_end_payloads(ev: str, ok: bool, enabled: bool) -> str: """Vo vseh strokah FRAME_END vydeljaet hex= i bin= (v tom chisle povtor svodki v konce).""" if not enabled: return ev return "\n".join(_highlight_hex_bin_in_frame_line(L, ok, enabled) for L in ev.split("\n")) def _packet_event_tone(ev: str) -> str | None: """'ok' | 'bad' | None — dlya pokrashki celogo bloka sobytija.""" head = ev.split("\n", 1)[0] if "FRAME_END" in head: if "crc_ok=True" in head: return "ok" if "crc_ok=False" in head: return "bad" if "WRONG_PACK_SYNC" in head: return "bad" return None def rise_time_max(rise_sync: int) -> int: return rise_sync + TOLERANCE def rise_time_min(rise_sync: int) -> int: return rise_sync - TOLERANCE def ir_timeout_us(rise_sync: int) -> int: return rise_time_max(rise_sync) * (8 + SYNCBITS + 1) def around_rise(t: int, rise_sync: int) -> bool: return rise_time_min(rise_sync) < t < rise_time_max(rise_sync) def ceil_div(val: int, divider: int) -> int: ret = val // divider if (val << 4) // divider - (ret << 4) >= 8: ret += 1 return ret def crc8(data: bytes, start: int, end: int, poly: int) -> int: crc = 0xFF for i in range(start, end): crc ^= data[i] for _ in range(8): if crc & 0x80: crc = ((crc << 1) ^ poly) & 0xFF else: crc = (crc << 1) & 0xFF return crc def crc_compute_bytes(data: bytearray, pack_size: int) -> Tuple[int, int, int]: """Ожидаемые байты CRC: (crc1_hi, crc2_lo, len_payload) или (-1,-1,-1) при неверном pack_size.""" if pack_size < 1 + CRC_BYTES or pack_size > DATA_BYTE_SIZE_MAX: return (-1, -1, -1) ln = pack_size - CRC_BYTES c1 = crc8(bytes(data), 0, ln, POLY1) c2 = crc8(bytes(data), 0, ln + 1, POLY2) return (c1 & 0xFF, c2 & 0xFF, ln) def crc_check(data: bytearray, pack_size: int) -> bool: ln = pack_size - CRC_BYTES c1 = crc8(bytes(data), 0, ln, POLY1) c2 = crc8(bytes(data), 0, ln + 1, POLY2) crc = ((c1 << 8) & ~0xFF) | (c2 & 0xFF) return data[ln] == ((crc >> 8) & 0xFF) and data[ln + 1] == (crc & 0xFF) def crc_failure_lines(data: bytearray, pack_size: int) -> List[str]: """Подробности при несовпадении CRC.""" out: List[str] = [] exp_hi, exp_lo, ln = crc_compute_bytes(data, pack_size) if ln < 0: out.append(f" некорректный pack_size={pack_size} (ожидается 1..{DATA_BYTE_SIZE_MAX})") return out got_hi = data[pack_size - 2] got_lo = data[pack_size - 1] hdr = data[0] msg_t = (hdr >> 5) & 0x07 out.append( f" байт[0]=0x{hdr:02x} тип_сообщения={msg_t} заявл._длина_кадра={hdr & IR_MASK_MSG_INFO} байт" ) out.append( f" правило CRC: crc8(data[0..{ln - 1}], poly1=0x{POLY1:02x}) -> байт[{ln}]; " f"crc8(data[0..{ln}], poly2=0x{POLY2:02x}) -> байт[{ln + 1}]" ) out.append(f" ожидалось: {exp_hi:02x} {exp_lo:02x} принято в кадре: {got_hi:02x} {got_lo:02x}") if got_hi != exp_hi: out.append( f" первый байт CRC не совпал - искажены данные [0..{ln - 1}] и/или этот байт CRC" ) if got_lo != exp_lo: out.append( " второй байт CRC не совпал - в poly2 входит уже первый байт CRC; типично сдвиг битовой границы" ) pl = data[:ln].hex() out.append(f" полезная нагрузка без CRC ({ln} байт): {pl}") return out @dataclass class EdgeRec: t_us: int level: int # 0/1 после фронта flags: int def parse_irf1_lines(text: str) -> List[EdgeRec]: out: List[EdgeRec] = [] pat = re.compile(r"@IRF1v1:([0-9a-fA-F]+)\s*") for m in pat.finditer(text): hx = m.group(1) if len(hx) % 2: continue try: raw = binascii.unhexlify(hx) except binascii.Error: continue if len(raw) < 3: continue meta = raw[0] count = raw[1] | (raw[2] << 8) need = 3 + count * 6 if len(raw) < need or count > 2000: continue p = 3 for _ in range(count): t = raw[p] | (raw[p + 1] << 8) | (raw[p + 2] << 16) | (raw[p + 3] << 24) lvl = raw[p + 4] flg = raw[p + 5] out.append(EdgeRec(t, lvl & 1, flg)) p += 6 if meta & 1: pass # overflow — запись могла обрезаться return out @dataclass class DecoderSim: prev_rise: int = 0 prev_fall: int = 0 rise_period: int = 0 high_time: int = 0 low_time: int = 0 last_edge_time: int = 0 preamb_front_counter: int = 0 is_preamb: bool = False is_recive: bool = False is_recive_raw: bool = False is_wrong_pack: bool = False is_buffer_overflow: bool = False rise_sync_time: int = BIT_TIME high_count: int = 0 low_count: int = 0 all_count: int = 0 i_data_buffer: int = 0 buf_bit_pos: int = 0 next_control_bit: int = BIT_PER_BYTE is_data: bool = True i_sync_bit: int = 0 err_sync_bit: int = 0 data_buffer: bytearray = field(default_factory=lambda: bytearray(DATA_BYTE_SIZE_MAX)) pack_size: int = 0 errors_other: int = 0 events: List[str] = field(default_factory=list) verbose: bool = False """Подстроеки/исправления как в IR_DecoderRaw::tick (за текущий пакет).""" packet_fixes: List[str] = field(default_factory=list) _fatal_sync_event_sent: bool = False stat_clean_bits: int = 0 stat_burst_edges: int = 0 stat_debounce_rise: int = 0 stat_debounce_fall: int = 0 # Mezhdu bajtami: trojki sync-bitov (kak v writeToBuffer); oshibka schetaetsja tolko za 1-j bit stat_sync_first_error: int = 0 sync_groups: List[str] = field(default_factory=list) sync_bits_current_group: List[int] = field(default_factory=list) def _fix(self, msg: str) -> None: self.packet_fixes.append(msg) def _clear_packet_state(self) -> None: self.packet_fixes.clear() self._fatal_sync_event_sent = False self.stat_clean_bits = 0 self.stat_burst_edges = 0 self.stat_debounce_rise = 0 self.stat_debounce_fall = 0 self.stat_sync_first_error = 0 self.sync_groups.clear() self.sync_bits_current_group.clear() def _sync_bit_consumed(self, bit_val: int) -> None: """Odin prinjatyj sync-bit (bufBitPos++ v vetke sync v proshivke).""" self.sync_bits_current_group.append(bit_val & 1) if len(self.sync_bits_current_group) == SYNCBITS: self.sync_groups.append("".join(str(b) for b in self.sync_bits_current_group)) self.sync_bits_current_group.clear() def _sync_summary_lines(self, *, with_firmware_note: bool = False) -> List[str]: """Stroki svodki po sinhrobitam dlya FRAME_END i WRONG_PACK_SYNC.""" sg = "/".join(self.sync_groups) if self.sync_groups else "—" lines = [ f" синхро: ошибок_1-го_бита(как_в_IR_DecoderRaw)={self.stat_sync_first_error}; " f"полных_троек={len(self.sync_groups)}; биты_троек={sg}" ] if self.sync_bits_current_group: tail = "".join(str(b) for b in self.sync_bits_current_group) lines.append(f" синхро: незавершённая_тройка (уже приняты биты): {tail}") if with_firmware_note: lines.append( " синхро: в прошивке при ошибке считается только случай «1-й бит тройки совпал с последним " "data-битом» (errors.other++, err_syncBit); 2-й и 3-й sync-биты не сравниваются с эталоном." ) return lines def _emit_wrong_sync_fatal(self, t: int) -> None: lines = [ f"t={t} WRONG_PACK_SYNC (аналог ERROR: Wrong sync bit в прошивке, err_sync_bit>={SYNCBITS})" ] if self.packet_fixes: lines.append(" подстройки и исправления до ошибки:") for fx in self.packet_fixes: lines.append(f" - {fx}") lines.extend(self._sync_summary_lines(with_firmware_note=True)) lines.append( " причина фатала: повторы неверного 1-го sync-бита накапливают err_syncBit до порога syncBits." ) self.events.append("\n".join(lines)) def first_rx(self) -> None: self.is_buffer_overflow = False self.pack_size = 0 self.buf_bit_pos = 0 self.is_data = True self.i_data_buffer = 0 self.next_control_bit = BIT_PER_BYTE self.i_sync_bit = 0 self.err_sync_bit = 0 self.is_wrong_pack = False self.data_buffer[:] = bytes(DATA_BYTE_SIZE_MAX) self.rise_sync_time = BIT_TIME self.stat_sync_first_error = 0 self.sync_groups.clear() self.sync_bits_current_group.clear() def listen_start(self, now: int) -> None: to = ir_timeout_us(self.rise_sync_time) if self.is_recive_raw and self.last_edge_time > 0 and (now - self.last_edge_time) > to * 2: self.events.append(f"t={now} listenStart abort raw (gap since last edge, как IR_DecoderRaw)") self.is_recive_raw = False self._clear_packet_state() self.first_rx() def check_timeout(self, now: int) -> None: if not self.is_recive: return to = ir_timeout_us(self.rise_sync_time) if now - self.last_edge_time > to * 2: self.events.append(f"t={now} checkTimeout -> isReciveRaw=0, firstRX() (как IR_DecoderRaw)") self.is_recive = False self.is_recive_raw = False self._clear_packet_state() self.first_rx() # Не last_edge_time = now: в прошивке убрано — расхождение с метками фронтов из очереди. def write_to_buffer(self, bit_val: int) -> None: if self.i_data_buffer > DATA_BYTE_SIZE_MAX * 8: self.is_buffer_overflow = True self._fix("переполнение буфера битов (writeToBuffer: i_dataBuffer > dataByteSizeMax*8)") if self.is_buffer_overflow or self.is_preamb or self.is_wrong_pack: self.is_recive = False self.is_recive_raw = False self.first_rx() return if self.buf_bit_pos == self.next_control_bit: self.next_control_bit += SYNCBITS if self.is_data else BIT_PER_BYTE self.is_data = not self.is_data self.i_sync_bit = 0 self.err_sync_bit = 0 if self.is_data: bi = self.i_data_buffer // 8 self.data_buffer[bi] |= (bit_val & 1) << (7 - (self.i_data_buffer % 8)) self.i_data_buffer += 1 self.buf_bit_pos += 1 else: if self.i_sync_bit == 0: prev_bit = ( self.data_buffer[(self.i_data_buffer - 1) // 8] >> (7 - (self.i_data_buffer - 1) % 8) ) & 1 if bit_val != prev_bit: self.buf_bit_pos += 1 self.i_sync_bit += 1 self._sync_bit_consumed(bit_val) else: self.i_sync_bit = 0 self.errors_other += 1 self.err_sync_bit += 1 self.stat_sync_first_error += 1 self._fix( f"sync: 1-й sync-бит совпал с последним data-битом (data={prev_bit}); " f"err_sync_bit={self.err_sync_bit}/{SYNCBITS} (как в прошивке)" ) if self.err_sync_bit >= SYNCBITS: self.is_wrong_pack = True if not self._fatal_sync_event_sent: self._fatal_sync_event_sent = True self._emit_wrong_sync_fatal(self.last_edge_time) else: self.buf_bit_pos += 1 self.i_sync_bit += 1 self._sync_bit_consumed(bit_val) self.is_wrong_pack = self.err_sync_bit >= SYNCBITS if self.is_data and not self.is_wrong_pack: if self.i_data_buffer == 8 * MSGBYTES: self.pack_size = self.data_buffer[0] & IR_MASK_MSG_INFO if self.pack_size and self.i_data_buffer == self.pack_size * BIT_PER_BYTE: ok = crc_check(self.data_buffer, self.pack_size) raw = self.data_buffer[: self.pack_size] hx = raw.hex() bstr = _bytes_bin_msb(raw) frame_line = ( f"t={self.last_edge_time} FRAME_END pack={self.pack_size} crc_ok={ok} hex={hx} bin={bstr}" ) sync_lines = self._sync_summary_lines() tick_summary = ( f" сводка тактов: чистых_битов_aroundRise={self.stat_clean_bits}, " f"фронтов_с_burst-коррекцией={self.stat_burst_edges}, " f"отброшенных_фронтов_up={self.stat_debounce_rise}, down={self.stat_debounce_fall}" ) def _frame_summary_block() -> List[str]: return [frame_line, *sync_lines, tick_summary] lines: List[str] = [] lines.extend(_frame_summary_block()) if self.packet_fixes: if self.verbose: lines.append( " подстройки и исправления за пакет, подробный режим (-v) (аналог IR_DecoderRaw::tick):" ) else: lines.append( " подстройки и исправления за пакет (преамбула, пропуск такта, burst, sync; " "без строк по каждому «чистому» биту — включите -v):" ) for fx in self.packet_fixes: lines.append(f" - {fx}") else: lines.append( " дополнительных исправлений нет (см. сводку; для строк по каждому биту: -v/--verbose)" ) if not ok: lines.append(" неуспешный пакет — причина:") lines.extend(crc_failure_lines(self.data_buffer, self.pack_size)) lines.append(" --- сводка пакета (конец записи) ---") lines.extend(_frame_summary_block()) self.events.append("\n".join(lines)) self.is_recive = False self.is_recive_raw = False self._clear_packet_state() self.first_rx() def tick_edge(self, t: int, level: int) -> None: """Один фронт: level = состояние линии ПОСЛЕ фронта (как dir в C++).""" self.listen_start(t) to = ir_timeout_us(self.rise_sync_time) if self.is_recive and self.last_edge_time > 0 and (t - self.last_edge_time) > to * 2: self.check_timeout(t) self.last_edge_time = t rising = level == 1 if rising: cond = (t - self.prev_rise > rise_time_max(self.rise_sync_time) // 4) or self.high_count or self.low_count if cond: self.rise_period = t - self.prev_rise self.high_time = t - self.prev_fall self.low_time = self.prev_fall - self.prev_rise self.prev_rise = t else: self.errors_other += 1 self.stat_debounce_rise += 1 if self.verbose: self._fix( f"t={t} отброшен фронт ↑: слишком короткий интервал до предыдущего ↑ " f"(<= riseTimeMax/4 при hc=lc=0), errors.other++" ) else: if t - self.prev_fall > rise_time_min(self.rise_sync_time) // 4: self.prev_fall = t else: self.errors_other += 1 self.stat_debounce_fall += 1 if self.verbose: self._fix( f"t={t} отброшен фронт ↓: слишком короткий интервал до предыдущего ↓ (<= riseTimeMin/4), errors.other++" ) rt = self.rise_sync_time to = ir_timeout_us(rt) if t > self.prev_rise and (t - self.prev_rise) > to * 2 and not self.is_recive_raw: self.preamb_front_counter = PREAMB_FRONTS - 1 self.is_preamb = True self.is_recive = True self.is_recive_raw = True self.is_wrong_pack = False self._clear_packet_state() self.events.append(f"t={t} PACKET_START (long idle)") if self.preamb_front_counter: if rising and self.rise_period < to: if self.rise_period < rise_time_min(rt) // 2: self.preamb_front_counter += 2 self.errors_other += 1 self._fix( f"преамбула: «рваная единица» risePeriod={self.rise_period} us < riseTimeMin/2 " f"({rise_time_min(rt) // 2} us) -> preambFrontCounter += 2, errors.other++" ) elif FREE_FREC: old = self.rise_sync_time self.rise_sync_time = (self.rise_sync_time + self.rise_period // 2) // 2 self._fix( f"преамбула: подстройка riseSyncTime {old}->{self.rise_sync_time} us (freeFrec)" ) self.preamb_front_counter -= 1 else: if self.is_preamb: self.is_preamb = False half = self.rise_period // 2 self.prev_rise += half self._fix( f"после преамбулы: prev_rise += risePeriod/2 (+{half} us) - фазовая привязка к центру бита" ) return if self.is_preamb: return if self.rise_period > to or self.is_buffer_overflow or self.rise_period < rise_time_min(rt) or self.is_wrong_pack: if self.is_recive and rising and (self.rise_period > to or self.rise_period < rise_time_min(rt)): reason = ( f"risePeriod={self.rise_period} us: " + (f"> IR_timeout={to} us " if self.rise_period > to else "") + (f"< riseTimeMin={rise_time_min(rt)} us " if self.rise_period < rise_time_min(rt) else "") ) self._fix(f"t={t} пропуск такта (goto END): {reason.strip()}") return if not rising: return self.high_count = 0 self.low_count = 0 self.all_count = 0 invert_err = False rt = self.rise_sync_time if around_rise(self.rise_period, rt): self.stat_clean_bits += 1 bit = 1 if self.high_time > self.low_time else 0 if self.verbose: self._fix( f"t={t} «чистый» бит: aroundRise (risePeriod={self.rise_period} us в [{rise_time_min(rt)}..{rise_time_max(rt)}]), " f"highTime={self.high_time} lowTime={self.low_time} us -> bit {bit}" ) self.write_to_buffer(bit) else: self.stat_burst_edges += 1 self.high_count = ceil_div(self.high_time, rt) self.low_count = ceil_div(self.low_time, rt) self.all_count = ceil_div(self.rise_period, rt) self._fix( f"t={t} пропуск такта / растяжение: risePeriod={self.rise_period} us вне aroundRise " f"[{rise_time_min(rt)}..{rise_time_max(rt)}]; " f"ceil_div: highTime/{rt}->{self.high_count}, lowTime/{rt}->{self.low_count}, risePeriod/{rt}->{self.all_count}" ) if self.high_count == 0 and self.high_time > rt // 3: self.high_count += 1 self.errors_other += 1 self._fix( f"доп. коррекция: highCount был 0 при highTime={self.high_time} > riseTime/3 ({rt // 3}) -> highCount++" ) if self.low_count + self.high_count > self.all_count: lo, hi, ac = self.low_count, self.high_count, self.all_count if self.low_count > self.high_count: self.low_count = self.all_count - self.high_count self._fix( f"поджатие: low+high>{ac} и low>high -> lowCount {lo}->{self.low_count} (лишние нули)" ) elif self.low_count < self.high_count: self.high_count = self.all_count - self.low_count self._fix( f"поджатие: low+high>{ac} и low highCount {hi}->{self.high_count} (лишние единицы)" ) elif self.low_count == self.high_count: invert_err = True self.errors_other += self.all_count self._fix( f"поджатие: low==high при low+high>{ac} -> invertErr (последний из low-цикла пишется как 1)" ) i = 0 while i < self.low_count and (8 - i): if i == self.low_count - 1 and invert_err: invert_err = False self.write_to_buffer(1) else: self.write_to_buffer(0) i += 1 i = 0 while i < self.high_count and (8 - i): if i == self.high_count - 1 and invert_err: invert_err = False self.write_to_buffer(0) else: self.write_to_buffer(1) i += 1 def timing_stats(edges: List[EdgeRec]) -> None: dts: List[int] = [] for i in range(1, len(edges)): d = edges[i].t_us - edges[i - 1].t_us if 0 <= d < 1_000_000: dts.append(d) if not dts: print("Нет интервалов для статистики.") return dts.sort() def pct(p: float) -> int: return dts[int(len(dts) * p)] print("--- Inter-edge deltas in log (us) ---") print(f" N={len(dts)} min={dts[0]} p50={pct(0.5)} p90={pct(0.9)} max={dts[-1]}") print( f" bitTime(ref)~{BIT_TIME} us aroundRise window ({rise_time_min(BIT_TIME)}..{rise_time_max(BIT_TIME)}) us" ) # грубые корзины buckets = [0, 0, 0, 0, 0] for d in dts: if d < 200: buckets[0] += 1 elif d < 600: buckets[1] += 1 elif d < 1200: buckets[2] += 1 elif d < 3000: buckets[3] += 1 else: buckets[4] += 1 print(f" корзины [0-200) [200-600) [600-1200) [1200-3000) [3000+): {buckets}") def main() -> int: ap = argparse.ArgumentParser(description="Симуляция IR decode по @IRF1v1 логу") ap.add_argument("logfile", nargs="?", default=None, help="Текстовый лог с @IRF1v1:") ap.add_argument("--include-skipped", action="store_true", help="Подмешивать фронты с SKIP_DECODE (обычно нет)") ap.add_argument("--max-events", type=int, default=80, help="Макс. событий FRAME_START/END в отчёте") ap.add_argument( "--no-color", action="store_true", help="Bez ANSI-tsvetov (ili zadajte NO_COLOR v okruzhenii)", ) ap.add_argument( "-v", "--verbose", action="store_true", help="Podrobnyj vyvod: kazhdyj chistyj bit (aroundRise), otbrosy frontov; inache tolko svodka", ) args = ap.parse_args() if not args.logfile: print("Укажите путь к логу, например: python tools/ir_decoder_sim.py ref/ISR_self_frontlog.txt") return 1 text = open(args.logfile, "r", encoding="utf-8", errors="replace").read() raw_edges = parse_irf1_lines(text) edges = [e for e in raw_edges if args.include_skipped or not (e.flags & SKIP_DECODE_FLAG)] edges.sort(key=lambda e: (e.t_us, id(e))) print(f"Записей фронтов (после фильтра SKIP): {len(edges)} (всего распарсено: {len(raw_edges)})") timing_stats(edges) dec = DecoderSim(verbose=args.verbose) for e in edges: dec.tick_edge(e.t_us, e.level) print("--- События декодера (первые N), пакеты разделены пустой строкой ---") slice_ev = dec.events[: args.max_events] first_packet = True use_color = _use_terminal_color() and not args.no_color for ev in slice_ev: head = ev.split("\n", 1)[0] if "PACKET_START" in head: if not first_packet: print() first_packet = False tone = _packet_event_tone(ev) if tone is not None: ok = tone == "ok" ev_out = _highlight_frame_end_payloads(ev, ok=ok, enabled=use_color) print(_colorize_block(ev_out, ok=ok, enabled=use_color)) else: print(ev) if len(dec.events) > args.max_events: print(f"... всего событий: {len(dec.events)}") print(f"errors_other={dec.errors_other} wrong_pack_end={dec.is_wrong_pack} recive={dec.is_recive}") return 0 if __name__ == "__main__": sys.exit(main())