""" Telemetry test client for stratoflights. Reads a trajectory file and sends one packet every N seconds to the station WebSocket endpoint: ws:///api/ws/station//telemetry/?token= Supported trajectory formats ----------------------------- JSON array (default): [ {"lat": 62.1, "lon": 129.4, "alt": 500.0, "timestamp": 1716000000}, ... ] CSV (first row = header): lat,lon,alt,timestamp 62.1,129.4,500.0,1716000000 Balloon log (.log): Supports both "beacon" and "tracking" log variants produced by the on-board firmware. Each record has a bracketed position header, followed by angle-bracket telemetry lines. Tracking logs may also embed a Python- style dict with already-decoded coordinates (used as a cross-check). Header: [1;;;;;] Telemetry: <1;;;;<...more fields...>;> Optional: {'lat': 62.1, 'lon': 129.4, 'alt': 500} (tracking only) Coordinate decoding: lat = lat_raw / 1_000_000 lon = lon_raw / 1_000_000 alt = alt_m (metres, integer) Records with no GPS fix (lat_raw == 0 and lon_raw == 0) are skipped. Exact duplicate positions (same lat / lon / alt triple) are removed. Usage ----- python client.py --server ws://localhost:8000 \\ --satellite \\ --token \\ --file trajectory.json \\ --interval 10 # Balloon log files: python client.py --satellite --token --file beacon-20250406.log python client.py --satellite --token --file tracking-20250406-02.log # Generate a sample trajectory file then run: python client.py --generate sample.json python client.py --satellite --token --file sample.json """ import argparse import asyncio import csv import json import math import re import sys import time from pathlib import Path # 2025-04-06 03:00:00 UTC BASE_TS = 1743908400 # --------------------------------------------------------------------------- # Trajectory loaders # --------------------------------------------------------------------------- def load_json(path: Path) -> list[dict]: data = json.loads(path.read_text()) if not isinstance(data, list): # Support Tawhiri-style nested format if "prediction" in data: points = [] for stage in data["prediction"]: for p in stage.get("trajectory", []): points.append({ "lat": p["latitude"], "lon": p["longitude"], "alt": p["altitude"], "timestamp": p.get("time", int(time.time())), }) return points raise ValueError("JSON file must be an array or a Tawhiri prediction object") return data def load_csv(path: Path) -> list[dict]: points = [] with path.open(newline="") as f: reader = csv.DictReader(f) for row in reader: points.append({ "lat": float(row["lat"]), "lon": float(row["lon"]), "alt": float(row["alt"]), "timestamp": int(row.get("timestamp", BASE_TS + len(points) * 10)), }) return points # Matches the position header line: [1;;;;;] _HEADER_RE = re.compile(r'^\[1;(-?\d+);(-?\d+);(-?\d+);(\d+);]') # Matches the start of a telemetry line to extract the sequence number _TELEM_RE = re.compile(r'^<1;(\d+);') def load_log(path: Path) -> list[dict]: """Parse beacon or tracking .log files into a trajectory point list. Duplicate positions (identical lat/lon/alt) are silently dropped. Points with no GPS fix (lat_raw == lon_raw == 0) are also dropped. """ base_ts = BASE_TS points: list[dict] = [] seen: set[tuple] = set() lines = path.read_text(errors="replace").splitlines() i = 0 while i < len(lines): m = _HEADER_RE.match(lines[i].strip()) if not m: i += 1 continue lat_raw = int(m.group(1)) lon_raw = int(m.group(2)) alt_raw = int(m.group(3)) # No GPS fix yet if lat_raw == 0 and lon_raw == 0: i += 1 continue lat = round(lat_raw / 1_000_000, 6) lon = round(lon_raw / 1_000_000, 6) alt = float(alt_raw) # Drop exact-position duplicates key = (lat, lon, alt_raw) if key in seen: i += 1 continue seen.add(key) # Grab the sequence number from the telemetry line that follows seq = len(points) for j in range(i + 1, min(i + 5, len(lines))): tm = _TELEM_RE.match(lines[j].strip()) if tm: seq = int(tm.group(1)) break points.append({ "lat": lat, "lon": lon, "alt": alt, "timestamp": base_ts + seq, }) i += 1 return points def load_trajectory(path: Path) -> list[dict]: suffix = path.suffix.lower() if suffix == ".csv": return load_csv(path) if suffix == ".log": return load_log(path) return load_json(path) def deduplicate(points: list[dict]) -> list[dict]: """Remove points with identical (lat, lon, alt), preserving order.""" seen: set[tuple] = set() result = [] for p in points: key = (round(float(p.get("lat", 0)), 6), round(float(p.get("lon", 0)), 6), round(float(p.get("alt", 0)), 1)) if key not in seen: seen.add(key) result.append(p) return result # --------------------------------------------------------------------------- # Sample generator # --------------------------------------------------------------------------- def generate_sample(path: Path, n: int = 50) -> None: """Generate a simple ascending balloon trajectory.""" base_lat, base_lon = 62.0, 129.5 base_ts = BASE_TS points = [] for i in range(n): angle = i * 0.05 points.append({ "lat": round(base_lat + math.sin(angle) * 0.02, 6), "lon": round(base_lon + i * 0.003, 6), "alt": round(500.0 + i * 200.0, 1), "timestamp": base_ts + i * 10, }) path.write_text(json.dumps(points, indent=2)) print(f"Sample trajectory written to {path} ({n} points)") # --------------------------------------------------------------------------- # WebSocket sender # --------------------------------------------------------------------------- async def run_ws(server: str, satellite: str, token: str, points: list[dict], interval: float, loop: bool) -> None: try: import websockets except ImportError: sys.exit("websockets package not found — run: pip install websockets") url = f"{server.rstrip('/')}/api/ws/station/{satellite}/telemetry/?token={token}" print(f"Connecting to {url}") iteration = 0 abs_idx = 0 # never resets across loop iterations — drives advancing timestamps while True: try: async with websockets.connect(url) as ws: print("Connected.") for point in points: packet = { "lat": float(point.get("lat", 0)), "lon": float(point.get("lon", 0)), "alt": float(point.get("alt", 0)), "timestamp": BASE_TS + abs_idx * int(interval), "payload": point.get("payload", {}), "raw_data": point.get("raw_data", {}), } abs_idx += 1 await ws.send(json.dumps(packet)) print( f"[{abs_idx}/{len(points)}] " f"lat={packet['lat']:.5f} " f"lon={packet['lon']:.5f} " f"alt={packet['alt']:.1f} m " f"ts={packet['timestamp']}" ) try: reply = await asyncio.wait_for(ws.recv(), timeout=2.0) data = json.loads(reply) if "error" in data: print(f" Server error: {data['error']}") except asyncio.TimeoutError: pass # server only responds on errors await asyncio.sleep(interval) print("All points sent.") iteration += 1 if not loop: break print(f"Looping (iteration {iteration+1})...") await asyncio.sleep(interval) except Exception as e: print(f"Connection error: {e}. Retrying in 5s...") await asyncio.sleep(5) if not loop: break # --------------------------------------------------------------------------- # REST sender (fallback, no auth needed per stratoflights AllowAny policy) # --------------------------------------------------------------------------- async def run_rest(server: str, satellite: str, points: list[dict], interval: float, loop: bool) -> None: try: import aiohttp except ImportError: sys.exit("aiohttp package not found — run: pip install aiohttp") url = f"{server.rstrip('/')}/api/{satellite}/telemetry/" print(f"Sending {len(points)} points to {url} in order, no delay...") async with aiohttp.ClientSession() as session: while True: for i, point in enumerate(points): packet = { "lat": float(point.get("lat", 0)), "lon": float(point.get("lon", 0)), "alt": float(point.get("alt", 0)), "timestamp": int(point.get("timestamp", BASE_TS + i * int(interval))), "payload": point.get("payload", {}), } async with session.post(url, json=packet) as resp: print( f"[{i+1}/{len(points)}] " f"lat={packet['lat']:.5f} lon={packet['lon']:.5f} " f"alt={packet['alt']:.1f} m → HTTP {resp.status}" ) print("All points sent.") if not loop: break print("Looping...") await asyncio.sleep(interval) # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def main() -> None: parser = argparse.ArgumentParser(description="Stratoflights telemetry test client") parser.add_argument("--server", default="ws://localhost:8000", help="Base server URL (default: ws://localhost:8000)") parser.add_argument("--satellite", help="Satellite UUID") parser.add_argument("--token", help="Auth token (required for WebSocket mode)") parser.add_argument("--file", nargs='+', metavar="FILE", help="One or more trajectory files (.json, .csv, or .log); merged in order") parser.add_argument("--interval", type=float, default=5.0, help="Seconds between packets (default: 10)") parser.add_argument("--mode", choices=["ws", "rest"], default="ws", help="Transport: ws (WebSocket, default) or rest (HTTP POST)") parser.add_argument("--loop", action="store_true", help="Replay the trajectory in a loop indefinitely") parser.add_argument("--generate", metavar="FILE", help="Generate a sample trajectory JSON and exit") args = parser.parse_args() if args.generate: generate_sample(Path(args.generate)) return if not args.satellite: parser.error("--satellite is required") if not args.file: parser.error("--file is required") if args.mode == "ws" and not args.token: parser.error("--token is required for WebSocket mode") merged: list[dict] = [] for f in args.file: path = Path(f) if not path.exists(): sys.exit(f"File not found: {path}") loaded = load_trajectory(path) print(f" {path}: {len(loaded)} points") merged.extend(loaded) before = len(merged) points = deduplicate(merged) removed = before - len(points) if not points: sys.exit("No valid points found in the provided files") print(f"Total: {before} points" + (f", removed {removed} duplicates → {len(points)} unique" if removed else f" ({len(points)} unique)")) if args.mode == "ws": # Swap http(s) → ws(s) if the user passed an HTTP URL server = args.server.replace("http://", "ws://").replace("https://", "wss://") asyncio.run(run_ws(server, args.satellite, args.token, points, args.interval, args.loop)) else: server = args.server.replace("ws://", "http://").replace("wss://", "https://") asyncio.run(run_rest(server, args.satellite, points, args.interval, args.loop)) if __name__ == "__main__": main()