mirror of
https://github.com/Show-maket/IR-protocol.git
synced 2026-04-28 03:08:08 +00:00
727 lines
30 KiB
Python
727 lines
30 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Офлайн-симуляция IR_DecoderRaw::tick + writeToBuffer по журналу @IRF1v1:
|
||
См. ref/IR_EDGE_TRACE_FORMAT.md
|
||
|
||
При FRAME_END: сводка (чистые биты / burst / отброшенные фронты), список точечных
|
||
исправлений (преамбула, пропуск такта, поджатие low/high, sync), при crc_ok=False —
|
||
разбор несовпадения CRC; в первой строке пакет как hex= и bin= (8 бит на байт MSB-first,
|
||
байты через пробел); отдельная строка «синхро» — счётчик ошибок 1-го бита тройки (как
|
||
err_syncBit в прошивке) и принятые тройки sync-битов между байтами (2-й и 3-й в прошивке
|
||
не сверяются с шаблоном, только пишутся в поток — их значения для наглядности сдвига).
|
||
WRONG_PACK_SYNC — отдельное событие с причиной и тем же блоком «синхро».
|
||
|
||
Флаг -v/--verbose: дополнительно каждая строка для «чистого» бита (aroundRise) и
|
||
отброшенных фронтов; без флага эти события только в счётчиках сводки.
|
||
|
||
Не моделирует IRDEBUG_SERIAL_SOFT_REJECT (жёсткий Wrong sync).
|
||
Таймауты: как IR_DecoderRaw::tick — listenStart и checkTimeout в начале каждого тика (не только при пустых
|
||
очередях), пауза > IR_timeout*2 по lastEdgeTime; при checkTimeout: isReciveRaw=0, firstRX(), lastEdgeTime=now.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import binascii
|
||
import os
|
||
import re
|
||
import sys
|
||
from dataclasses import dataclass, field
|
||
from typing import List, Tuple
|
||
|
||
# --- IR_config.h (как в прошивке, целочисленно) ---
|
||
CARRIER_FREC = 38000
|
||
CARRIER_PERIOD = 1000000 // CARRIER_FREC
|
||
BIT_ACTIVE_TAKTS = 25
|
||
BIT_PAUSE_TAKTS = 12
|
||
BIT_TAKTS = BIT_ACTIVE_TAKTS + BIT_PAUSE_TAKTS
|
||
BIT_TIME = BIT_TAKTS * CARRIER_PERIOD
|
||
TOLERANCE = 300
|
||
SYNCBITS = 3
|
||
BIT_PER_BYTE = 8
|
||
MSGBYTES = 1
|
||
CRC_BYTES = 2
|
||
POLY1 = 0x31
|
||
POLY2 = 0x8C
|
||
IR_MASK_MSG_INFO = 0x1F
|
||
PREAMB_PULSE = 3
|
||
PREAMB_FRONTS = PREAMB_PULSE * 2
|
||
BYTE_PER_PACK = 31
|
||
DATA_BYTE_SIZE_MAX = MSGBYTES + 2 + 2 + BYTE_PER_PACK + CRC_BYTES
|
||
FREE_FREC = False
|
||
SKIP_DECODE_FLAG = 0x01
|
||
|
||
# Myagkie tsveta dlya terminala (256-color), ne iarkie default 31/32
|
||
_ANSI_GREEN = "\033[38;5;107m" # priglushennyj zelenyj (ne iarkij 32)
|
||
_ANSI_RED = "\033[38;5;174m" # pylno-rozovyj / myagkij krasnyj (ne iarkij 31)
|
||
_ANSI_RESET = "\033[0m"
|
||
# Zolotistyj / birjuzovyj + zhirnyj dlya hex i bin paketa v stroke FRAME_END
|
||
_HEX_PACKET_FG = "\033[1;38;5;222m"
|
||
_BIN_PACKET_FG = "\033[1;38;5;109m"
|
||
|
||
|
||
def _bytes_bin_msb(data: bytes) -> str:
|
||
"""8 bit na bajt (MSB pervyj, kak v writeToBuffer), bajty cherez probel."""
|
||
return " ".join(f"{b:08b}" for b in data)
|
||
|
||
|
||
def _use_terminal_color() -> bool:
|
||
if os.environ.get("NO_COLOR"):
|
||
return False
|
||
try:
|
||
return sys.stdout.isatty()
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
def _colorize_block(text: str, ok: bool, enabled: bool) -> str:
|
||
if not enabled:
|
||
return text
|
||
code = _ANSI_GREEN if ok else _ANSI_RED
|
||
return f"{code}{text}{_ANSI_RESET}"
|
||
|
||
|
||
def _highlight_hex_bin_in_frame_line(line: str, ok: bool, enabled: bool) -> str:
|
||
"""Odna stroka s FRAME_END: vydeljaet hex= i bin=."""
|
||
if not enabled or "FRAME_END" not in line:
|
||
return line
|
||
parent = _ANSI_GREEN if ok else _ANSI_RED
|
||
after = "\033[22m" + parent
|
||
spans: list[tuple[int, int, str]] = []
|
||
mhx = re.search(r"hex=([0-9a-fA-F]+)", line)
|
||
if mhx:
|
||
spans.append((mhx.start(1), mhx.end(1), _HEX_PACKET_FG))
|
||
mbn = re.search(r"bin=([01 ]+)$", line)
|
||
if mbn:
|
||
spans.append((mbn.start(1), mbn.end(1), _BIN_PACKET_FG))
|
||
if not spans:
|
||
return line
|
||
spans.sort(key=lambda t: t[0])
|
||
out: list[str] = []
|
||
last = 0
|
||
for s, e, col in spans:
|
||
out.append(line[last:s])
|
||
out.append(col + line[s:e] + after)
|
||
last = e
|
||
out.append(line[last:])
|
||
return "".join(out)
|
||
|
||
|
||
def _highlight_frame_end_payloads(ev: str, ok: bool, enabled: bool) -> str:
|
||
"""Vo vseh strokah FRAME_END vydeljaet hex= i bin= (v tom chisle povtor svodki v konce)."""
|
||
if not enabled:
|
||
return ev
|
||
return "\n".join(_highlight_hex_bin_in_frame_line(L, ok, enabled) for L in ev.split("\n"))
|
||
|
||
|
||
def _packet_event_tone(ev: str) -> str | None:
|
||
"""'ok' | 'bad' | None — dlya pokrashki celogo bloka sobytija."""
|
||
head = ev.split("\n", 1)[0]
|
||
if "FRAME_END" in head:
|
||
if "crc_ok=True" in head:
|
||
return "ok"
|
||
if "crc_ok=False" in head:
|
||
return "bad"
|
||
if "WRONG_PACK_SYNC" in head:
|
||
return "bad"
|
||
return None
|
||
|
||
|
||
def rise_time_max(rise_sync: int) -> int:
|
||
return rise_sync + TOLERANCE
|
||
|
||
|
||
def rise_time_min(rise_sync: int) -> int:
|
||
return rise_sync - TOLERANCE
|
||
|
||
|
||
def ir_timeout_us(rise_sync: int) -> int:
|
||
return rise_time_max(rise_sync) * (8 + SYNCBITS + 1)
|
||
|
||
|
||
def around_rise(t: int, rise_sync: int) -> bool:
|
||
return rise_time_min(rise_sync) < t < rise_time_max(rise_sync)
|
||
|
||
|
||
def ceil_div(val: int, divider: int) -> int:
|
||
ret = val // divider
|
||
if (val << 4) // divider - (ret << 4) >= 8:
|
||
ret += 1
|
||
return ret
|
||
|
||
|
||
def crc8(data: bytes, start: int, end: int, poly: int) -> int:
|
||
crc = 0xFF
|
||
for i in range(start, end):
|
||
crc ^= data[i]
|
||
for _ in range(8):
|
||
if crc & 0x80:
|
||
crc = ((crc << 1) ^ poly) & 0xFF
|
||
else:
|
||
crc = (crc << 1) & 0xFF
|
||
return crc
|
||
|
||
|
||
def crc_compute_bytes(data: bytearray, pack_size: int) -> Tuple[int, int, int]:
|
||
"""Ожидаемые байты CRC: (crc1_hi, crc2_lo, len_payload) или (-1,-1,-1) при неверном pack_size."""
|
||
if pack_size < 1 + CRC_BYTES or pack_size > DATA_BYTE_SIZE_MAX:
|
||
return (-1, -1, -1)
|
||
ln = pack_size - CRC_BYTES
|
||
c1 = crc8(bytes(data), 0, ln, POLY1)
|
||
c2 = crc8(bytes(data), 0, ln + 1, POLY2)
|
||
return (c1 & 0xFF, c2 & 0xFF, ln)
|
||
|
||
|
||
def crc_check(data: bytearray, pack_size: int) -> bool:
|
||
ln = pack_size - CRC_BYTES
|
||
c1 = crc8(bytes(data), 0, ln, POLY1)
|
||
c2 = crc8(bytes(data), 0, ln + 1, POLY2)
|
||
crc = ((c1 << 8) & ~0xFF) | (c2 & 0xFF)
|
||
return data[ln] == ((crc >> 8) & 0xFF) and data[ln + 1] == (crc & 0xFF)
|
||
|
||
|
||
def crc_failure_lines(data: bytearray, pack_size: int) -> List[str]:
|
||
"""Подробности при несовпадении CRC."""
|
||
out: List[str] = []
|
||
exp_hi, exp_lo, ln = crc_compute_bytes(data, pack_size)
|
||
if ln < 0:
|
||
out.append(f" некорректный pack_size={pack_size} (ожидается 1..{DATA_BYTE_SIZE_MAX})")
|
||
return out
|
||
got_hi = data[pack_size - 2]
|
||
got_lo = data[pack_size - 1]
|
||
hdr = data[0]
|
||
msg_t = (hdr >> 5) & 0x07
|
||
out.append(
|
||
f" байт[0]=0x{hdr:02x} тип_сообщения={msg_t} заявл._длина_кадра={hdr & IR_MASK_MSG_INFO} байт"
|
||
)
|
||
out.append(
|
||
f" правило CRC: crc8(data[0..{ln - 1}], poly1=0x{POLY1:02x}) -> байт[{ln}]; "
|
||
f"crc8(data[0..{ln}], poly2=0x{POLY2:02x}) -> байт[{ln + 1}]"
|
||
)
|
||
out.append(f" ожидалось: {exp_hi:02x} {exp_lo:02x} принято в кадре: {got_hi:02x} {got_lo:02x}")
|
||
if got_hi != exp_hi:
|
||
out.append(
|
||
f" первый байт CRC не совпал - искажены данные [0..{ln - 1}] и/или этот байт CRC"
|
||
)
|
||
if got_lo != exp_lo:
|
||
out.append(
|
||
" второй байт CRC не совпал - в poly2 входит уже первый байт CRC; типично сдвиг битовой границы"
|
||
)
|
||
pl = data[:ln].hex()
|
||
out.append(f" полезная нагрузка без CRC ({ln} байт): {pl}")
|
||
return out
|
||
|
||
|
||
@dataclass
|
||
class EdgeRec:
|
||
t_us: int
|
||
level: int # 0/1 после фронта
|
||
flags: int
|
||
|
||
|
||
def parse_irf1_lines(text: str) -> List[EdgeRec]:
|
||
out: List[EdgeRec] = []
|
||
pat = re.compile(r"@IRF1v1:([0-9a-fA-F]+)\s*")
|
||
for m in pat.finditer(text):
|
||
hx = m.group(1)
|
||
if len(hx) % 2:
|
||
continue
|
||
try:
|
||
raw = binascii.unhexlify(hx)
|
||
except binascii.Error:
|
||
continue
|
||
if len(raw) < 3:
|
||
continue
|
||
meta = raw[0]
|
||
count = raw[1] | (raw[2] << 8)
|
||
need = 3 + count * 6
|
||
if len(raw) < need or count > 2000:
|
||
continue
|
||
p = 3
|
||
for _ in range(count):
|
||
t = raw[p] | (raw[p + 1] << 8) | (raw[p + 2] << 16) | (raw[p + 3] << 24)
|
||
lvl = raw[p + 4]
|
||
flg = raw[p + 5]
|
||
out.append(EdgeRec(t, lvl & 1, flg))
|
||
p += 6
|
||
if meta & 1:
|
||
pass # overflow — запись могла обрезаться
|
||
return out
|
||
|
||
|
||
@dataclass
|
||
class DecoderSim:
|
||
prev_rise: int = 0
|
||
prev_fall: int = 0
|
||
rise_period: int = 0
|
||
high_time: int = 0
|
||
low_time: int = 0
|
||
last_edge_time: int = 0
|
||
preamb_front_counter: int = 0
|
||
is_preamb: bool = False
|
||
is_recive: bool = False
|
||
is_recive_raw: bool = False
|
||
is_wrong_pack: bool = False
|
||
is_buffer_overflow: bool = False
|
||
rise_sync_time: int = BIT_TIME
|
||
high_count: int = 0
|
||
low_count: int = 0
|
||
all_count: int = 0
|
||
i_data_buffer: int = 0
|
||
buf_bit_pos: int = 0
|
||
next_control_bit: int = BIT_PER_BYTE
|
||
is_data: bool = True
|
||
i_sync_bit: int = 0
|
||
err_sync_bit: int = 0
|
||
data_buffer: bytearray = field(default_factory=lambda: bytearray(DATA_BYTE_SIZE_MAX))
|
||
pack_size: int = 0
|
||
errors_other: int = 0
|
||
events: List[str] = field(default_factory=list)
|
||
verbose: bool = False
|
||
"""Подстроеки/исправления как в IR_DecoderRaw::tick (за текущий пакет)."""
|
||
packet_fixes: List[str] = field(default_factory=list)
|
||
_fatal_sync_event_sent: bool = False
|
||
stat_clean_bits: int = 0
|
||
stat_burst_edges: int = 0
|
||
stat_debounce_rise: int = 0
|
||
stat_debounce_fall: int = 0
|
||
# Mezhdu bajtami: trojki sync-bitov (kak v writeToBuffer); oshibka schetaetsja tolko za 1-j bit
|
||
stat_sync_first_error: int = 0
|
||
sync_groups: List[str] = field(default_factory=list)
|
||
sync_bits_current_group: List[int] = field(default_factory=list)
|
||
|
||
def _fix(self, msg: str) -> None:
|
||
self.packet_fixes.append(msg)
|
||
|
||
def _clear_packet_state(self) -> None:
|
||
self.packet_fixes.clear()
|
||
self._fatal_sync_event_sent = False
|
||
self.stat_clean_bits = 0
|
||
self.stat_burst_edges = 0
|
||
self.stat_debounce_rise = 0
|
||
self.stat_debounce_fall = 0
|
||
self.stat_sync_first_error = 0
|
||
self.sync_groups.clear()
|
||
self.sync_bits_current_group.clear()
|
||
|
||
def _sync_bit_consumed(self, bit_val: int) -> None:
|
||
"""Odin prinjatyj sync-bit (bufBitPos++ v vetke sync v proshivke)."""
|
||
self.sync_bits_current_group.append(bit_val & 1)
|
||
if len(self.sync_bits_current_group) == SYNCBITS:
|
||
self.sync_groups.append("".join(str(b) for b in self.sync_bits_current_group))
|
||
self.sync_bits_current_group.clear()
|
||
|
||
def _sync_summary_lines(self, *, with_firmware_note: bool = False) -> List[str]:
|
||
"""Stroki svodki po sinhrobitam dlya FRAME_END i WRONG_PACK_SYNC."""
|
||
sg = "/".join(self.sync_groups) if self.sync_groups else "—"
|
||
lines = [
|
||
f" синхро: ошибок_1-го_бита(как_в_IR_DecoderRaw)={self.stat_sync_first_error}; "
|
||
f"полных_троек={len(self.sync_groups)}; биты_троек={sg}"
|
||
]
|
||
if self.sync_bits_current_group:
|
||
tail = "".join(str(b) for b in self.sync_bits_current_group)
|
||
lines.append(f" синхро: незавершённая_тройка (уже приняты биты): {tail}")
|
||
if with_firmware_note:
|
||
lines.append(
|
||
" синхро: в прошивке при ошибке считается только случай «1-й бит тройки совпал с последним "
|
||
"data-битом» (errors.other++, err_syncBit); 2-й и 3-й sync-биты не сравниваются с эталоном."
|
||
)
|
||
return lines
|
||
|
||
def _emit_wrong_sync_fatal(self, t: int) -> None:
|
||
lines = [
|
||
f"t={t} WRONG_PACK_SYNC (аналог ERROR: Wrong sync bit в прошивке, err_sync_bit>={SYNCBITS})"
|
||
]
|
||
if self.packet_fixes:
|
||
lines.append(" подстройки и исправления до ошибки:")
|
||
for fx in self.packet_fixes:
|
||
lines.append(f" - {fx}")
|
||
lines.extend(self._sync_summary_lines(with_firmware_note=True))
|
||
lines.append(
|
||
" причина фатала: повторы неверного 1-го sync-бита накапливают err_syncBit до порога syncBits."
|
||
)
|
||
self.events.append("\n".join(lines))
|
||
|
||
def first_rx(self) -> None:
|
||
self.is_buffer_overflow = False
|
||
self.pack_size = 0
|
||
self.buf_bit_pos = 0
|
||
self.is_data = True
|
||
self.i_data_buffer = 0
|
||
self.next_control_bit = BIT_PER_BYTE
|
||
self.i_sync_bit = 0
|
||
self.err_sync_bit = 0
|
||
self.is_wrong_pack = False
|
||
self.data_buffer[:] = bytes(DATA_BYTE_SIZE_MAX)
|
||
self.rise_sync_time = BIT_TIME
|
||
self.stat_sync_first_error = 0
|
||
self.sync_groups.clear()
|
||
self.sync_bits_current_group.clear()
|
||
|
||
def listen_start(self, now: int) -> None:
|
||
to = ir_timeout_us(self.rise_sync_time)
|
||
if self.is_recive_raw and self.last_edge_time > 0 and (now - self.last_edge_time) > to * 2:
|
||
self.events.append(f"t={now} listenStart abort raw (gap since last edge, как IR_DecoderRaw)")
|
||
self.is_recive_raw = False
|
||
self._clear_packet_state()
|
||
self.first_rx()
|
||
|
||
def check_timeout(self, now: int) -> None:
|
||
if not self.is_recive:
|
||
return
|
||
to = ir_timeout_us(self.rise_sync_time)
|
||
if now - self.last_edge_time > to * 2:
|
||
self.events.append(f"t={now} checkTimeout -> isReciveRaw=0, firstRX() (как IR_DecoderRaw)")
|
||
self.is_recive = False
|
||
self.is_recive_raw = False
|
||
self._clear_packet_state()
|
||
self.first_rx()
|
||
# Не last_edge_time = now: в прошивке убрано — расхождение с метками фронтов из очереди.
|
||
|
||
def write_to_buffer(self, bit_val: int) -> None:
|
||
if self.i_data_buffer > DATA_BYTE_SIZE_MAX * 8:
|
||
self.is_buffer_overflow = True
|
||
self._fix("переполнение буфера битов (writeToBuffer: i_dataBuffer > dataByteSizeMax*8)")
|
||
if self.is_buffer_overflow or self.is_preamb or self.is_wrong_pack:
|
||
self.is_recive = False
|
||
self.is_recive_raw = False
|
||
self.first_rx()
|
||
return
|
||
|
||
if self.buf_bit_pos == self.next_control_bit:
|
||
self.next_control_bit += SYNCBITS if self.is_data else BIT_PER_BYTE
|
||
self.is_data = not self.is_data
|
||
self.i_sync_bit = 0
|
||
self.err_sync_bit = 0
|
||
|
||
if self.is_data:
|
||
bi = self.i_data_buffer // 8
|
||
self.data_buffer[bi] |= (bit_val & 1) << (7 - (self.i_data_buffer % 8))
|
||
self.i_data_buffer += 1
|
||
self.buf_bit_pos += 1
|
||
else:
|
||
if self.i_sync_bit == 0:
|
||
prev_bit = (
|
||
self.data_buffer[(self.i_data_buffer - 1) // 8]
|
||
>> (7 - (self.i_data_buffer - 1) % 8)
|
||
) & 1
|
||
if bit_val != prev_bit:
|
||
self.buf_bit_pos += 1
|
||
self.i_sync_bit += 1
|
||
self._sync_bit_consumed(bit_val)
|
||
else:
|
||
self.i_sync_bit = 0
|
||
self.errors_other += 1
|
||
self.err_sync_bit += 1
|
||
self.stat_sync_first_error += 1
|
||
self._fix(
|
||
f"sync: 1-й sync-бит совпал с последним data-битом (data={prev_bit}); "
|
||
f"err_sync_bit={self.err_sync_bit}/{SYNCBITS} (как в прошивке)"
|
||
)
|
||
if self.err_sync_bit >= SYNCBITS:
|
||
self.is_wrong_pack = True
|
||
if not self._fatal_sync_event_sent:
|
||
self._fatal_sync_event_sent = True
|
||
self._emit_wrong_sync_fatal(self.last_edge_time)
|
||
else:
|
||
self.buf_bit_pos += 1
|
||
self.i_sync_bit += 1
|
||
self._sync_bit_consumed(bit_val)
|
||
self.is_wrong_pack = self.err_sync_bit >= SYNCBITS
|
||
|
||
if self.is_data and not self.is_wrong_pack:
|
||
if self.i_data_buffer == 8 * MSGBYTES:
|
||
self.pack_size = self.data_buffer[0] & IR_MASK_MSG_INFO
|
||
if self.pack_size and self.i_data_buffer == self.pack_size * BIT_PER_BYTE:
|
||
ok = crc_check(self.data_buffer, self.pack_size)
|
||
raw = self.data_buffer[: self.pack_size]
|
||
hx = raw.hex()
|
||
bstr = _bytes_bin_msb(raw)
|
||
frame_line = (
|
||
f"t={self.last_edge_time} FRAME_END pack={self.pack_size} crc_ok={ok} hex={hx} bin={bstr}"
|
||
)
|
||
sync_lines = self._sync_summary_lines()
|
||
tick_summary = (
|
||
f" сводка тактов: чистых_битов_aroundRise={self.stat_clean_bits}, "
|
||
f"фронтов_с_burst-коррекцией={self.stat_burst_edges}, "
|
||
f"отброшенных_фронтов_up={self.stat_debounce_rise}, down={self.stat_debounce_fall}"
|
||
)
|
||
|
||
def _frame_summary_block() -> List[str]:
|
||
return [frame_line, *sync_lines, tick_summary]
|
||
|
||
lines: List[str] = []
|
||
lines.extend(_frame_summary_block())
|
||
if self.packet_fixes:
|
||
if self.verbose:
|
||
lines.append(
|
||
" подстройки и исправления за пакет, подробный режим (-v) (аналог IR_DecoderRaw::tick):"
|
||
)
|
||
else:
|
||
lines.append(
|
||
" подстройки и исправления за пакет (преамбула, пропуск такта, burst, sync; "
|
||
"без строк по каждому «чистому» биту — включите -v):"
|
||
)
|
||
for fx in self.packet_fixes:
|
||
lines.append(f" - {fx}")
|
||
else:
|
||
lines.append(
|
||
" дополнительных исправлений нет (см. сводку; для строк по каждому биту: -v/--verbose)"
|
||
)
|
||
if not ok:
|
||
lines.append(" неуспешный пакет — причина:")
|
||
lines.extend(crc_failure_lines(self.data_buffer, self.pack_size))
|
||
lines.append(" --- сводка пакета (конец записи) ---")
|
||
lines.extend(_frame_summary_block())
|
||
self.events.append("\n".join(lines))
|
||
self.is_recive = False
|
||
self.is_recive_raw = False
|
||
self._clear_packet_state()
|
||
self.first_rx()
|
||
|
||
def tick_edge(self, t: int, level: int) -> None:
|
||
"""Один фронт: level = состояние линии ПОСЛЕ фронта (как dir в C++)."""
|
||
self.listen_start(t)
|
||
to = ir_timeout_us(self.rise_sync_time)
|
||
if self.is_recive and self.last_edge_time > 0 and (t - self.last_edge_time) > to * 2:
|
||
self.check_timeout(t)
|
||
self.last_edge_time = t
|
||
rising = level == 1
|
||
|
||
if rising:
|
||
cond = (t - self.prev_rise > rise_time_max(self.rise_sync_time) // 4) or self.high_count or self.low_count
|
||
if cond:
|
||
self.rise_period = t - self.prev_rise
|
||
self.high_time = t - self.prev_fall
|
||
self.low_time = self.prev_fall - self.prev_rise
|
||
self.prev_rise = t
|
||
else:
|
||
self.errors_other += 1
|
||
self.stat_debounce_rise += 1
|
||
if self.verbose:
|
||
self._fix(
|
||
f"t={t} отброшен фронт ↑: слишком короткий интервал до предыдущего ↑ "
|
||
f"(<= riseTimeMax/4 при hc=lc=0), errors.other++"
|
||
)
|
||
else:
|
||
if t - self.prev_fall > rise_time_min(self.rise_sync_time) // 4:
|
||
self.prev_fall = t
|
||
else:
|
||
self.errors_other += 1
|
||
self.stat_debounce_fall += 1
|
||
if self.verbose:
|
||
self._fix(
|
||
f"t={t} отброшен фронт ↓: слишком короткий интервал до предыдущего ↓ (<= riseTimeMin/4), errors.other++"
|
||
)
|
||
|
||
rt = self.rise_sync_time
|
||
to = ir_timeout_us(rt)
|
||
if t > self.prev_rise and (t - self.prev_rise) > to * 2 and not self.is_recive_raw:
|
||
self.preamb_front_counter = PREAMB_FRONTS - 1
|
||
self.is_preamb = True
|
||
self.is_recive = True
|
||
self.is_recive_raw = True
|
||
self.is_wrong_pack = False
|
||
self._clear_packet_state()
|
||
self.events.append(f"t={t} PACKET_START (long idle)")
|
||
|
||
if self.preamb_front_counter:
|
||
if rising and self.rise_period < to:
|
||
if self.rise_period < rise_time_min(rt) // 2:
|
||
self.preamb_front_counter += 2
|
||
self.errors_other += 1
|
||
self._fix(
|
||
f"преамбула: «рваная единица» risePeriod={self.rise_period} us < riseTimeMin/2 "
|
||
f"({rise_time_min(rt) // 2} us) -> preambFrontCounter += 2, errors.other++"
|
||
)
|
||
elif FREE_FREC:
|
||
old = self.rise_sync_time
|
||
self.rise_sync_time = (self.rise_sync_time + self.rise_period // 2) // 2
|
||
self._fix(
|
||
f"преамбула: подстройка riseSyncTime {old}->{self.rise_sync_time} us (freeFrec)"
|
||
)
|
||
self.preamb_front_counter -= 1
|
||
else:
|
||
if self.is_preamb:
|
||
self.is_preamb = False
|
||
half = self.rise_period // 2
|
||
self.prev_rise += half
|
||
self._fix(
|
||
f"после преамбулы: prev_rise += risePeriod/2 (+{half} us) - фазовая привязка к центру бита"
|
||
)
|
||
return
|
||
|
||
if self.is_preamb:
|
||
return
|
||
|
||
if self.rise_period > to or self.is_buffer_overflow or self.rise_period < rise_time_min(rt) or self.is_wrong_pack:
|
||
if self.is_recive and rising and (self.rise_period > to or self.rise_period < rise_time_min(rt)):
|
||
reason = (
|
||
f"risePeriod={self.rise_period} us: "
|
||
+ (f"> IR_timeout={to} us " if self.rise_period > to else "")
|
||
+ (f"< riseTimeMin={rise_time_min(rt)} us " if self.rise_period < rise_time_min(rt) else "")
|
||
)
|
||
self._fix(f"t={t} пропуск такта (goto END): {reason.strip()}")
|
||
return
|
||
|
||
if not rising:
|
||
return
|
||
|
||
self.high_count = 0
|
||
self.low_count = 0
|
||
self.all_count = 0
|
||
invert_err = False
|
||
rt = self.rise_sync_time
|
||
|
||
if around_rise(self.rise_period, rt):
|
||
self.stat_clean_bits += 1
|
||
bit = 1 if self.high_time > self.low_time else 0
|
||
if self.verbose:
|
||
self._fix(
|
||
f"t={t} «чистый» бит: aroundRise (risePeriod={self.rise_period} us в [{rise_time_min(rt)}..{rise_time_max(rt)}]), "
|
||
f"highTime={self.high_time} lowTime={self.low_time} us -> bit {bit}"
|
||
)
|
||
self.write_to_buffer(bit)
|
||
else:
|
||
self.stat_burst_edges += 1
|
||
self.high_count = ceil_div(self.high_time, rt)
|
||
self.low_count = ceil_div(self.low_time, rt)
|
||
self.all_count = ceil_div(self.rise_period, rt)
|
||
self._fix(
|
||
f"t={t} пропуск такта / растяжение: risePeriod={self.rise_period} us вне aroundRise "
|
||
f"[{rise_time_min(rt)}..{rise_time_max(rt)}]; "
|
||
f"ceil_div: highTime/{rt}->{self.high_count}, lowTime/{rt}->{self.low_count}, risePeriod/{rt}->{self.all_count}"
|
||
)
|
||
if self.high_count == 0 and self.high_time > rt // 3:
|
||
self.high_count += 1
|
||
self.errors_other += 1
|
||
self._fix(
|
||
f"доп. коррекция: highCount был 0 при highTime={self.high_time} > riseTime/3 ({rt // 3}) -> highCount++"
|
||
)
|
||
if self.low_count + self.high_count > self.all_count:
|
||
lo, hi, ac = self.low_count, self.high_count, self.all_count
|
||
if self.low_count > self.high_count:
|
||
self.low_count = self.all_count - self.high_count
|
||
self._fix(
|
||
f"поджатие: low+high>{ac} и low>high -> lowCount {lo}->{self.low_count} (лишние нули)"
|
||
)
|
||
elif self.low_count < self.high_count:
|
||
self.high_count = self.all_count - self.low_count
|
||
self._fix(
|
||
f"поджатие: low+high>{ac} и low<high -> highCount {hi}->{self.high_count} (лишние единицы)"
|
||
)
|
||
elif self.low_count == self.high_count:
|
||
invert_err = True
|
||
self.errors_other += self.all_count
|
||
self._fix(
|
||
f"поджатие: low==high при low+high>{ac} -> invertErr (последний из low-цикла пишется как 1)"
|
||
)
|
||
i = 0
|
||
while i < self.low_count and (8 - i):
|
||
if i == self.low_count - 1 and invert_err:
|
||
invert_err = False
|
||
self.write_to_buffer(1)
|
||
else:
|
||
self.write_to_buffer(0)
|
||
i += 1
|
||
i = 0
|
||
while i < self.high_count and (8 - i):
|
||
if i == self.high_count - 1 and invert_err:
|
||
invert_err = False
|
||
self.write_to_buffer(0)
|
||
else:
|
||
self.write_to_buffer(1)
|
||
i += 1
|
||
|
||
|
||
def timing_stats(edges: List[EdgeRec]) -> None:
|
||
dts: List[int] = []
|
||
for i in range(1, len(edges)):
|
||
d = edges[i].t_us - edges[i - 1].t_us
|
||
if 0 <= d < 1_000_000:
|
||
dts.append(d)
|
||
if not dts:
|
||
print("Нет интервалов для статистики.")
|
||
return
|
||
dts.sort()
|
||
def pct(p: float) -> int:
|
||
return dts[int(len(dts) * p)]
|
||
print("--- Inter-edge deltas in log (us) ---")
|
||
print(f" N={len(dts)} min={dts[0]} p50={pct(0.5)} p90={pct(0.9)} max={dts[-1]}")
|
||
print(
|
||
f" bitTime(ref)~{BIT_TIME} us aroundRise window ({rise_time_min(BIT_TIME)}..{rise_time_max(BIT_TIME)}) us"
|
||
)
|
||
# грубые корзины
|
||
buckets = [0, 0, 0, 0, 0]
|
||
for d in dts:
|
||
if d < 200:
|
||
buckets[0] += 1
|
||
elif d < 600:
|
||
buckets[1] += 1
|
||
elif d < 1200:
|
||
buckets[2] += 1
|
||
elif d < 3000:
|
||
buckets[3] += 1
|
||
else:
|
||
buckets[4] += 1
|
||
print(f" корзины [0-200) [200-600) [600-1200) [1200-3000) [3000+): {buckets}")
|
||
|
||
|
||
def main() -> int:
|
||
ap = argparse.ArgumentParser(description="Симуляция IR decode по @IRF1v1 логу")
|
||
ap.add_argument("logfile", nargs="?", default=None, help="Текстовый лог с @IRF1v1:")
|
||
ap.add_argument("--include-skipped", action="store_true", help="Подмешивать фронты с SKIP_DECODE (обычно нет)")
|
||
ap.add_argument("--max-events", type=int, default=80, help="Макс. событий FRAME_START/END в отчёте")
|
||
ap.add_argument(
|
||
"--no-color",
|
||
action="store_true",
|
||
help="Bez ANSI-tsvetov (ili zadajte NO_COLOR v okruzhenii)",
|
||
)
|
||
ap.add_argument(
|
||
"-v",
|
||
"--verbose",
|
||
action="store_true",
|
||
help="Podrobnyj vyvod: kazhdyj chistyj bit (aroundRise), otbrosy frontov; inache tolko svodka",
|
||
)
|
||
args = ap.parse_args()
|
||
if not args.logfile:
|
||
print("Укажите путь к логу, например: python tools/ir_decoder_sim.py ref/ISR_self_frontlog.txt")
|
||
return 1
|
||
text = open(args.logfile, "r", encoding="utf-8", errors="replace").read()
|
||
raw_edges = parse_irf1_lines(text)
|
||
edges = [e for e in raw_edges if args.include_skipped or not (e.flags & SKIP_DECODE_FLAG)]
|
||
edges.sort(key=lambda e: (e.t_us, id(e)))
|
||
print(f"Записей фронтов (после фильтра SKIP): {len(edges)} (всего распарсено: {len(raw_edges)})")
|
||
timing_stats(edges)
|
||
|
||
dec = DecoderSim(verbose=args.verbose)
|
||
for e in edges:
|
||
dec.tick_edge(e.t_us, e.level)
|
||
|
||
print("--- События декодера (первые N), пакеты разделены пустой строкой ---")
|
||
slice_ev = dec.events[: args.max_events]
|
||
first_packet = True
|
||
use_color = _use_terminal_color() and not args.no_color
|
||
for ev in slice_ev:
|
||
head = ev.split("\n", 1)[0]
|
||
if "PACKET_START" in head:
|
||
if not first_packet:
|
||
print()
|
||
first_packet = False
|
||
tone = _packet_event_tone(ev)
|
||
if tone is not None:
|
||
ok = tone == "ok"
|
||
ev_out = _highlight_frame_end_payloads(ev, ok=ok, enabled=use_color)
|
||
print(_colorize_block(ev_out, ok=ok, enabled=use_color))
|
||
else:
|
||
print(ev)
|
||
if len(dec.events) > args.max_events:
|
||
print(f"... всего событий: {len(dec.events)}")
|
||
print(f"errors_other={dec.errors_other} wrong_pack_end={dec.is_wrong_pack} recive={dec.is_recive}")
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|