Files

155 lines
6.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
wmt_reader.py — parser plików .wmt z nagłówkiem WMT i eksport do CSV
Nagłówek (packed, little-endian; 19 B):
char[3] magic // "WMT"
uint16 version
uint16 headerSize // rozmiar nagłówka w bajtach (powinno być 19)
uint32 sampleSize // rozmiar rekordu Sample (powinno być 11)
uint32 timestamp // Unix time startu akwizycji
uint32 reccount // liczba rekordów w pliku
Rekord Sample (packed, little-endian; 11 B):
int32 ident
int16 x, y, z
bool ready // 1 bajt
Wymagania:
- Czytaj .wmt i zapisuj CSV.
- Kolumny x,y,z mają być typu całkowitego (nie skalujemy tych wartości ani ich nie modyfikujemy).
- Opcjonalnie można dodać kolumny x_g,y_g,z_g poprzez --range-g (x,y,z pozostają bez zmian).
"""
from __future__ import annotations
import argparse
import struct
import pathlib
import pandas as pd
# ------------------------------ Format pliku ----------------------------------
HEADER_FMT = "<3sHHIII" # magic(3s), version(H), headerSize(H), sampleSize(I), timestamp(I), reccount(I)
HEADER_SIZE = struct.calcsize(HEADER_FMT) # oczekiwane 19
# Nowy rekord Sample: ident(int32), x(int16), y(int16), z(int16), ready(bool-1B)
SAMPLE_FMT = "<ihhh?" # 4 + 2 + 2 + 2 + 1 = 11 B
SAMPLE_SIZE_EXPECTED = struct.calcsize(SAMPLE_FMT) # 11
# --------------------------------- I/O ---------------------------------------
def read_wmt(path: str) -> pd.DataFrame:
"""
Wczytuje plik .wmt (z nagłówkiem WMT) do DataFrame.
Kolumny: timestamp, ident, x, y, z, ready
Gwarantujemy, że x,y,z są typu całkowitego (int32 w DataFrame), bez skalowania.
"""
p = pathlib.Path(path)
blob = p.read_bytes()
# --- Nagłówek ---
if len(blob) < HEADER_SIZE:
raise ValueError(f"{p.name}: za krótki nagłówek ({len(blob)} B).")
magic, version, headerSize, sampleSize, start_unix, reccount = struct.unpack_from(HEADER_FMT, blob, 0)
if magic != b"WMT":
raise ValueError(f"{p.name}: nieprawidłowa sygnatura magic={magic!r} (oczekiwano b'WMT').")
if headerSize != HEADER_SIZE:
# Tolerujemy różnicę, ale ostrzegamy; dalej użyjemy headerSize jako offsetu danych
print(f"[WARN] {p.name}: headerSize={headerSize}, spodziewano {HEADER_SIZE}.")
if sampleSize != SAMPLE_SIZE_EXPECTED:
raise ValueError(f"{p.name}: sampleSize={sampleSize}, oczekiwano {SAMPLE_SIZE_EXPECTED} (nowy format Sample=11B).")
# --- Dane rekordów ---
data_off = headerSize # początek rekordów wg nagłówka
if len(blob) < data_off:
raise ValueError(f"{p.name}: uszkodzony headerSize={headerSize} (większy niż plik).")
data = blob[data_off:]
if len(data) % sampleSize != 0:
# to nie musi być błąd krytyczny (np. przerwany zapis), ale ostrzeżmy
print(f"[WARN] {p.name}: długość danych {len(data)} nie jest wielokrotnością sampleSize={sampleSize}.")
nrec = len(data) // sampleSize
if reccount and reccount != nrec:
print(f"[INFO] {p.name}: reccount w nagłówku = {reccount}, policzone rekordy = {nrec}.")
rows = []
off = 0
for _ in range(nrec):
rec = data[off:off + sampleSize]
ident, x, y, z, ready = struct.unpack(SAMPLE_FMT, rec)
rows.append((
start_unix, # timestamp startu akwizycji z nagłówka
ident, x, y, z,
bool(ready),
))
off += sampleSize
df = pd.DataFrame(
rows,
columns=["timestamp", "ident", "x", "y", "z", "ready"]
)
# Wymuszenie typu całkowitego dla x,y,z bez zmiany wartości
df = df.astype({"x": "int32", "y": "int32", "z": "int32", "ident": "int32"})
# ready pozostaje bool
return df
# ---------------------------- Konwersja do jednostek g ------------------------
def to_g(df: pd.DataFrame, range_g: float) -> pd.DataFrame:
"""
Konwersja int -> g przy założeniu: Q = 32768 / range_g => g = value / Q
(Dopisuje kolumny x_g,y_g,z_g; NIE modyfikuje x,y,z.)
"""
if range_g is None or range_g <= 0:
raise ValueError("range_g musi być dodatni (np. 2, 4, 8, 16).")
q = 32768.0 / float(range_g)
out = df.copy()
out["x_g"] = out["x"] / q
out["y_g"] = out["y"] / q
out["z_g"] = out["z"] / q
return out
# -------------------------------------- CLI ----------------------------------
def main() -> None:
ap = argparse.ArgumentParser(description="Czytaj pliki .wmt (WMT, Sample 11B) i eksportuj do CSV.")
ap.add_argument("inputs", nargs="+", help="Ścieżki do plików .wmt (jeden lub wiele).")
ap.add_argument("--range-g", type=float, default=None,
help="Jeśli podasz (np. 2,4,8,16), doda kolumny x_g,y_g,z_g (x,y,z pozostają całkowite).")
ap.add_argument("--out-suffix", default=".csv",
help="Sufiks wyjściowy (domyślnie .csv).")
ap.add_argument("--concat", action="store_true",
help="Jeśli ustawione, łączy wszystkie wejścia w JEDEN plik wynikowy (pierwszy.EXT).")
ap.add_argument("--no-header", action="store_true",
help="Zapisz CSV bez nagłówka.")
args = ap.parse_args()
dfs = []
for in_path in args.inputs:
df = read_wmt(in_path)
if args.range_g is not None:
df = to_g(df, args.range_g)
if args.concat:
df["_source"] = str(pathlib.Path(in_path).name)
dfs.append(df)
else:
out_path = str(pathlib.Path(in_path).with_suffix(args.out_suffix))
df.to_csv(out_path, index=False, header=not args.no_header)
print(f"Zapisano {out_path} ({len(df)} rekordów)")
if args.concat and dfs:
out_path = str(pathlib.Path(args.inputs[0]).with_suffix(args.out_suffix))
big = pd.concat(dfs, ignore_index=True)
big.to_csv(out_path, index=False, header=not args.no_header)
print(f"Zapisano scalony {out_path} ({len(big)} rekordów z {len(dfs)} plików)")
if __name__ == "__main__":
main()