#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Konwerter .wmt (nowy format próbek 12 B) -> CSV. # python3 wmt_reader.py '/Users/katalog/00000001.wmt' -a -f -d <-wszystkie, +podfoldery, usuwa stare wmt Plik .wmt: Header (packed LE, 19 B): char[3] magic="WMT" uint16 version uint16 headerSize uint32 sampleSize (oczekiwane 12) uint32 timestamp (Unix start akwizycji, sekundy) uint32 reccount (liczba rekordów; może być 0) Sample (packed LE, 12 B): uint32 offset_us (µs od startu akwizycji) uint8 sensor_id (0..6) int16 x, y, z uint8 ready (0/1) CSV: start_unix, offset_us, unix_ts, sensor_id, x, y, z, ready """ from __future__ import annotations import argparse import pathlib import struct import pandas as pd from datetime import datetime HEADER_FMT = "<3sHHIII" # 19 B HEADER_SIZE = struct.calcsize(HEADER_FMT) SAMPLE_FMT = " pd.DataFrame: p = pathlib.Path(path) blob = p.read_bytes() if len(blob) < HEADER_SIZE: raise ValueError(f"{p.name}: plik zbyt krótki ({len(blob)} B).") magic, version, headerSize, sampleSize, start_unix, reccount = struct.unpack_from(HEADER_FMT, blob, 0) if magic != b"WMT": raise ValueError(f"{p.name}: niepoprawny magic {magic!r} (oczekiwano b'WMT').") if sampleSize != SAMPLE_SIZE: raise ValueError(f"{p.name}: sampleSize={sampleSize}, oczekiwano {SAMPLE_SIZE} (obsługiwany jest TYLKO nowy format 12 B).") if len(blob) < headerSize: raise ValueError(f"{p.name}: headerSize={headerSize} większy niż rozmiar pliku.") # Wyświetlenie informacji o nagłówku start_time = datetime.fromtimestamp(start_unix) print(f"\n=== Plik: {p.name} ===") print(f"Czas rozpoczęcia akwizycji: {start_time} (Unix: {start_unix})") print(f"Liczba próbek zadeklarowana w nagłówku: {reccount}") data = blob[headerSize:] if len(data) % SAMPLE_SIZE != 0: print(f"[WARN] {p.name}: długość danych {len(data)} nie jest wielokrotnością {SAMPLE_SIZE} (plik może być niepełny).") nrec = len(data) // SAMPLE_SIZE if reccount and reccount != nrec: print(f"[INFO] {p.name}: reccount={reccount}, policzono={nrec}.") else: print(f"Policzono faktyczną liczbę próbek: {nrec}") rows = [] off = 0 for _ in range(nrec): rec = data[off:off+SAMPLE_SIZE] offset_us, sensor_id, x, y, z, ready = struct.unpack(SAMPLE_FMT, rec) unix_ts = start_unix + (offset_us / 1_000_000.0) # sekundy z ułamkiem rows.append((start_unix, offset_us, unix_ts, sensor_id, x, y, z, int(ready))) off += SAMPLE_SIZE df = pd.DataFrame(rows, columns=["start_unix", "offset_us", "unix_ts", "sensor_id", "x", "y", "z", "ready"]) df = df.astype({"sensor_id": "int32", "x": "int32", "y": "int32", "z": "int32", "ready": "int32"}) return df def main(): ap = argparse.ArgumentParser(description="Konwersja .wmt (nowy format 12 B) -> CSV.") ap.add_argument("inputs", nargs="+", help="Ścieżki do plików .wmt") ap.add_argument("--out-suffix", default=".csv", help="Sufiks wyjściowy (domyślnie .csv)") ap.add_argument("--concat", action="store_true", help="Scal wszystkie wejścia do jednego CSV (na bazie pierwszego pliku)") ap.add_argument("--no-header", action="store_true", help="Zapisz CSV bez nagłówka") args = ap.parse_args() dfs = [] for in_path in args.inputs: df = read_wmt_one(in_path) if args.concat: df["_source"] = str(pathlib.Path(in_path).name) dfs.append(df) else: out_path = str(pathlib.Path(in_path).with_suffix(args.out_suffix)) df.to_csv(out_path, index=False, header=not args.no_header) print(f"Zapisano {out_path} ({len(df)} rekordów)") if args.concat and dfs: out_path = str(pathlib.Path(args.inputs[0]).with_suffix(args.out_suffix)) big = pd.concat(dfs, ignore_index=True) big.to_csv(out_path, index=False, header=not args.no_header) print(f"\nZapisano scalony {out_path} ({len(big)} rekordów z {len(dfs)} plików)") if __name__ == "__main__": main()