stratoflights/stratoflights_api/consumers.py
2025-08-15 02:28:53 +09:00

111 lines
3.7 KiB
Python

import json
import time
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.contrib.auth import get_user_model
class TelemetryConsumer(AsyncWebsocketConsumer):
    """Base WebSocket consumer that relays telemetry packets for one satellite.

    On connect the consumer joins the channel-layer group
    ``telemetry_<pk>``, where ``pk`` comes from the URL route kwargs. Every
    ``telemetry_message`` event sent to that group is forwarded to the client
    as a JSON text frame. Incoming frames are persisted and re-broadcast only
    when ``write_enabled`` is true; otherwise they are answered with a
    ``"Read-only mode"`` error.
    """

    # Not referenced inside this class.
    group_prefix = None
    # Subclasses set this to True to accept telemetry from the client.
    write_enabled = False

    async def connect(self):
        """Join the satellite's telemetry group and accept the connection."""
        self.satellite_id = self.scope["url_route"]["kwargs"]["pk"]
        self.group_name = f"telemetry_{self.satellite_id}"
        # Join the group
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        """Leave the telemetry group so closed channels are not kept in it."""
        # ``group_name`` is missing when a subclass rejected the connection
        # before ``connect`` above had a chance to run.
        group_name = getattr(self, "group_name", None)
        if group_name is not None:
            await self.channel_layer.group_discard(group_name, self.channel_name)

    async def receive(self, text_data=None, bytes_data=None):
        """Validate, store and broadcast one telemetry packet.

        Args:
            text_data: JSON document sent by the client as a text frame.
            bytes_data: Payload of a binary frame; binary frames are rejected.

        Every failure (read-only consumer, non-text frame, malformed JSON,
        serializer errors, unknown satellite) is reported to the sender as
        ``{"error": ...}`` and nothing is broadcast.
        """
        from .models import Satellite
        from .serializers import TelemetryPacketSerializer

        if not self.write_enabled:
            await self.send(text_data=json.dumps({"error": "Read-only mode"}))
            return

        if text_data is None:
            await self.send(
                text_data=json.dumps({"error": "Expected a JSON text frame"})
            )
            return

        try:
            data = json.loads(text_data)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            await self.send(text_data=json.dumps({"error": "Invalid JSON"}))
            return

        serializer = TelemetryPacketSerializer(data=data)
        if not serializer.is_valid():
            await self.send(text_data=json.dumps({"error": serializer.errors}))
            return

        # NOTE(review): the raw ``data`` is persisted rather than
        # ``serializer.validated_data`` -- confirm this is intentional.
        try:
            saved_data = await self.save_telemetry(data)
        except Satellite.DoesNotExist:
            await self.send(text_data=json.dumps({"error": "Satellite not found"}))
            return

        await self.channel_layer.group_send(
            self.group_name,
            {
                "type": "telemetry_message",
                "data": {
                    "id": str(saved_data.id),
                    "timestamp": saved_data.timestamp,
                    "lat": saved_data.lat,
                    "lon": saved_data.lon,
                    "alt": saved_data.alt,
                    "payload": saved_data.payload,
                    "raw_data": saved_data.raw_data,
                },
            },
        )

    async def telemetry_message(self, event):
        """Forward a group ``telemetry_message`` event to the client as JSON."""
        await self.send(text_data=json.dumps(event["data"]))

    @database_sync_to_async
    def get_user_from_token(self, token_key, Token=None):
        """Return the user owning the auth token ``token_key``, or ``None``.

        Args:
            token_key: Key of the DRF auth token to look up.
            Token: Ignored. The token model is always imported here, which
                shadowed this argument before as well; it is kept (now
                optional) only so existing two-argument calls still work.
        """
        from rest_framework.authtoken.models import Token

        try:
            token = Token.objects.select_related("user").get(key=token_key)
        except Token.DoesNotExist:
            return None
        return token.user

    @database_sync_to_async
    def save_telemetry(self, data):
        """Create and return a ``TelemetryPacket`` for this consumer's satellite.

        Args:
            data: Mapping with the optional keys ``timestamp``, ``lat``,
                ``lon``, ``alt``, ``payload`` and ``raw_data``. Missing keys
                default to the current Unix time in whole seconds, ``0.0``
                for the coordinates, and ``{}`` for the two blobs.

        Raises:
            Satellite.DoesNotExist: No satellite has id ``self.satellite_id``.
        """
        from .models import Satellite, TelemetryPacket

        satellite = Satellite.objects.get(id=self.satellite_id)
        return TelemetryPacket.objects.create(
            satellite=satellite,
            user=self.scope["user"],
            timestamp=data.get("timestamp", int(time.time())),
            lat=data.get("lat", 0.0),
            lon=data.get("lon", 0.0),
            alt=data.get("alt", 0.0),
            payload=data.get("payload", {}),
            raw_data=data.get("raw_data", {}),
        )
class SatelliteTelemetryConsumer(TelemetryConsumer):
    """Telemetry consumer with client writes disabled.

    Also offers a class-level helper for pushing data to every consumer
    subscribed to a given satellite's telemetry group.
    """

    write_enabled = False

    @classmethod
    async def broadcast_to_satellite_group(cls, satellite_id, data, channel_layer):
        """Send ``data`` as a ``telemetry_message`` event to one satellite's group.

        Args:
            satellite_id: Identifier used to build the ``telemetry_<id>``
                group name.
            data: Value placed under the event's ``"data"`` key.
            channel_layer: Channel layer whose ``group_send`` is awaited.
        """
        event = {"type": "telemetry_message", "data": data}
        target_group = "telemetry_{}".format(satellite_id)
        await channel_layer.group_send(target_group, event)
class StationTelemetryConsumer(TelemetryConsumer):
    """Write-enabled telemetry consumer authenticated by a ``token`` query parameter."""

    write_enabled = True

    async def connect(self):
        """Authenticate via ``?token=<key>`` and then join the telemetry group.

        The connection is closed without being accepted when the ``token``
        parameter is missing or empty, or when no auth token with that key
        exists. On success the token's user is stored in ``scope["user"]``
        before the base class ``connect`` runs.
        """
        from urllib.parse import parse_qs

        from rest_framework.authtoken.models import Token

        # Parse the query string properly instead of splitting on "token=":
        # that mis-read "token=abc&x=1" as "abc&x=1" and treated a query
        # string without any token as the token itself.
        query_params = parse_qs(self.scope["query_string"].decode())
        # parse_qs drops blank values, so "token=" ends up on the default;
        # like the old code, the last occurrence wins if the key is repeated.
        token_key = query_params.get("token", [""])[-1]
        if not token_key:
            await self.close()
            return

        try:
            token = await database_sync_to_async(
                Token.objects.select_related("user").get
            )(key=token_key)
        except Token.DoesNotExist:
            await self.close()
            return

        # ``user`` was loaded via select_related, so this does not trigger a
        # lazy database query from async code.
        self.scope["user"] = token.user
        await super().connect()