forked from Akcelerometry_drgania_WMT/PI_mikrokontroler
111 lines
4.2 KiB
Python
111 lines
4.2 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Konwerter .wmt (nowy format próbek 12 B) -> CSV.
|
|
# python3 wmt_reader.py '/Users/katalog/00000001.wmt' -a -f -d <-wszystkie, +podfoldery, usuwa stare wmt
|
|
|
|
Plik .wmt:
|
|
Header (packed LE, 19 B):
|
|
char[3] magic="WMT"
|
|
uint16 version
|
|
uint16 headerSize
|
|
uint32 sampleSize (oczekiwane 12)
|
|
uint32 timestamp (Unix start akwizycji, sekundy)
|
|
uint32 reccount (liczba rekordów; może być 0)
|
|
|
|
Sample (packed LE, 12 B):
|
|
uint32 offset_us (µs od startu akwizycji)
|
|
uint8 sensor_id (0..6)
|
|
int16 x, y, z
|
|
uint8 ready (0/1)
|
|
|
|
CSV:
|
|
start_unix, offset_us, unix_ts, sensor_id, x, y, z, ready
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
import argparse
|
|
import pathlib
|
|
import struct
|
|
import pandas as pd
|
|
from datetime import datetime
|
|
|
|
HEADER_FMT = "<3sHHIII" # 19 B
|
|
HEADER_SIZE = struct.calcsize(HEADER_FMT)
|
|
|
|
SAMPLE_FMT = "<IBhhhB" # 12 B: u32, u8, i16, i16, i16, u8
|
|
SAMPLE_SIZE = struct.calcsize(SAMPLE_FMT) # 12
|
|
|
|
def read_wmt_one(path: str) -> pd.DataFrame:
|
|
p = pathlib.Path(path)
|
|
blob = p.read_bytes()
|
|
|
|
if len(blob) < HEADER_SIZE:
|
|
raise ValueError(f"{p.name}: plik zbyt krótki ({len(blob)} B).")
|
|
|
|
magic, version, headerSize, sampleSize, start_unix, reccount = struct.unpack_from(HEADER_FMT, blob, 0)
|
|
|
|
if magic != b"WMT":
|
|
raise ValueError(f"{p.name}: niepoprawny magic {magic!r} (oczekiwano b'WMT').")
|
|
if sampleSize != SAMPLE_SIZE:
|
|
raise ValueError(f"{p.name}: sampleSize={sampleSize}, oczekiwano {SAMPLE_SIZE} (obsługiwany jest TYLKO nowy format 12 B).")
|
|
if len(blob) < headerSize:
|
|
raise ValueError(f"{p.name}: headerSize={headerSize} większy niż rozmiar pliku.")
|
|
|
|
# Wyświetlenie informacji o nagłówku
|
|
start_time = datetime.fromtimestamp(start_unix)
|
|
print(f"\n=== Plik: {p.name} ===")
|
|
print(f"Czas rozpoczęcia akwizycji: {start_time} (Unix: {start_unix})")
|
|
print(f"Liczba próbek zadeklarowana w nagłówku: {reccount}")
|
|
|
|
data = blob[headerSize:]
|
|
if len(data) % SAMPLE_SIZE != 0:
|
|
print(f"[WARN] {p.name}: długość danych {len(data)} nie jest wielokrotnością {SAMPLE_SIZE} (plik może być niepełny).")
|
|
|
|
nrec = len(data) // SAMPLE_SIZE
|
|
if reccount and reccount != nrec:
|
|
print(f"[INFO] {p.name}: reccount={reccount}, policzono={nrec}.")
|
|
else:
|
|
print(f"Policzono faktyczną liczbę próbek: {nrec}")
|
|
|
|
rows = []
|
|
off = 0
|
|
for _ in range(nrec):
|
|
rec = data[off:off+SAMPLE_SIZE]
|
|
offset_us, sensor_id, x, y, z, ready = struct.unpack(SAMPLE_FMT, rec)
|
|
unix_ts = start_unix + (offset_us / 1_000_000.0) # sekundy z ułamkiem
|
|
rows.append((start_unix, offset_us, unix_ts, sensor_id, x, y, z, int(ready)))
|
|
off += SAMPLE_SIZE
|
|
|
|
df = pd.DataFrame(rows, columns=["start_unix", "offset_us", "unix_ts", "sensor_id", "x", "y", "z", "ready"])
|
|
df = df.astype({"sensor_id": "int32", "x": "int32", "y": "int32", "z": "int32", "ready": "int32"})
|
|
return df
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser(description="Konwersja .wmt (nowy format 12 B) -> CSV.")
|
|
ap.add_argument("inputs", nargs="+", help="Ścieżki do plików .wmt")
|
|
ap.add_argument("--out-suffix", default=".csv", help="Sufiks wyjściowy (domyślnie .csv)")
|
|
ap.add_argument("--concat", action="store_true", help="Scal wszystkie wejścia do jednego CSV (na bazie pierwszego pliku)")
|
|
ap.add_argument("--no-header", action="store_true", help="Zapisz CSV bez nagłówka")
|
|
args = ap.parse_args()
|
|
|
|
dfs = []
|
|
for in_path in args.inputs:
|
|
df = read_wmt_one(in_path)
|
|
if args.concat:
|
|
df["_source"] = str(pathlib.Path(in_path).name)
|
|
dfs.append(df)
|
|
else:
|
|
out_path = str(pathlib.Path(in_path).with_suffix(args.out_suffix))
|
|
df.to_csv(out_path, index=False, header=not args.no_header)
|
|
print(f"Zapisano {out_path} ({len(df)} rekordów)")
|
|
|
|
if args.concat and dfs:
|
|
out_path = str(pathlib.Path(args.inputs[0]).with_suffix(args.out_suffix))
|
|
big = pd.concat(dfs, ignore_index=True)
|
|
big.to_csv(out_path, index=False, header=not args.no_header)
|
|
print(f"\nZapisano scalony {out_path} ({len(big)} rekordów z {len(dfs)} plików)")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|