Files
PI_mikrokontroler_2/Python3/wmt_reader.py

111 lines
4.2 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Konwerter .wmt (nowy format próbek 12 B) -> CSV.
# python3 wmt_reader.py '/Users/katalog/00000001.wmt' -a -f -d <-wszystkie, +podfoldery, usuwa stare wmt
Plik .wmt:
Header (packed LE, 19 B):
char[3] magic="WMT"
uint16 version
uint16 headerSize
uint32 sampleSize (oczekiwane 12)
uint32 timestamp (Unix start akwizycji, sekundy)
uint32 reccount (liczba rekordów; może być 0)
Sample (packed LE, 12 B):
uint32 offset_us (µs od startu akwizycji)
uint8 sensor_id (0..6)
int16 x, y, z
uint8 ready (0/1)
CSV:
start_unix, offset_us, unix_ts, sensor_id, x, y, z, ready
"""
from __future__ import annotations
import argparse
import pathlib
import struct
import pandas as pd
from datetime import datetime
HEADER_FMT = "<3sHHIII" # 19 B
HEADER_SIZE = struct.calcsize(HEADER_FMT)
SAMPLE_FMT = "<IBhhhB" # 12 B: u32, u8, i16, i16, i16, u8
SAMPLE_SIZE = struct.calcsize(SAMPLE_FMT) # 12
def read_wmt_one(path: str) -> pd.DataFrame:
p = pathlib.Path(path)
blob = p.read_bytes()
if len(blob) < HEADER_SIZE:
raise ValueError(f"{p.name}: plik zbyt krótki ({len(blob)} B).")
magic, version, headerSize, sampleSize, start_unix, reccount = struct.unpack_from(HEADER_FMT, blob, 0)
if magic != b"WMT":
raise ValueError(f"{p.name}: niepoprawny magic {magic!r} (oczekiwano b'WMT').")
if sampleSize != SAMPLE_SIZE:
raise ValueError(f"{p.name}: sampleSize={sampleSize}, oczekiwano {SAMPLE_SIZE} (obsługiwany jest TYLKO nowy format 12 B).")
if len(blob) < headerSize:
raise ValueError(f"{p.name}: headerSize={headerSize} większy niż rozmiar pliku.")
# Wyświetlenie informacji o nagłówku
start_time = datetime.fromtimestamp(start_unix)
print(f"\n=== Plik: {p.name} ===")
print(f"Czas rozpoczęcia akwizycji: {start_time} (Unix: {start_unix})")
print(f"Liczba próbek zadeklarowana w nagłówku: {reccount}")
data = blob[headerSize:]
if len(data) % SAMPLE_SIZE != 0:
print(f"[WARN] {p.name}: długość danych {len(data)} nie jest wielokrotnością {SAMPLE_SIZE} (plik może być niepełny).")
nrec = len(data) // SAMPLE_SIZE
if reccount and reccount != nrec:
print(f"[INFO] {p.name}: reccount={reccount}, policzono={nrec}.")
else:
print(f"Policzono faktyczną liczbę próbek: {nrec}")
rows = []
off = 0
for _ in range(nrec):
rec = data[off:off+SAMPLE_SIZE]
offset_us, sensor_id, x, y, z, ready = struct.unpack(SAMPLE_FMT, rec)
unix_ts = start_unix + (offset_us / 1_000_000.0) # sekundy z ułamkiem
rows.append((start_unix, offset_us, unix_ts, sensor_id, x, y, z, int(ready)))
off += SAMPLE_SIZE
df = pd.DataFrame(rows, columns=["start_unix", "offset_us", "unix_ts", "sensor_id", "x", "y", "z", "ready"])
df = df.astype({"sensor_id": "int32", "x": "int32", "y": "int32", "z": "int32", "ready": "int32"})
return df
def main():
ap = argparse.ArgumentParser(description="Konwersja .wmt (nowy format 12 B) -> CSV.")
ap.add_argument("inputs", nargs="+", help="Ścieżki do plików .wmt")
ap.add_argument("--out-suffix", default=".csv", help="Sufiks wyjściowy (domyślnie .csv)")
ap.add_argument("--concat", action="store_true", help="Scal wszystkie wejścia do jednego CSV (na bazie pierwszego pliku)")
ap.add_argument("--no-header", action="store_true", help="Zapisz CSV bez nagłówka")
args = ap.parse_args()
dfs = []
for in_path in args.inputs:
df = read_wmt_one(in_path)
if args.concat:
df["_source"] = str(pathlib.Path(in_path).name)
dfs.append(df)
else:
out_path = str(pathlib.Path(in_path).with_suffix(args.out_suffix))
df.to_csv(out_path, index=False, header=not args.no_header)
print(f"Zapisano {out_path} ({len(df)} rekordów)")
if args.concat and dfs:
out_path = str(pathlib.Path(args.inputs[0]).with_suffix(args.out_suffix))
big = pd.concat(dfs, ignore_index=True)
big.to_csv(out_path, index=False, header=not args.no_header)
print(f"\nZapisano scalony {out_path} ({len(big)} rekordów z {len(dfs)} plików)")
if __name__ == "__main__":
main()