#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Konwersja .wmt do JSON i csv # 22.03.2026 LK # python3 reader2.py '/Users/lklich/Desktop/Adam Błachowicz' -a -f -d <-wszystkie, +podfoldery, usuwa stare wmt from __future__ import annotations import argparse import pathlib import struct import json import pandas as pd from datetime import datetime from typing import Tuple, Dict, Any HEADER_FMT = "<3sHHIII" # 19 B HEADER_SIZE = struct.calcsize(HEADER_FMT) SAMPLE_FMT = " Tuple[pd.DataFrame, Dict[str, Any]]: blob = path.read_bytes() if len(blob) < HEADER_SIZE: raise ValueError(f"{path.name}: plik zbyt krótki ({len(blob)} B).") magic, version, headerSize, sampleSize, start_unix, reccount = struct.unpack_from(HEADER_FMT, blob, 0) if magic != b"WMT": raise ValueError(f"{path.name}: niepoprawny magic {magic!r}.") dt_obj = datetime.fromtimestamp(start_unix) start_time_pl = dt_obj.strftime("%d.%m.%Y %H:%M:%S") start_time_iso = dt_obj.isoformat() data = blob[headerSize:] nrec = len(data) // SAMPLE_SIZE meta = { "filename": path.name, "full_path": str(path.absolute()), "wmt_version": version, "start_unix": start_unix, "start_time_iso": start_time_iso, "start_time_pl": start_time_pl, "declared_reccount": reccount, "actual_reccount": nrec, "is_incomplete": len(data) % SAMPLE_SIZE != 0 } rows = [] off = 0 for _ in range(nrec): rec = data[off:off+SAMPLE_SIZE] offset_us, sensor_id, x, y, z, ready = struct.unpack(SAMPLE_FMT, rec) unix_ts = start_unix + (offset_us / 1_000_000.0) rows.append((start_unix, offset_us, unix_ts, sensor_id, x, y, z, int(ready))) off += SAMPLE_SIZE df = pd.DataFrame(rows, columns=["start_unix", "offset_us", "unix_ts", "sensor_id", "x", "y", "z", "ready"]) df = df.astype({"sensor_id": "int32", "x": "int32", "y": "int32", "z": "int32", "ready": "int32"}) return df, meta def save_outputs(df: pd.DataFrame, base_path: pathlib.Path, meta: Dict[str, Any], no_header: bool, suffix: str): csv_path = base_path.with_suffix(suffix) json_path = base_path.with_suffix(".json") with open(csv_path, 'w', encoding='utf-8') as f: f.write(f"# Plik: {meta.get('filename', 'N/A')}\n") f.write(f"# Wersja WMT: {meta.get('wmt_version', 'N/A')}\n") f.write(f"# Start: {meta.get('start_time_pl', 'N/A')} (Unix: {meta.get('start_unix', 0)})\n") f.write(f"# Rekordy: {meta.get('actual_reccount', 0)}\n") df.to_csv(f, index=False, header=not no_header) with open(json_path, 'w', encoding='utf-8') as f: json.dump(meta, f, indent=4, ensure_ascii=False) def main(): ap = argparse.ArgumentParser(description="Konwersja .wmt -> CSV + JSON.") ap.add_argument("input", help="Ścieżka do pliku lub katalogu") ap.add_argument("-a", "--all", action="store_true", help="Przetwarzaj wszystkie pliki .wmt w katalogu") ap.add_argument("-f", "--recursive", action="store_true", help="Przeszukuj podfoldery (wymaga -a)") ap.add_argument("-o", "--overwrite", action="store_true", help="Nadpisuj istniejące pliki wyjściowe") ap.add_argument("-d", "--delete", action="store_true", help="USUŃ plik .wmt po udanej konwersji") ap.add_argument("--out-suffix", default=".csv", help="Sufiks CSV") ap.add_argument("--concat", action="store_true", help="Scal dane do jednego pliku") ap.add_argument("--no-header", action="store_true", help="CSV bez nagłówka kolumn") args = ap.parse_args() input_path = pathlib.Path(args.input) files_to_process = [] if args.all: if not input_path.is_dir(): print(f"[ERROR] Ścieżka {input_path} nie jest katalogiem!") return files_to_process = sorted(list(input_path.rglob("*.wmt") if args.recursive else input_path.glob("*.wmt"))) else: if input_path.is_dir(): print(f"[ERROR] Podano katalog, ale brakuje -a.") return files_to_process = [input_path] if not files_to_process: print(f"[INFO] Brak plików do przetworzenia.") return dfs = [] combined_meta = [] successfully_processed = [] for p in files_to_process: csv_check = p.with_suffix(args.out_suffix) json_check = p.with_suffix(".json") if not args.overwrite and not args.concat: if csv_check.exists() or json_check.exists(): print(f"[SKIP] Pominięto {p.name} - pliki wyjściowe już istnieją.") continue try: df, meta = read_wmt_one(p) if args.concat: df["_source_path"] = str(p.relative_to(input_path)) dfs.append(df) combined_meta.append(meta) successfully_processed.append(p) else: save_outputs(df, p, meta, args.no_header, args.out_suffix) print(f"Przetworzono: {p.name}") if args.delete: p.unlink() print(f" [DEL] Usunięto plik źródłowy: {p.name}") except Exception as e: print(f"[ERROR] Błąd w pliku {p.name}: {e}") # Obsługa zapisu zbiorczego (concat) if args.concat and dfs: out_base = input_path / input_path.name if input_path.is_dir() else files_to_process[0] big_df = pd.concat(dfs, ignore_index=True) meta_summary = { "description": "Scalony zestaw danych", "files_count": len(combined_meta), "sources": combined_meta } try: save_outputs(big_df, out_base, meta_summary, args.no_header, args.out_suffix) print(f"\nZapisano scalone dane do {out_base.with_suffix(args.out_suffix)}") # Usuwamy pliki źródłowe dopiero po udanym scaleniu if args.delete: for p in successfully_processed: p.unlink() print(f" [DEL] Usunięto {len(successfully_processed)} plików źródłowych po scaleniu.") except Exception as e: print(f"[ERROR] Błąd zapisu pliku scalonego: {e}") if __name__ == "__main__": main()