leaflet_svelte/test_client/client.py
2026-06-17 00:20:55 +09:00

372 lines
13 KiB
Python

"""
Telemetry test client for stratoflights.
Reads a trajectory file and sends one packet every N seconds to the station
WebSocket endpoint: ws://<host>/api/ws/station/<satellite_id>/telemetry/?token=<token>
Supported trajectory formats
-----------------------------
JSON array (default):
[
{"lat": 62.1, "lon": 129.4, "alt": 500.0, "timestamp": 1716000000},
...
]
CSV (first row = header):
lat,lon,alt,timestamp
62.1,129.4,500.0,1716000000
Balloon log (.log):
Supports both "beacon" and "tracking" log variants produced by the
on-board firmware. Each record has a bracketed position header, followed
by angle-bracket telemetry lines. Tracking logs may also embed a Python-
style dict with already-decoded coordinates (used as a cross-check).
Header: [1;<lat_raw>;<lon_raw>;<alt_m>;<frame_id>;]
Telemetry: <1;<seq>;<mode>;<alt>;<...more fields...>;>
Optional: {'lat': 62.1, 'lon': 129.4, 'alt': 500} (tracking only)
Coordinate decoding:
lat = lat_raw / 1_000_000
lon = lon_raw / 1_000_000
alt = alt_m (metres, integer)
Records with no GPS fix (lat_raw == 0 and lon_raw == 0) are skipped.
Exact duplicate positions (same lat / lon / alt triple) are removed.
Usage
-----
python client.py --server ws://localhost:8000 \\
--satellite <uuid> \\
--token <auth-token> \\
--file trajectory.json \\
--interval 10
# Balloon log files:
python client.py --satellite <uuid> --token <token> --file beacon-20250406.log
python client.py --satellite <uuid> --token <token> --file tracking-20250406-02.log
# Generate a sample trajectory file then run:
python client.py --generate sample.json
python client.py --satellite <uuid> --token <token> --file sample.json
"""
import argparse
import asyncio
import csv
import json
import math
import re
import sys
import time
from pathlib import Path
# Reference epoch (Unix seconds) for synthetic timestamps: 2025-04-06 03:00:00 UTC.
# The loaders, the sample generator and both senders in this file offset from it.
BASE_TS = 1743908400
# ---------------------------------------------------------------------------
# Trajectory loaders
# ---------------------------------------------------------------------------
def load_json(path: Path) -> list[dict]:
    """Load trajectory points from a JSON file.

    Two layouts are accepted:

    * a top-level array, which is returned as-is;
    * an object with a ``"prediction"`` key (Tawhiri-style), whose stages'
      ``"trajectory"`` entries are flattened into ``lat`` / ``lon`` / ``alt`` /
      ``timestamp`` dicts. An entry without ``"time"`` is stamped with the
      current wall-clock second.

    Raises:
        ValueError: if the document is neither of the above (scalar documents
            such as ``5`` or ``"text"`` included) or is not valid JSON.
        KeyError: if a trajectory entry lacks latitude, longitude or altitude.
    """
    # JSON is UTF-8 by specification; do not depend on the platform locale.
    data = json.loads(path.read_text(encoding="utf-8"))
    if isinstance(data, list):
        return data

    # Only a mapping can carry the "prediction" key. Checking the type first
    # keeps scalars from raising TypeError (or substring-matching a string).
    if isinstance(data, dict) and "prediction" in data:
        points: list[dict] = []
        for stage in data["prediction"]:
            for entry in stage.get("trajectory", []):
                points.append({
                    "lat": entry["latitude"],
                    "lon": entry["longitude"],
                    "alt": entry["altitude"],
                    "timestamp": entry.get("time", int(time.time())),
                })
        return points

    raise ValueError("JSON file must be an array or a Tawhiri prediction object")
def load_csv(path: Path) -> list[dict]:
    """Load trajectory points from a CSV file whose first row is the header.

    The ``lat``, ``lon`` and ``alt`` columns are required and parsed as floats.
    ``timestamp`` is optional: when the column is absent, or a row's value is
    missing or blank, the point gets ``BASE_TS`` plus 10 seconds per point
    already loaded.

    Raises:
        KeyError: if a required column is missing from the header.
        ValueError: if a coordinate or a non-blank timestamp is not numeric.
        TypeError: if a row is too short to contain a required column.
    """
    points: list[dict] = []
    with path.open(newline="") as f:
        for row in csv.DictReader(f):
            raw_ts = row.get("timestamp")
            # DictReader yields None for fields missing from a short row and ""
            # for empty cells; neither survives int(), so treat both as absent.
            if raw_ts is None or not raw_ts.strip():
                timestamp = BASE_TS + len(points) * 10
            else:
                timestamp = int(raw_ts)
            points.append({
                "lat": float(row["lat"]),
                "lon": float(row["lon"]),
                "alt": float(row["alt"]),
                "timestamp": timestamp,
            })
    return points
