forked from Akcelerometry_drgania_WMT/PI_mikrokontroler
155 lines
6.0 KiB
Python
155 lines
6.0 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
wmt_reader.py — parser plików .wmt z nagłówkiem WMT i eksport do CSV
|
||
|
||
Nagłówek (packed, little-endian; 19 B):
|
||
char[3] magic // "WMT"
|
||
uint16 version
|
||
uint16 headerSize // rozmiar nagłówka w bajtach (powinno być 19)
|
||
uint32 sampleSize // rozmiar rekordu Sample (powinno być 11)
|
||
uint32 timestamp // Unix time startu akwizycji
|
||
uint32 reccount // liczba rekordów w pliku
|
||
|
||
Rekord Sample (packed, little-endian; 11 B):
|
||
int32 ident
|
||
int16 x, y, z
|
||
bool ready // 1 bajt
|
||
|
||
Wymagania:
|
||
- Czytaj .wmt i zapisuj CSV.
|
||
- Kolumny x,y,z mają być typu całkowitego (nie skalujemy tych wartości ani ich nie modyfikujemy).
|
||
- Opcjonalnie można dodać kolumny x_g,y_g,z_g poprzez --range-g (x,y,z pozostają bez zmian).
|
||
"""
|
||
from __future__ import annotations
|
||
import argparse
|
||
import struct
|
||
import pathlib
|
||
import pandas as pd
|
||
|
||
# ------------------------------ Format pliku ----------------------------------
|
||
|
||
HEADER_FMT = "<3sHHIII" # magic(3s), version(H), headerSize(H), sampleSize(I), timestamp(I), reccount(I)
|
||
HEADER_SIZE = struct.calcsize(HEADER_FMT) # oczekiwane 19
|
||
|
||
# Nowy rekord Sample: ident(int32), x(int16), y(int16), z(int16), ready(bool-1B)
|
||
SAMPLE_FMT = "<ihhh?" # 4 + 2 + 2 + 2 + 1 = 11 B
|
||
SAMPLE_SIZE_EXPECTED = struct.calcsize(SAMPLE_FMT) # 11
|
||
|
||
# --------------------------------- I/O ---------------------------------------
|
||
|
||
def read_wmt(path: str) -> pd.DataFrame:
|
||
"""
|
||
Wczytuje plik .wmt (z nagłówkiem WMT) do DataFrame.
|
||
Kolumny: timestamp, ident, x, y, z, ready
|
||
Gwarantujemy, że x,y,z są typu całkowitego (int32 w DataFrame), bez skalowania.
|
||
"""
|
||
p = pathlib.Path(path)
|
||
blob = p.read_bytes()
|
||
|
||
# --- Nagłówek ---
|
||
if len(blob) < HEADER_SIZE:
|
||
raise ValueError(f"{p.name}: za krótki nagłówek ({len(blob)} B).")
|
||
|
||
magic, version, headerSize, sampleSize, start_unix, reccount = struct.unpack_from(HEADER_FMT, blob, 0)
|
||
|
||
if magic != b"WMT":
|
||
raise ValueError(f"{p.name}: nieprawidłowa sygnatura magic={magic!r} (oczekiwano b'WMT').")
|
||
|
||
if headerSize != HEADER_SIZE:
|
||
# Tolerujemy różnicę, ale ostrzegamy; dalej użyjemy headerSize jako offsetu danych
|
||
print(f"[WARN] {p.name}: headerSize={headerSize}, spodziewano {HEADER_SIZE}.")
|
||
|
||
if sampleSize != SAMPLE_SIZE_EXPECTED:
|
||
raise ValueError(f"{p.name}: sampleSize={sampleSize}, oczekiwano {SAMPLE_SIZE_EXPECTED} (nowy format Sample=11B).")
|
||
|
||
# --- Dane rekordów ---
|
||
data_off = headerSize # początek rekordów wg nagłówka
|
||
if len(blob) < data_off:
|
||
raise ValueError(f"{p.name}: uszkodzony headerSize={headerSize} (większy niż plik).")
|
||
|
||
data = blob[data_off:]
|
||
if len(data) % sampleSize != 0:
|
||
# to nie musi być błąd krytyczny (np. przerwany zapis), ale ostrzeżmy
|
||
print(f"[WARN] {p.name}: długość danych {len(data)} nie jest wielokrotnością sampleSize={sampleSize}.")
|
||
|
||
nrec = len(data) // sampleSize
|
||
if reccount and reccount != nrec:
|
||
print(f"[INFO] {p.name}: reccount w nagłówku = {reccount}, policzone rekordy = {nrec}.")
|
||
|
||
rows = []
|
||
off = 0
|
||
for _ in range(nrec):
|
||
rec = data[off:off + sampleSize]
|
||
ident, x, y, z, ready = struct.unpack(SAMPLE_FMT, rec)
|
||
|
||
rows.append((
|
||
start_unix, # timestamp startu akwizycji z nagłówka
|
||
ident, x, y, z,
|
||
bool(ready),
|
||
))
|
||
off += sampleSize
|
||
|
||
df = pd.DataFrame(
|
||
rows,
|
||
columns=["timestamp", "ident", "x", "y", "z", "ready"]
|
||
)
|
||
|
||
# Wymuszenie typu całkowitego dla x,y,z – bez zmiany wartości
|
||
df = df.astype({"x": "int32", "y": "int32", "z": "int32", "ident": "int32"})
|
||
# ready pozostaje bool
|
||
return df
|
||
|
||
# ---------------------------- Konwersja do jednostek g ------------------------
|
||
|
||
def to_g(df: pd.DataFrame, range_g: float) -> pd.DataFrame:
|
||
"""
|
||
Konwersja int -> g przy założeniu: Q = 32768 / range_g => g = value / Q
|
||
(Dopisuje kolumny x_g,y_g,z_g; NIE modyfikuje x,y,z.)
|
||
"""
|
||
if range_g is None or range_g <= 0:
|
||
raise ValueError("range_g musi być dodatni (np. 2, 4, 8, 16).")
|
||
q = 32768.0 / float(range_g)
|
||
out = df.copy()
|
||
out["x_g"] = out["x"] / q
|
||
out["y_g"] = out["y"] / q
|
||
out["z_g"] = out["z"] / q
|
||
return out
|
||
|
||
# -------------------------------------- CLI ----------------------------------
|
||
|
||
def main() -> None:
|
||
ap = argparse.ArgumentParser(description="Czytaj pliki .wmt (WMT, Sample 11B) i eksportuj do CSV.")
|
||
ap.add_argument("inputs", nargs="+", help="Ścieżki do plików .wmt (jeden lub wiele).")
|
||
ap.add_argument("--range-g", type=float, default=None,
|
||
help="Jeśli podasz (np. 2,4,8,16), doda kolumny x_g,y_g,z_g (x,y,z pozostają całkowite).")
|
||
ap.add_argument("--out-suffix", default=".csv",
|
||
help="Sufiks wyjściowy (domyślnie .csv).")
|
||
ap.add_argument("--concat", action="store_true",
|
||
help="Jeśli ustawione, łączy wszystkie wejścia w JEDEN plik wynikowy (pierwszy.EXT).")
|
||
ap.add_argument("--no-header", action="store_true",
|
||
help="Zapisz CSV bez nagłówka.")
|
||
args = ap.parse_args()
|
||
|
||
dfs = []
|
||
for in_path in args.inputs:
|
||
df = read_wmt(in_path)
|
||
if args.range_g is not None:
|
||
df = to_g(df, args.range_g)
|
||
if args.concat:
|
||
df["_source"] = str(pathlib.Path(in_path).name)
|
||
dfs.append(df)
|
||
else:
|
||
out_path = str(pathlib.Path(in_path).with_suffix(args.out_suffix))
|
||
df.to_csv(out_path, index=False, header=not args.no_header)
|
||
print(f"Zapisano {out_path} ({len(df)} rekordów)")
|
||
|
||
if args.concat and dfs:
|
||
out_path = str(pathlib.Path(args.inputs[0]).with_suffix(args.out_suffix))
|
||
big = pd.concat(dfs, ignore_index=True)
|
||
big.to_csv(out_path, index=False, header=not args.no_header)
|
||
print(f"Zapisano scalony {out_path} ({len(big)} rekordów z {len(dfs)} plików)")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|