#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ wmt_reader.py — parser plików .wmt z nagłówkiem WMT i eksport do CSV Nagłówek (packed, little-endian; 19 B): char[3] magic // "WMT" uint16 version uint16 headerSize // rozmiar nagłówka w bajtach (powinno być 19) uint32 sampleSize // rozmiar rekordu Sample (powinno być 11) uint32 timestamp // Unix time startu akwizycji uint32 reccount // liczba rekordów w pliku Rekord Sample (packed, little-endian; 11 B): int32 ident int16 x, y, z bool ready // 1 bajt Wymagania: - Czytaj .wmt i zapisuj CSV. - Kolumny x,y,z mają być typu całkowitego (nie skalujemy tych wartości ani ich nie modyfikujemy). - Opcjonalnie można dodać kolumny x_g,y_g,z_g poprzez --range-g (x,y,z pozostają bez zmian). """ from __future__ import annotations import argparse import struct import pathlib import pandas as pd # ------------------------------ Format pliku ---------------------------------- HEADER_FMT = "<3sHHIII" # magic(3s), version(H), headerSize(H), sampleSize(I), timestamp(I), reccount(I) HEADER_SIZE = struct.calcsize(HEADER_FMT) # oczekiwane 19 # Nowy rekord Sample: ident(int32), x(int16), y(int16), z(int16), ready(bool-1B) SAMPLE_FMT = " pd.DataFrame: """ Wczytuje plik .wmt (z nagłówkiem WMT) do DataFrame. Kolumny: timestamp, ident, x, y, z, ready Gwarantujemy, że x,y,z są typu całkowitego (int32 w DataFrame), bez skalowania. """ p = pathlib.Path(path) blob = p.read_bytes() # --- Nagłówek --- if len(blob) < HEADER_SIZE: raise ValueError(f"{p.name}: za krótki nagłówek ({len(blob)} B).") magic, version, headerSize, sampleSize, start_unix, reccount = struct.unpack_from(HEADER_FMT, blob, 0) if magic != b"WMT": raise ValueError(f"{p.name}: nieprawidłowa sygnatura magic={magic!r} (oczekiwano b'WMT').") if headerSize != HEADER_SIZE: # Tolerujemy różnicę, ale ostrzegamy; dalej użyjemy headerSize jako offsetu danych print(f"[WARN] {p.name}: headerSize={headerSize}, spodziewano {HEADER_SIZE}.") if sampleSize != SAMPLE_SIZE_EXPECTED: raise ValueError(f"{p.name}: sampleSize={sampleSize}, oczekiwano {SAMPLE_SIZE_EXPECTED} (nowy format Sample=11B).") # --- Dane rekordów --- data_off = headerSize # początek rekordów wg nagłówka if len(blob) < data_off: raise ValueError(f"{p.name}: uszkodzony headerSize={headerSize} (większy niż plik).") data = blob[data_off:] if len(data) % sampleSize != 0: # to nie musi być błąd krytyczny (np. przerwany zapis), ale ostrzeżmy print(f"[WARN] {p.name}: długość danych {len(data)} nie jest wielokrotnością sampleSize={sampleSize}.") nrec = len(data) // sampleSize if reccount and reccount != nrec: print(f"[INFO] {p.name}: reccount w nagłówku = {reccount}, policzone rekordy = {nrec}.") rows = [] off = 0 for _ in range(nrec): rec = data[off:off + sampleSize] ident, x, y, z, ready = struct.unpack(SAMPLE_FMT, rec) rows.append(( start_unix, # timestamp startu akwizycji z nagłówka ident, x, y, z, bool(ready), )) off += sampleSize df = pd.DataFrame( rows, columns=["timestamp", "ident", "x", "y", "z", "ready"] ) # Wymuszenie typu całkowitego dla x,y,z – bez zmiany wartości df = df.astype({"x": "int32", "y": "int32", "z": "int32", "ident": "int32"}) # ready pozostaje bool return df # ---------------------------- Konwersja do jednostek g ------------------------ def to_g(df: pd.DataFrame, range_g: float) -> pd.DataFrame: """ Konwersja int -> g przy założeniu: Q = 32768 / range_g => g = value / Q (Dopisuje kolumny x_g,y_g,z_g; NIE modyfikuje x,y,z.) """ if range_g is None or range_g <= 0: raise ValueError("range_g musi być dodatni (np. 2, 4, 8, 16).") q = 32768.0 / float(range_g) out = df.copy() out["x_g"] = out["x"] / q out["y_g"] = out["y"] / q out["z_g"] = out["z"] / q return out # -------------------------------------- CLI ---------------------------------- def main() -> None: ap = argparse.ArgumentParser(description="Czytaj pliki .wmt (WMT, Sample 11B) i eksportuj do CSV.") ap.add_argument("inputs", nargs="+", help="Ścieżki do plików .wmt (jeden lub wiele).") ap.add_argument("--range-g", type=float, default=None, help="Jeśli podasz (np. 2,4,8,16), doda kolumny x_g,y_g,z_g (x,y,z pozostają całkowite).") ap.add_argument("--out-suffix", default=".csv", help="Sufiks wyjściowy (domyślnie .csv).") ap.add_argument("--concat", action="store_true", help="Jeśli ustawione, łączy wszystkie wejścia w JEDEN plik wynikowy (pierwszy.EXT).") ap.add_argument("--no-header", action="store_true", help="Zapisz CSV bez nagłówka.") args = ap.parse_args() dfs = [] for in_path in args.inputs: df = read_wmt(in_path) if args.range_g is not None: df = to_g(df, args.range_g) if args.concat: df["_source"] = str(pathlib.Path(in_path).name) dfs.append(df) else: out_path = str(pathlib.Path(in_path).with_suffix(args.out_suffix)) df.to_csv(out_path, index=False, header=not args.no_header) print(f"Zapisano {out_path} ({len(df)} rekordów)") if args.concat and dfs: out_path = str(pathlib.Path(args.inputs[0]).with_suffix(args.out_suffix)) big = pd.concat(dfs, ignore_index=True) big.to_csv(out_path, index=False, header=not args.no_header) print(f"Zapisano scalony {out_path} ({len(big)} rekordów z {len(dfs)} plików)") if __name__ == "__main__": main()