Files
IR-protocol/tools/ir_decoder_sim.py
2026-04-02 17:25:10 +03:00

722 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Офлайн-симуляция IR_DecoderRaw::tick + writeToBuffer по журналу @IRF1v1:
См. ref/IR_EDGE_TRACE_FORMAT.md
При FRAME_END: сводка (чистые биты / burst / отброшенные фронты), список точечных
исправлений (преамбула, пропуск такта, поджатие low/high, sync), при crc_ok=False —
разбор несовпадения CRC; в первой строке пакет как hex= и bin= (8 бит на байт MSB-first,
байты через пробел); отдельная строка «синхро» — счётчик ошибок 1-го бита тройки (как
err_syncBit в прошивке) и принятые тройки sync-битов между байтами (2-й и 3-й в прошивке
не сверяются с шаблоном, только пишутся в поток — их значения для наглядности сдвига).
WRONG_PACK_SYNC — отдельное событие с причиной и тем же блоком «синхро».
Флаг -v/--verbose: дополнительно каждая строка для «чистого» бита (aroundRise) и
отброшенных фронтов; без флага эти события только в счётчиках сводки.
Не моделирует IRDEBUG_SERIAL_SOFT_REJECT (жёсткий Wrong sync).
Таймауты: между фронтами, если gap > IR_timeout*2 и isRecive — checkTimeout.
"""
from __future__ import annotations
import argparse
import binascii
import os
import re
import sys
from dataclasses import dataclass, field
from typing import List, Tuple
# --- IR_config.h (как в прошивке, целочисленно) ---
CARRIER_FREC = 38000
CARRIER_PERIOD = 1000000 // CARRIER_FREC
BIT_ACTIVE_TAKTS = 25
BIT_PAUSE_TAKTS = 12
BIT_TAKTS = BIT_ACTIVE_TAKTS + BIT_PAUSE_TAKTS
BIT_TIME = BIT_TAKTS * CARRIER_PERIOD
TOLERANCE = 300
SYNCBITS = 3
BIT_PER_BYTE = 8
MSGBYTES = 1
CRC_BYTES = 2
POLY1 = 0x31
POLY2 = 0x8C
IR_MASK_MSG_INFO = 0x1F
PREAMB_PULSE = 3
PREAMB_FRONTS = PREAMB_PULSE * 2
BYTE_PER_PACK = 31
DATA_BYTE_SIZE_MAX = MSGBYTES + 2 + 2 + BYTE_PER_PACK + CRC_BYTES
FREE_FREC = False
SKIP_DECODE_FLAG = 0x01
# Myagkie tsveta dlya terminala (256-color), ne iarkie default 31/32
_ANSI_GREEN = "\033[38;5;107m" # priglushennyj zelenyj (ne iarkij 32)
_ANSI_RED = "\033[38;5;174m" # pylno-rozovyj / myagkij krasnyj (ne iarkij 31)
_ANSI_RESET = "\033[0m"
# Zolotistyj / birjuzovyj + zhirnyj dlya hex i bin paketa v stroke FRAME_END
_HEX_PACKET_FG = "\033[1;38;5;222m"
_BIN_PACKET_FG = "\033[1;38;5;109m"
def _bytes_bin_msb(data: bytes) -> str:
"""8 bit na bajt (MSB pervyj, kak v writeToBuffer), bajty cherez probel."""
return " ".join(f"{b:08b}" for b in data)
def _use_terminal_color() -> bool:
if os.environ.get("NO_COLOR"):
return False
try:
return sys.stdout.isatty()
except Exception:
return False
def _colorize_block(text: str, ok: bool, enabled: bool) -> str:
if not enabled:
return text
code = _ANSI_GREEN if ok else _ANSI_RED
return f"{code}{text}{_ANSI_RESET}"
def _highlight_hex_bin_in_frame_line(line: str, ok: bool, enabled: bool) -> str:
"""Odna stroka s FRAME_END: vydeljaet hex= i bin=."""
if not enabled or "FRAME_END" not in line:
return line
parent = _ANSI_GREEN if ok else _ANSI_RED
after = "\033[22m" + parent
spans: list[tuple[int, int, str]] = []
mhx = re.search(r"hex=([0-9a-fA-F]+)", line)
if mhx:
spans.append((mhx.start(1), mhx.end(1), _HEX_PACKET_FG))
mbn = re.search(r"bin=([01 ]+)$", line)
if mbn:
spans.append((mbn.start(1), mbn.end(1), _BIN_PACKET_FG))
if not spans:
return line
spans.sort(key=lambda t: t[0])
out: list[str] = []
last = 0
for s, e, col in spans:
out.append(line[last:s])
out.append(col + line[s:e] + after)
last = e
out.append(line[last:])
return "".join(out)
def _highlight_frame_end_payloads(ev: str, ok: bool, enabled: bool) -> str:
"""Vo vseh strokah FRAME_END vydeljaet hex= i bin= (v tom chisle povtor svodki v konce)."""
if not enabled:
return ev
return "\n".join(_highlight_hex_bin_in_frame_line(L, ok, enabled) for L in ev.split("\n"))
def _packet_event_tone(ev: str) -> str | None:
"""'ok' | 'bad' | None — dlya pokrashki celogo bloka sobytija."""
head = ev.split("\n", 1)[0]
if "FRAME_END" in head:
if "crc_ok=True" in head:
return "ok"
if "crc_ok=False" in head:
return "bad"
if "WRONG_PACK_SYNC" in head:
return "bad"
return None
def rise_time_max(rise_sync: int) -> int:
return rise_sync + TOLERANCE
def rise_time_min(rise_sync: int) -> int:
return rise_sync - TOLERANCE
def ir_timeout_us(rise_sync: int) -> int:
return rise_time_max(rise_sync) * (8 + SYNCBITS + 1)
def around_rise(t: int, rise_sync: int) -> bool:
return rise_time_min(rise_sync) < t < rise_time_max(rise_sync)
def ceil_div(val: int, divider: int) -> int:
ret = val // divider
if (val << 4) // divider - (ret << 4) >= 8:
ret += 1
return ret
def crc8(data: bytes, start: int, end: int, poly: int) -> int:
crc = 0xFF
for i in range(start, end):
crc ^= data[i]
for _ in range(8):
if crc & 0x80:
crc = ((crc << 1) ^ poly) & 0xFF
else:
crc = (crc << 1) & 0xFF
return crc
def crc_compute_bytes(data: bytearray, pack_size: int) -> Tuple[int, int, int]:
"""Ожидаемые байты CRC: (crc1_hi, crc2_lo, len_payload) или (-1,-1,-1) при неверном pack_size."""
if pack_size < 1 + CRC_BYTES or pack_size > DATA_BYTE_SIZE_MAX:
return (-1, -1, -1)
ln = pack_size - CRC_BYTES
c1 = crc8(bytes(data), 0, ln, POLY1)
c2 = crc8(bytes(data), 0, ln + 1, POLY2)
return (c1 & 0xFF, c2 & 0xFF, ln)
def crc_check(data: bytearray, pack_size: int) -> bool:
ln = pack_size - CRC_BYTES
c1 = crc8(bytes(data), 0, ln, POLY1)
c2 = crc8(bytes(data), 0, ln + 1, POLY2)
crc = ((c1 << 8) & ~0xFF) | (c2 & 0xFF)
return data[ln] == ((crc >> 8) & 0xFF) and data[ln + 1] == (crc & 0xFF)
def crc_failure_lines(data: bytearray, pack_size: int) -> List[str]:
"""Подробности при несовпадении CRC."""
out: List[str] = []
exp_hi, exp_lo, ln = crc_compute_bytes(data, pack_size)
if ln < 0:
out.append(f" некорректный pack_size={pack_size} (ожидается 1..{DATA_BYTE_SIZE_MAX})")
return out
got_hi = data[pack_size - 2]
got_lo = data[pack_size - 1]
hdr = data[0]
msg_t = (hdr >> 5) & 0x07
out.append(
f" байт[0]=0x{hdr:02x} тип_сообщения={msg_t} заявл._длинаадра={hdr & IR_MASK_MSG_INFO} байт"
)
out.append(
f" правило CRC: crc8(data[0..{ln - 1}], poly1=0x{POLY1:02x}) -> байт[{ln}]; "
f"crc8(data[0..{ln}], poly2=0x{POLY2:02x}) -> байт[{ln + 1}]"
)
out.append(f" ожидалось: {exp_hi:02x} {exp_lo:02x} принято в кадре: {got_hi:02x} {got_lo:02x}")
if got_hi != exp_hi:
out.append(
f" первый байт CRC не совпал - искажены данные [0..{ln - 1}] и/или этот байт CRC"
)
if got_lo != exp_lo:
out.append(
" второй байт CRC не совпал - в poly2 входит уже первый байт CRC; типично сдвиг битовой границы"
)
pl = data[:ln].hex()
out.append(f" полезная нагрузка без CRC ({ln} байт): {pl}")
return out
@dataclass
class EdgeRec:
t_us: int
level: int # 0/1 после фронта
flags: int
def parse_irf1_lines(text: str) -> List[EdgeRec]:
out: List[EdgeRec] = []
pat = re.compile(r"@IRF1v1:([0-9a-fA-F]+)\s*")
for m in pat.finditer(text):
hx = m.group(1)
if len(hx) % 2:
continue
try:
raw = binascii.unhexlify(hx)
except binascii.Error:
continue
if len(raw) < 3:
continue
meta = raw[0]
count = raw[1] | (raw[2] << 8)
need = 3 + count * 6
if len(raw) < need or count > 2000:
continue
p = 3
for _ in range(count):
t = raw[p] | (raw[p + 1] << 8) | (raw[p + 2] << 16) | (raw[p + 3] << 24)
lvl = raw[p + 4]
flg = raw[p + 5]
out.append(EdgeRec(t, lvl & 1, flg))
p += 6
if meta & 1:
pass # overflow — запись могла обрезаться
return out
@dataclass
class DecoderSim:
prev_rise: int = 0
prev_fall: int = 0
rise_period: int = 0
high_time: int = 0
low_time: int = 0
last_edge_time: int = 0
preamb_front_counter: int = 0
is_preamb: bool = False
is_recive: bool = False
is_recive_raw: bool = False
is_wrong_pack: bool = False
is_buffer_overflow: bool = False
rise_sync_time: int = BIT_TIME
high_count: int = 0
low_count: int = 0
all_count: int = 0
i_data_buffer: int = 0
buf_bit_pos: int = 0
next_control_bit: int = BIT_PER_BYTE
is_data: bool = True
i_sync_bit: int = 0
err_sync_bit: int = 0
data_buffer: bytearray = field(default_factory=lambda: bytearray(DATA_BYTE_SIZE_MAX))
pack_size: int = 0
errors_other: int = 0
events: List[str] = field(default_factory=list)
verbose: bool = False
"""Подстроеки/исправления как в IR_DecoderRaw::tick (за текущий пакет)."""
packet_fixes: List[str] = field(default_factory=list)
_fatal_sync_event_sent: bool = False
stat_clean_bits: int = 0
stat_burst_edges: int = 0
stat_debounce_rise: int = 0
stat_debounce_fall: int = 0
# Mezhdu bajtami: trojki sync-bitov (kak v writeToBuffer); oshibka schetaetsja tolko za 1-j bit
stat_sync_first_error: int = 0
sync_groups: List[str] = field(default_factory=list)
sync_bits_current_group: List[int] = field(default_factory=list)
def _fix(self, msg: str) -> None:
self.packet_fixes.append(msg)
def _clear_packet_state(self) -> None:
self.packet_fixes.clear()
self._fatal_sync_event_sent = False
self.stat_clean_bits = 0
self.stat_burst_edges = 0
self.stat_debounce_rise = 0
self.stat_debounce_fall = 0
self.stat_sync_first_error = 0
self.sync_groups.clear()
self.sync_bits_current_group.clear()
def _sync_bit_consumed(self, bit_val: int) -> None:
"""Odin prinjatyj sync-bit (bufBitPos++ v vetke sync v proshivke)."""
self.sync_bits_current_group.append(bit_val & 1)
if len(self.sync_bits_current_group) == SYNCBITS:
self.sync_groups.append("".join(str(b) for b in self.sync_bits_current_group))
self.sync_bits_current_group.clear()
def _sync_summary_lines(self, *, with_firmware_note: bool = False) -> List[str]:
"""Stroki svodki po sinhrobitam dlya FRAME_END i WRONG_PACK_SYNC."""
sg = "/".join(self.sync_groups) if self.sync_groups else ""
lines = [
f" синхро: ошибок_1-го_битаак_в_IR_DecoderRaw)={self.stat_sync_first_error}; "
f"полныхроек={len(self.sync_groups)}; биты_троек={sg}"
]
if self.sync_bits_current_group:
tail = "".join(str(b) for b in self.sync_bits_current_group)
lines.append(f" синхро: незавершённая_тройка (уже приняты биты): {tail}")
if with_firmware_note:
lines.append(
" синхро: в прошивке при ошибке считается только случай «1-й бит тройки совпал с последним "
"data-битом» (errors.other++, err_syncBit); 2-й и 3-й sync-биты не сравниваются с эталоном."
)
return lines
def _emit_wrong_sync_fatal(self, t: int) -> None:
lines = [
f"t={t} WRONG_PACK_SYNC (аналог ERROR: Wrong sync bit в прошивке, err_sync_bit>={SYNCBITS})"
]
if self.packet_fixes:
lines.append(" подстройки и исправления до ошибки:")
for fx in self.packet_fixes:
lines.append(f" - {fx}")
lines.extend(self._sync_summary_lines(with_firmware_note=True))
lines.append(
" причина фатала: повторы неверного 1-го sync-бита накапливают err_syncBit до порога syncBits."
)
self.events.append("\n".join(lines))
def first_rx(self) -> None:
self.is_buffer_overflow = False
self.pack_size = 0
self.buf_bit_pos = 0
self.is_data = True
self.i_data_buffer = 0
self.next_control_bit = BIT_PER_BYTE
self.i_sync_bit = 0
self.err_sync_bit = 0
self.is_wrong_pack = False
self.data_buffer[:] = bytes(DATA_BYTE_SIZE_MAX)
self.rise_sync_time = BIT_TIME
self.stat_sync_first_error = 0
self.sync_groups.clear()
self.sync_bits_current_group.clear()
def listen_start(self, now: int) -> None:
to = ir_timeout_us(self.rise_sync_time)
if self.is_recive_raw and (now - self.prev_rise) > to * 2:
self.events.append(f"t={now} listenStart abort raw (gap from prev_rise)")
self.is_recive_raw = False
self._clear_packet_state()
self.first_rx()
def check_timeout(self, now: int) -> None:
if not self.is_recive:
return
to = ir_timeout_us(self.rise_sync_time)
if now - self.last_edge_time > to * 2:
self.events.append(f"t={now} checkTimeout (gap since last edge)")
self.is_recive = False
self.last_edge_time = now
def write_to_buffer(self, bit_val: int) -> None:
if self.i_data_buffer > DATA_BYTE_SIZE_MAX * 8:
self.is_buffer_overflow = True
self._fix("переполнение буфера битов (writeToBuffer: i_dataBuffer > dataByteSizeMax*8)")
if self.is_buffer_overflow or self.is_preamb or self.is_wrong_pack:
self.is_recive = False
self.is_recive_raw = False
return
if self.buf_bit_pos == self.next_control_bit:
self.next_control_bit += SYNCBITS if self.is_data else BIT_PER_BYTE
self.is_data = not self.is_data
self.i_sync_bit = 0
self.err_sync_bit = 0
if self.is_data:
bi = self.i_data_buffer // 8
self.data_buffer[bi] |= (bit_val & 1) << (7 - (self.i_data_buffer % 8))
self.i_data_buffer += 1
self.buf_bit_pos += 1
else:
if self.i_sync_bit == 0:
prev_bit = (
self.data_buffer[(self.i_data_buffer - 1) // 8]
>> (7 - (self.i_data_buffer - 1) % 8)
) & 1
if bit_val != prev_bit:
self.buf_bit_pos += 1
self.i_sync_bit += 1
self._sync_bit_consumed(bit_val)
else:
self.i_sync_bit = 0
self.errors_other += 1
self.err_sync_bit += 1
self.stat_sync_first_error += 1
self._fix(
f"sync: 1-й sync-бит совпал с последним data-битом (data={prev_bit}); "
f"err_sync_bit={self.err_sync_bit}/{SYNCBITS} (как в прошивке)"
)
if self.err_sync_bit >= SYNCBITS:
self.is_wrong_pack = True
if not self._fatal_sync_event_sent:
self._fatal_sync_event_sent = True
self._emit_wrong_sync_fatal(self.last_edge_time)
else:
self.buf_bit_pos += 1
self.i_sync_bit += 1
self._sync_bit_consumed(bit_val)
self.is_wrong_pack = self.err_sync_bit >= SYNCBITS
if self.is_data and not self.is_wrong_pack:
if self.i_data_buffer == 8 * MSGBYTES:
self.pack_size = self.data_buffer[0] & IR_MASK_MSG_INFO
if self.pack_size and self.i_data_buffer == self.pack_size * BIT_PER_BYTE:
ok = crc_check(self.data_buffer, self.pack_size)
raw = self.data_buffer[: self.pack_size]
hx = raw.hex()
bstr = _bytes_bin_msb(raw)
frame_line = (
f"t={self.last_edge_time} FRAME_END pack={self.pack_size} crc_ok={ok} hex={hx} bin={bstr}"
)
sync_lines = self._sync_summary_lines()
tick_summary = (
f" сводка тактов: чистых_битов_aroundRise={self.stat_clean_bits}, "
f"фронтов_с_burst-коррекцией={self.stat_burst_edges}, "
f"отброшенныхронтов_up={self.stat_debounce_rise}, down={self.stat_debounce_fall}"
)
def _frame_summary_block() -> List[str]:
return [frame_line, *sync_lines, tick_summary]
lines: List[str] = []
lines.extend(_frame_summary_block())
if self.packet_fixes:
if self.verbose:
lines.append(
" подстройки и исправления за пакет, подробный режим (-v) (аналог IR_DecoderRaw::tick):"
)
else:
lines.append(
" подстройки и исправления за пакет (преамбула, пропуск такта, burst, sync; "
"без строк по каждому «чистому» биту — включите -v):"
)
for fx in self.packet_fixes:
lines.append(f" - {fx}")
else:
lines.append(
" дополнительных исправлений нет (см. сводку; для строк по каждому биту: -v/--verbose)"
)
if not ok:
lines.append(" неуспешный пакет — причина:")
lines.extend(crc_failure_lines(self.data_buffer, self.pack_size))
lines.append(" --- сводка пакета (конец записи) ---")
lines.extend(_frame_summary_block())
self.events.append("\n".join(lines))
self.is_recive = False
self.is_recive_raw = False
self._clear_packet_state()
self.first_rx()
def tick_edge(self, t: int, level: int) -> None:
"""Один фронт: level = состояние линии ПОСЛЕ фронта (как dir в C++)."""
self.listen_start(t)
self.last_edge_time = t
rising = level == 1
if rising:
cond = (t - self.prev_rise > rise_time_max(self.rise_sync_time) // 4) or self.high_count or self.low_count
if cond:
self.rise_period = t - self.prev_rise
self.high_time = t - self.prev_fall
self.low_time = self.prev_fall - self.prev_rise
self.prev_rise = t
else:
self.errors_other += 1
self.stat_debounce_rise += 1
if self.verbose:
self._fix(
f"t={t} отброшен фронт ↑: слишком короткий интервал до предыдущего ↑ "
f"(<= riseTimeMax/4 при hc=lc=0), errors.other++"
)
else:
if t - self.prev_fall > rise_time_min(self.rise_sync_time) // 4:
self.prev_fall = t
else:
self.errors_other += 1
self.stat_debounce_fall += 1
if self.verbose:
self._fix(
f"t={t} отброшен фронт ↓: слишком короткий интервал до предыдущего ↓ (<= riseTimeMin/4), errors.other++"
)
rt = self.rise_sync_time
to = ir_timeout_us(rt)
if t > self.prev_rise and (t - self.prev_rise) > to * 2 and not self.is_recive_raw:
self.preamb_front_counter = PREAMB_FRONTS - 1
self.is_preamb = True
self.is_recive = True
self.is_recive_raw = True
self.is_wrong_pack = False
self._clear_packet_state()
self.events.append(f"t={t} PACKET_START (long idle)")
if self.preamb_front_counter:
if rising and self.rise_period < to:
if self.rise_period < rise_time_min(rt) // 2:
self.preamb_front_counter += 2
self.errors_other += 1
self._fix(
f"преамбула: «рваная единица» risePeriod={self.rise_period} us < riseTimeMin/2 "
f"({rise_time_min(rt) // 2} us) -> preambFrontCounter += 2, errors.other++"
)
elif FREE_FREC:
old = self.rise_sync_time
self.rise_sync_time = (self.rise_sync_time + self.rise_period // 2) // 2
self._fix(
f"преамбула: подстройка riseSyncTime {old}->{self.rise_sync_time} us (freeFrec)"
)
self.preamb_front_counter -= 1
else:
if self.is_preamb:
self.is_preamb = False
half = self.rise_period // 2
self.prev_rise += half
self._fix(
f"после преамбулы: prev_rise += risePeriod/2 (+{half} us) - фазовая привязка к центру бита"
)
return
if self.is_preamb:
return
if self.rise_period > to or self.is_buffer_overflow or self.rise_period < rise_time_min(rt) or self.is_wrong_pack:
if self.is_recive and rising and (self.rise_period > to or self.rise_period < rise_time_min(rt)):
reason = (
f"risePeriod={self.rise_period} us: "
+ (f"> IR_timeout={to} us " if self.rise_period > to else "")
+ (f"< riseTimeMin={rise_time_min(rt)} us " if self.rise_period < rise_time_min(rt) else "")
)
self._fix(f"t={t} пропуск такта (goto END): {reason.strip()}")
return
if not rising:
return
self.high_count = 0
self.low_count = 0
self.all_count = 0
invert_err = False
rt = self.rise_sync_time
if around_rise(self.rise_period, rt):
self.stat_clean_bits += 1
bit = 1 if self.high_time > self.low_time else 0
if self.verbose:
self._fix(
f"t={t} «чистый» бит: aroundRise (risePeriod={self.rise_period} us в [{rise_time_min(rt)}..{rise_time_max(rt)}]), "
f"highTime={self.high_time} lowTime={self.low_time} us -> bit {bit}"
)
self.write_to_buffer(bit)
else:
self.stat_burst_edges += 1
self.high_count = ceil_div(self.high_time, rt)
self.low_count = ceil_div(self.low_time, rt)
self.all_count = ceil_div(self.rise_period, rt)
self._fix(
f"t={t} пропуск такта / растяжение: risePeriod={self.rise_period} us вне aroundRise "
f"[{rise_time_min(rt)}..{rise_time_max(rt)}]; "
f"ceil_div: highTime/{rt}->{self.high_count}, lowTime/{rt}->{self.low_count}, risePeriod/{rt}->{self.all_count}"
)
if self.high_count == 0 and self.high_time > rt // 3:
self.high_count += 1
self.errors_other += 1
self._fix(
f"доп. коррекция: highCount был 0 при highTime={self.high_time} > riseTime/3 ({rt // 3}) -> highCount++"
)
if self.low_count + self.high_count > self.all_count:
lo, hi, ac = self.low_count, self.high_count, self.all_count
if self.low_count > self.high_count:
self.low_count = self.all_count - self.high_count
self._fix(
f"поджатие: low+high>{ac} и low>high -> lowCount {lo}->{self.low_count} (лишние нули)"
)
elif self.low_count < self.high_count:
self.high_count = self.all_count - self.low_count
self._fix(
f"поджатие: low+high>{ac} и low<high -> highCount {hi}->{self.high_count} (лишние единицы)"
)
elif self.low_count == self.high_count:
invert_err = True
self.errors_other += self.all_count
self._fix(
f"поджатие: low==high при low+high>{ac} -> invertErr (последний из low-цикла пишется как 1)"
)
i = 0
while i < self.low_count and (8 - i):
if i == self.low_count - 1 and invert_err:
invert_err = False
self.write_to_buffer(1)
else:
self.write_to_buffer(0)
i += 1
i = 0
while i < self.high_count and (8 - i):
if i == self.high_count - 1 and invert_err:
invert_err = False
self.write_to_buffer(0)
else:
self.write_to_buffer(1)
i += 1
def timing_stats(edges: List[EdgeRec]) -> None:
dts: List[int] = []
for i in range(1, len(edges)):
d = edges[i].t_us - edges[i - 1].t_us
if 0 <= d < 1_000_000:
dts.append(d)
if not dts:
print("Нет интервалов для статистики.")
return
dts.sort()
def pct(p: float) -> int:
return dts[int(len(dts) * p)]
print("--- Inter-edge deltas in log (us) ---")
print(f" N={len(dts)} min={dts[0]} p50={pct(0.5)} p90={pct(0.9)} max={dts[-1]}")
print(
f" bitTime(ref)~{BIT_TIME} us aroundRise window ({rise_time_min(BIT_TIME)}..{rise_time_max(BIT_TIME)}) us"
)
# грубые корзины
buckets = [0, 0, 0, 0, 0]
for d in dts:
if d < 200:
buckets[0] += 1
elif d < 600:
buckets[1] += 1
elif d < 1200:
buckets[2] += 1
elif d < 3000:
buckets[3] += 1
else:
buckets[4] += 1
print(f" корзины [0-200) [200-600) [600-1200) [1200-3000) [3000+): {buckets}")
def main() -> int:
ap = argparse.ArgumentParser(description="Симуляция IR decode по @IRF1v1 логу")
ap.add_argument("logfile", nargs="?", default=None, help="Текстовый лог с @IRF1v1:")
ap.add_argument("--include-skipped", action="store_true", help="Подмешивать фронты с SKIP_DECODE (обычно нет)")
ap.add_argument("--max-events", type=int, default=80, help="Макс. событий FRAME_START/END в отчёте")
ap.add_argument(
"--no-color",
action="store_true",
help="Bez ANSI-tsvetov (ili zadajte NO_COLOR v okruzhenii)",
)
ap.add_argument(
"-v",
"--verbose",
action="store_true",
help="Podrobnyj vyvod: kazhdyj chistyj bit (aroundRise), otbrosy frontov; inache tolko svodka",
)
args = ap.parse_args()
if not args.logfile:
print("Укажите путь к логу, например: python tools/ir_decoder_sim.py ref/ISR_self_frontlog.txt")
return 1
text = open(args.logfile, "r", encoding="utf-8", errors="replace").read()
raw_edges = parse_irf1_lines(text)
edges = [e for e in raw_edges if args.include_skipped or not (e.flags & SKIP_DECODE_FLAG)]
edges.sort(key=lambda e: (e.t_us, id(e)))
print(f"Записей фронтов (после фильтра SKIP): {len(edges)} (всего распарсено: {len(raw_edges)})")
timing_stats(edges)
dec = DecoderSim(verbose=args.verbose)
for e in edges:
to_us = ir_timeout_us(dec.rise_sync_time)
if dec.is_recive and dec.last_edge_time > 0 and (e.t_us - dec.last_edge_time) > to_us * 2:
dec.check_timeout(e.t_us)
dec.tick_edge(e.t_us, e.level)
print("--- События декодера (первые N), пакеты разделены пустой строкой ---")
slice_ev = dec.events[: args.max_events]
first_packet = True
use_color = _use_terminal_color() and not args.no_color
for ev in slice_ev:
head = ev.split("\n", 1)[0]
if "PACKET_START" in head:
if not first_packet:
print()
first_packet = False
tone = _packet_event_tone(ev)
if tone is not None:
ok = tone == "ok"
ev_out = _highlight_frame_end_payloads(ev, ok=ok, enabled=use_color)
print(_colorize_block(ev_out, ok=ok, enabled=use_color))
else:
print(ev)
if len(dec.events) > args.max_events:
print(f"... всего событий: {len(dec.events)}")
print(f"errors_other={dec.errors_other} wrong_pack_end={dec.is_wrong_pack} recive={dec.is_recive}")
return 0
if __name__ == "__main__":
sys.exit(main())