# Matches the position header line: [1;<lat_raw>;<lon_raw>;<alt_m>;<frame_id>;]
# Groups 1-3 capture optionally signed integers (lat_raw, lon_raw, alt_m);
# group 4 captures the unsigned frame id. Only the start is anchored, so any
# text after the closing bracket is tolerated.
_HEADER_RE = re.compile(r'^\[1;(-?\d+);(-?\d+);(-?\d+);(\d+);]')
# Matches the start of a telemetry line to extract the sequence number
# (group 1: the first numeric field after the literal "<1;" prefix).
_TELEM_RE = re.compile(r'^<1;(\d+);')
def load_log(path: Path) -> list[dict]:
    """Turn a beacon or tracking ``.log`` file into trajectory points.

    Every line matching ``_HEADER_RE`` yields one point, except:

    * headers whose raw latitude and longitude are both zero (no GPS fix);
    * headers repeating an already-seen (lat, lon, alt) triple.

    A point's timestamp is ``BASE_TS`` plus the sequence number of the first
    ``_TELEM_RE`` match among the four lines after the header, or plus the
    number of points collected so far when none of those lines matches.

    NOTE(review): the look-ahead does not stop at the next header, so a record
    without its own telemetry line can pick up a neighbour's sequence number;
    confirm whether that is intended.
    """
    epoch = BASE_TS
    trajectory: list[dict] = []
    known: set[tuple] = set()
    rows = path.read_text(errors="replace").splitlines()

    for idx, row in enumerate(rows):
        header = _HEADER_RE.match(row.strip())
        if header is None:
            continue

        raw_lat, raw_lon, raw_alt = (int(header.group(g)) for g in (1, 2, 3))

        # The firmware reports 0/0 until it has a GPS fix.
        if raw_lat == 0 and raw_lon == 0:
            continue

        # Raw coordinates are micro-degrees.
        lat = round(raw_lat / 1_000_000, 6)
        lon = round(raw_lon / 1_000_000, 6)

        # Skip positions that were already emitted.
        fingerprint = (lat, lon, raw_alt)
        if fingerprint in known:
            continue
        known.add(fingerprint)

        # Fall back to the running point count unless a telemetry line in the
        # next four lines supplies a sequence number.
        sequence = len(trajectory)
        for candidate in rows[idx + 1:idx + 5]:
            telemetry = _TELEM_RE.match(candidate.strip())
            if telemetry:
                sequence = int(telemetry.group(1))
                break

        trajectory.append({
            "lat": lat,
            "lon": lon,
            "alt": float(raw_alt),
            "timestamp": epoch + sequence,
        })

    return trajectory
def load_trajectory(path: Path) -> list[dict]:
    """Load *path* with the parser matching its extension (case-insensitive).

    ``.csv`` goes to ``load_csv`` and ``.log`` to ``load_log``; every other
    extension, including none at all, is treated as JSON.
    """
    parsers = {".csv": load_csv, ".log": load_log}
    parse = parsers.get(path.suffix.lower(), load_json)
    return parse(path)
def deduplicate(points: list[dict]) -> list[dict]:
    """Return *points* without repeated positions, keeping first occurrences.

    Two points share a position when their lat and lon agree after rounding to
    6 decimals and their alt agrees after rounding to 1 decimal. A missing
    coordinate is read as 0. The input order is preserved.
    """
    # Dicts keep insertion order, so setdefault both records the first point
    # seen at each position and ignores every later one.
    first_at_position: dict[tuple, dict] = {}
    for point in points:
        position = (
            round(float(point.get("lat", 0)), 6),
            round(float(point.get("lon", 0)), 6),
            round(float(point.get("alt", 0)), 1),
        )
        first_at_position.setdefault(position, point)
    return list(first_at_position.values())
