forked from Akcelerometry_drgania_WMT/PI_mikrokontroler
Initial commit: PI_mikrokontroler changes
This commit is contained in:
110
firmware_adxl345_spi/Python3/wmt_reader.py
Normal file
110
firmware_adxl345_spi/Python3/wmt_reader.py
Normal file
@@ -0,0 +1,110 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Konwerter .wmt (nowy format próbek 12 B) -> CSV.
|
||||
# python3 wmt_reader.py '/Users/katalog/00000001.wmt' -a -f -d <-wszystkie, +podfoldery, usuwa stare wmt
|
||||
|
||||
Plik .wmt:
|
||||
Header (packed LE, 19 B):
|
||||
char[3] magic="WMT"
|
||||
uint16 version
|
||||
uint16 headerSize
|
||||
uint32 sampleSize (oczekiwane 12)
|
||||
uint32 timestamp (Unix start akwizycji, sekundy)
|
||||
uint32 reccount (liczba rekordów; może być 0)
|
||||
|
||||
Sample (packed LE, 12 B):
|
||||
uint32 offset_us (µs od startu akwizycji)
|
||||
uint8 sensor_id (0..6)
|
||||
int16 x, y, z
|
||||
uint8 ready (0/1)
|
||||
|
||||
CSV:
|
||||
start_unix, offset_us, unix_ts, sensor_id, x, y, z, ready
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
import argparse
|
||||
import pathlib
|
||||
import struct
|
||||
import pandas as pd
|
||||
from datetime import datetime
|
||||
|
||||
HEADER_FMT = "<3sHHIII" # 19 B
|
||||
HEADER_SIZE = struct.calcsize(HEADER_FMT)
|
||||
|
||||
SAMPLE_FMT = "<IBhhhB" # 12 B: u32, u8, i16, i16, i16, u8
|
||||
SAMPLE_SIZE = struct.calcsize(SAMPLE_FMT) # 12
|
||||
|
||||
def read_wmt_one(path: str) -> pd.DataFrame:
|
||||
p = pathlib.Path(path)
|
||||
blob = p.read_bytes()
|
||||
|
||||
if len(blob) < HEADER_SIZE:
|
||||
raise ValueError(f"{p.name}: plik zbyt krótki ({len(blob)} B).")
|
||||
|
||||
magic, version, headerSize, sampleSize, start_unix, reccount = struct.unpack_from(HEADER_FMT, blob, 0)
|
||||
|
||||
if magic != b"WMT":
|
||||
raise ValueError(f"{p.name}: niepoprawny magic {magic!r} (oczekiwano b'WMT').")
|
||||
if sampleSize != SAMPLE_SIZE:
|
||||
raise ValueError(f"{p.name}: sampleSize={sampleSize}, oczekiwano {SAMPLE_SIZE} (obsługiwany jest TYLKO nowy format 12 B).")
|
||||
if len(blob) < headerSize:
|
||||
raise ValueError(f"{p.name}: headerSize={headerSize} większy niż rozmiar pliku.")
|
||||
|
||||
# Wyświetlenie informacji o nagłówku
|
||||
start_time = datetime.fromtimestamp(start_unix)
|
||||
print(f"\n=== Plik: {p.name} ===")
|
||||
print(f"Czas rozpoczęcia akwizycji: {start_time} (Unix: {start_unix})")
|
||||
print(f"Liczba próbek zadeklarowana w nagłówku: {reccount}")
|
||||
|
||||
data = blob[headerSize:]
|
||||
if len(data) % SAMPLE_SIZE != 0:
|
||||
print(f"[WARN] {p.name}: długość danych {len(data)} nie jest wielokrotnością {SAMPLE_SIZE} (plik może być niepełny).")
|
||||
|
||||
nrec = len(data) // SAMPLE_SIZE
|
||||
if reccount and reccount != nrec:
|
||||
print(f"[INFO] {p.name}: reccount={reccount}, policzono={nrec}.")
|
||||
else:
|
||||
print(f"Policzono faktyczną liczbę próbek: {nrec}")
|
||||
|
||||
rows = []
|
||||
off = 0
|
||||
for _ in range(nrec):
|
||||
rec = data[off:off+SAMPLE_SIZE]
|
||||
offset_us, sensor_id, x, y, z, ready = struct.unpack(SAMPLE_FMT, rec)
|
||||
unix_ts = start_unix + (offset_us / 1_000_000.0) # sekundy z ułamkiem
|
||||
rows.append((start_unix, offset_us, unix_ts, sensor_id, x, y, z, int(ready)))
|
||||
off += SAMPLE_SIZE
|
||||
|
||||
df = pd.DataFrame(rows, columns=["start_unix", "offset_us", "unix_ts", "sensor_id", "x", "y", "z", "ready"])
|
||||
df = df.astype({"sensor_id": "int32", "x": "int32", "y": "int32", "z": "int32", "ready": "int32"})
|
||||
return df
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser(description="Konwersja .wmt (nowy format 12 B) -> CSV.")
|
||||
ap.add_argument("inputs", nargs="+", help="Ścieżki do plików .wmt")
|
||||
ap.add_argument("--out-suffix", default=".csv", help="Sufiks wyjściowy (domyślnie .csv)")
|
||||
ap.add_argument("--concat", action="store_true", help="Scal wszystkie wejścia do jednego CSV (na bazie pierwszego pliku)")
|
||||
ap.add_argument("--no-header", action="store_true", help="Zapisz CSV bez nagłówka")
|
||||
args = ap.parse_args()
|
||||
|
||||
dfs = []
|
||||
for in_path in args.inputs:
|
||||
df = read_wmt_one(in_path)
|
||||
if args.concat:
|
||||
df["_source"] = str(pathlib.Path(in_path).name)
|
||||
dfs.append(df)
|
||||
else:
|
||||
out_path = str(pathlib.Path(in_path).with_suffix(args.out_suffix))
|
||||
df.to_csv(out_path, index=False, header=not args.no_header)
|
||||
print(f"Zapisano {out_path} ({len(df)} rekordów)")
|
||||
|
||||
if args.concat and dfs:
|
||||
out_path = str(pathlib.Path(args.inputs[0]).with_suffix(args.out_suffix))
|
||||
big = pd.concat(dfs, ignore_index=True)
|
||||
big.to_csv(out_path, index=False, header=not args.no_header)
|
||||
print(f"\nZapisano scalony {out_path} ({len(big)} rekordów z {len(dfs)} plików)")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user