# ---------------------------------------------------------------------------
# Sample generator
# ---------------------------------------------------------------------------
def generate_sample(path: Path, n: int = 50) -> None:
    """Write an *n*-point ascending sample trajectory to *path* as JSON.

    Starting from (62.0, 129.5), each step moves 0.003 degrees in longitude,
    climbs 200 m from an initial 500 m, and offsets latitude along a sine
    curve. Timestamps begin at ``BASE_TS`` and advance 10 s per point. A
    one-line confirmation is printed once the file is written.
    """
    start_lat, start_lon = 62.0, 129.5
    epoch = BASE_TS
    track = [
        {
            "lat": round(start_lat + math.sin(step * 0.05) * 0.02, 6),
            "lon": round(start_lon + step * 0.003, 6),
            "alt": round(500.0 + step * 200.0, 1),
            "timestamp": epoch + step * 10,
        }
        for step in range(n)
    ]
    path.write_text(json.dumps(track, indent=2))
    print(f"Sample trajectory written to {path} ({n} points)")
# ---------------------------------------------------------------------------
# WebSocket sender
# ---------------------------------------------------------------------------
async def run_ws(server: str, satellite: str, token: str,
                 points: list[dict], interval: float, loop: bool) -> None:
    """Replay *points* over the station telemetry WebSocket.

    Connects to ``<server>/api/ws/station/<satellite>/telemetry/?token=<token>``
    and sends one JSON packet per point. After each packet it waits up to 2 s
    for a reply, then sleeps *interval* seconds.

    Packet timestamps are synthetic: ``BASE_TS`` plus the replay time elapsed
    so far (packets already sent times *interval*). They keep advancing across
    loop iterations and reconnects; a point's own ``timestamp`` is not used.

    Args:
        server: Base ``ws://`` or ``wss://`` URL; a trailing slash is ignored.
        satellite: Satellite identifier placed in the URL path.
        token: Auth token passed as the ``token`` query parameter.
        points: Trajectory points; missing lat/lon/alt default to 0.
        interval: Seconds to sleep between packets.
        loop: When true, replay from the first point on a fresh connection
            after each pass and retry failed connections after 5 s. When
            false, return after one pass or on the first failure.
    """
    try:
        import websockets
    except ImportError:
        sys.exit("websockets package not found — run: pip install websockets")

    url = f"{server.rstrip('/')}/api/ws/station/{satellite}/telemetry/?token={token}"
    print(f"Connecting to {url}")

    iteration = 0
    abs_idx = 0  # never resets across loop iterations — drives advancing timestamps
    while True:
        try:
            async with websockets.connect(url) as ws:
                print("Connected.")
                # `position` restarts with every pass so the progress counter
                # stays within 1..len(points); abs_idx keeps counting.
                for position, point in enumerate(points, start=1):
                    packet = {
                        "lat": float(point.get("lat", 0)),
                        "lon": float(point.get("lon", 0)),
                        "alt": float(point.get("alt", 0)),
                        # Multiply before truncating: int(interval) alone is 0
                        # for sub-second intervals and would freeze the clock.
                        "timestamp": BASE_TS + int(abs_idx * interval),
                        "payload": point.get("payload", {}),
                        "raw_data": point.get("raw_data", {}),
                    }
                    abs_idx += 1
                    await ws.send(json.dumps(packet))
                    print(
                        f"[{position}/{len(points)}] "
                        f"lat={packet['lat']:.5f} "
                        f"lon={packet['lon']:.5f} "
                        f"alt={packet['alt']:.1f} m "
                        f"ts={packet['timestamp']}"
                    )
                    try:
                        reply = await asyncio.wait_for(ws.recv(), timeout=2.0)
                    except asyncio.TimeoutError:
                        pass  # server only responds on errors
                    else:
                        # A reply that is not a JSON object is ignored rather
                        # than being mistaken for a connection failure.
                        try:
                            data = json.loads(reply)
                        except ValueError:
                            data = None
                        if isinstance(data, dict) and "error" in data:
                            print(f" Server error: {data['error']}")
                    await asyncio.sleep(interval)
                print("All points sent.")
                iteration += 1
                if not loop:
                    break
                print(f"Looping (iteration {iteration+1})...")
                await asyncio.sleep(interval)
        except Exception as e:
            if not loop:
                # One-shot mode never reconnects, so do not announce a retry
                # or wait for one.
                print(f"Connection error: {e}.")
                break
            print(f"Connection error: {e}. Retrying in 5s...")
            await asyncio.sleep(5)
# ---------------------------------------------------------------------------
# REST sender (fallback, no auth needed per stratoflights AllowAny policy)
# ---------------------------------------------------------------------------
async def run_rest(server: str, satellite: str,
                   points: list[dict], interval: float, loop: bool) -> None:
    """POST every point to ``<server>/api/<satellite>/telemetry/`` back to back.

    Points are sent in order with no delay between them, and the HTTP status
    of each response is printed. A point's own ``timestamp`` is used when
    present; otherwise it falls back to ``BASE_TS`` plus the point's index
    times *interval*.

    Args:
        server: Base ``http://`` or ``https://`` URL; a trailing slash is ignored.
        satellite: Satellite identifier placed in the URL path.
        points: Trajectory points; missing lat/lon/alt default to 0.
        interval: Spacing assumed for fallback timestamps, and the pause
            between passes when looping.
        loop: When true, resend the whole trajectory indefinitely.
    """
    try:
        import aiohttp
    except ImportError:
        sys.exit("aiohttp package not found — run: pip install aiohttp")

    url = f"{server.rstrip('/')}/api/{satellite}/telemetry/"
    print(f"Sending {len(points)} points to {url} in order, no delay...")
    async with aiohttp.ClientSession() as session:
        while True:
            for i, point in enumerate(points):
                # Multiply before truncating: int(interval) alone is 0 for
                # sub-second intervals, giving every point the same fallback.
                fallback_ts = BASE_TS + int(i * interval)
                packet = {
                    "lat": float(point.get("lat", 0)),
                    "lon": float(point.get("lon", 0)),
                    "alt": float(point.get("alt", 0)),
                    "timestamp": int(point.get("timestamp", fallback_ts)),
                    "payload": point.get("payload", {}),
                }
                async with session.post(url, json=packet) as resp:
                    print(
                        f"[{i+1}/{len(points)}] "
                        f"lat={packet['lat']:.5f} lon={packet['lon']:.5f} "
                        f"alt={packet['alt']:.1f} m → HTTP {resp.status}"
                    )
            print("All points sent.")
            if not loop:
                break
            print("Looping...")
            await asyncio.sleep(interval)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
    """Parse the command line, load the trajectory files and replay them.

    With ``--generate`` a sample trajectory is written and nothing is sent.
    Otherwise every ``--file`` is loaded, the results are concatenated in the
    order given and de-duplicated, and the points go out over WebSocket
    (default) or HTTP POST, with the server URL scheme rewritten to suit the
    chosen transport.

    Exits through ``parser.error`` when a required option is missing, and
    through ``sys.exit`` when a file does not exist or no points remain.
    """
    parser = argparse.ArgumentParser(description="Stratoflights telemetry test client")
    parser.add_argument("--server", default="ws://localhost:8000",
                        help="Base server URL (default: ws://localhost:8000)")
    parser.add_argument("--satellite", help="Satellite UUID")
    parser.add_argument("--token", help="Auth token (required for WebSocket mode)")
    parser.add_argument("--file", nargs='+', metavar="FILE",
                        help="One or more trajectory files (.json, .csv, or .log); merged in order")
    # The help text is derived from the default so the two cannot disagree
    # (it previously advertised 10 while the default was 5.0).
    parser.add_argument("--interval", type=float, default=5.0,
                        help="Seconds between packets (default: %(default)s)")
    parser.add_argument("--mode", choices=["ws", "rest"], default="ws",
                        help="Transport: ws (WebSocket, default) or rest (HTTP POST)")
    parser.add_argument("--loop", action="store_true",
                        help="Replay the trajectory in a loop indefinitely")
    parser.add_argument("--generate", metavar="FILE",
                        help="Generate a sample trajectory JSON and exit")
    args = parser.parse_args()

    if args.generate:
        generate_sample(Path(args.generate))
        return

    # These are validated by hand, not with required=True, so that
    # --generate works on its own.
    if not args.satellite:
        parser.error("--satellite is required")
    if not args.file:
        parser.error("--file is required")
    if args.mode == "ws" and not args.token:
        parser.error("--token is required for WebSocket mode")

    merged: list[dict] = []
    for f in args.file:
        path = Path(f)
        if not path.exists():
            sys.exit(f"File not found: {path}")
        loaded = load_trajectory(path)
        print(f" {path}: {len(loaded)} points")
        merged.extend(loaded)

    before = len(merged)
    points = deduplicate(merged)
    removed = before - len(points)
    if not points:
        sys.exit("No valid points found in the provided files")
    print(f"Total: {before} points" +
          (f", removed {removed} duplicates → {len(points)} unique" if removed else f" ({len(points)} unique)"))

    if args.mode == "ws":
        # Swap http(s) → ws(s) if the user passed an HTTP URL
        server = args.server.replace("http://", "ws://").replace("https://", "wss://")
        asyncio.run(run_ws(server, args.satellite, args.token, points, args.interval, args.loop))
    else:
        # Swap ws(s) → http(s), since the default --server is a WebSocket URL
        server = args.server.replace("ws://", "http://").replace("wss://", "https://")
        asyncio.run(run_rest(server, args.satellite, points, args.interval, args.loop))
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()