From d9a92569f0fcc53477d73e222006ce8bdff75e85 Mon Sep 17 00:00:00 2001 From: "afanasyev.aa" Date: Fri, 15 Aug 2025 02:28:53 +0900 Subject: [PATCH] fixed broadcast roles and permissions --- stratoflights_api/consumers.py | 57 +++++++++++++++++-------------- stratoflights_api/tests.py | 61 ++++++++++++++++++++++++++++++++- stratoflights_api/ws_client.py | 62 ---------------------------------- 3 files changed, 91 insertions(+), 89 deletions(-) delete mode 100644 stratoflights_api/ws_client.py diff --git a/stratoflights_api/consumers.py b/stratoflights_api/consumers.py index 5db343d..66dce9b 100644 --- a/stratoflights_api/consumers.py +++ b/stratoflights_api/consumers.py @@ -9,31 +9,13 @@ class TelemetryConsumer(AsyncWebsocketConsumer): write_enabled = False async def connect(self): - User = get_user_model() - from rest_framework.authtoken.models import Token - from .models import Satellite - - token_key = self.scope["query_string"].decode().split("token=")[-1] self.satellite_id = self.scope["url_route"]["kwargs"]["pk"] self.group_name = f"telemetry_{self.satellite_id}" - user = await self.get_user_from_token(token_key, Token) - if not user: - await self.send(text_data=json.dumps({"error": "Invalid token"})) - await self.close() - return - self.scope["user"] = user - - try: - self.satellite = await database_sync_to_async(Satellite.objects.get)(id=self.satellite_id) - except Satellite.DoesNotExist: - await self.send(text_data=json.dumps({"error": "Invalid satellite"})) - await self.close() - return - + # Присоединяемся к группе await self.channel_layer.group_add(self.group_name, self.channel_name) await self.accept() - + async def receive(self, text_data): from .serializers import TelemetryPacketSerializer @@ -69,9 +51,9 @@ class TelemetryConsumer(AsyncWebsocketConsumer): async def telemetry_message(self, event): await self.send(text_data=json.dumps(event["data"])) - @database_sync_to_async def get_user_from_token(self, token_key, Token): + from rest_framework.authtoken.models import Token User = get_user_model() try: token = Token.objects.select_related("user").get(key=token_key) @@ -81,10 +63,12 @@ class TelemetryConsumer(AsyncWebsocketConsumer): @database_sync_to_async def save_telemetry(self, data): + from .models import Satellite User = get_user_model() from .models import TelemetryPacket + satellite = Satellite.objects.get(id=self.satellite_id) packet = TelemetryPacket.objects.create( - satellite=self.satellite, + satellite=satellite, user=self.scope["user"], timestamp=data.get("timestamp", int(time.time())), lat=data.get("lat", 0.0), @@ -97,10 +81,31 @@ class TelemetryConsumer(AsyncWebsocketConsumer): class SatelliteTelemetryConsumer(TelemetryConsumer): - group_prefix = "satellite" - write_enabled = True + write_enabled = False + + @classmethod + async def broadcast_to_satellite_group(cls, satellite_id, data, channel_layer): + group_name = f"telemetry_{satellite_id}" + await channel_layer.group_send( + group_name, + { + "type": "telemetry_message", + "data": data + } + ) class StationTelemetryConsumer(TelemetryConsumer): - group_prefix = "station" - write_enabled = False \ No newline at end of file + write_enabled = True + + async def connect(self): + from rest_framework.authtoken.models import Token + token_key = self.scope["query_string"].decode().split("token=")[-1] + try: + token = await database_sync_to_async(Token.objects.select_related("user").get)(key=token_key) + self.scope["user"] = token.user + except Token.DoesNotExist: + await self.close() + return + + await super().connect() diff --git a/stratoflights_api/tests.py b/stratoflights_api/tests.py index 7ce503c..9510c65 100644 --- a/stratoflights_api/tests.py +++ b/stratoflights_api/tests.py @@ -1,3 +1,62 @@ from django.test import TestCase -# Create your tests here. +import json +import pytest +from channels.testing import WebsocketCommunicator +from django.contrib.auth import get_user_model +from rest_framework.authtoken.models import Token +from stratoflights.asgi import application +from api.models import TelemetryPacket, Satellite + +User = get_user_model() + + +@pytest.mark.asyncio +@pytest.mark.django_db(transaction=True) +async def test_satellite_consumer_read_only(): + satellite = Satellite.objects.create(name="TestSat") + + communicator = WebsocketCommunicator( + application, + f"/ws/{satellite.id}/satellite/" + ) + connected, _ = await communicator.connect() + assert connected + + # Отправка данных в режиме read-only + await communicator.send_json_to({ + "temperature": 42 + }) + + # Данных в БД не должно появиться + assert TelemetryPacket.objects.count() == 0 + + await communicator.disconnect() + + +@pytest.mark.asyncio +@pytest.mark.django_db(transaction=True) +async def test_station_consumer_with_auth(): + satellite = Satellite.objects.create(name="TestSat") + user = User.objects.create_user(username="station1", password="pass") + token = Token.objects.create(user=user) + + communicator = WebsocketCommunicator( + application, + f"/ws/2db8e0cc-ea56-4e13-88d3-c248ef40cd67/station/?token=9f04ae202ea380915dd80ffe34151f5921897251" + ) + connected, _ = await communicator.connect() + assert connected + + # Отправляем данные + await communicator.send_json_to({ + "temperature": 99 + }) + + # Должен появиться один пакет телеметрии + packets = list(TelemetryPacket.objects.all()) + assert len(packets) == 1 + assert packets[0].temperature == 99 + assert packets[0].satellite == satellite + + await communicator.disconnect() diff --git a/stratoflights_api/ws_client.py b/stratoflights_api/ws_client.py deleted file mode 100644 index 816263a..0000000 --- a/stratoflights_api/ws_client.py +++ /dev/null @@ -1,62 +0,0 @@ -import asyncio -import json -import websockets -import uuid -import time - -BASE_URL = "ws://localhost:8000/api/ws" -TOKEN = "ae397229f9ca50cd6320cb0416671ecc780671ac" -PK = "cf1e36ff-c5ce-4852-8b90-046737974b97" - -async def satellite_mode(): - """ - Клиент для отправки данных телеметрии. - """ - url = f"{BASE_URL}/satellite/{PK}/telemetry/?token={TOKEN}" - async with websockets.connect(url) as ws: - print(f"[satellite] Connected to {url}") - - while True: - telemetry_data = { - "timestamp": int(time.time()), - "lat": 55.75, - "lon": 37.61, - "alt": 200.0, - "payload": {"temp": 22.5, "status": "OK"}, - "raw_data": {"sensor": "gps", "accuracy": "high"} - } - await ws.send(json.dumps(telemetry_data)) - print(f"[satellite] Sent: {telemetry_data}") - - try: - response = await asyncio.wait_for(ws.recv(), timeout=2) - print(f"[satellite] Received: {response}") - except asyncio.TimeoutError: - print("[satellite] No response yet") - - await asyncio.sleep(5) - - -async def station_mode(): - """ - Клиент для приёма данных телеметрии. - """ - url = f"{BASE_URL}/station/{PK}/telemetry/?token={TOKEN}" - async with websockets.connect(url) as ws: - print(f"[station] Connected to {url}") - - while True: - try: - message = await ws.recv() - print(f"[station] Received: {message}") - except websockets.ConnectionClosed: - print("[station] Connection closed") - break - - -if __name__ == "__main__": - mode = input("Enter mode (satellite/station): ").strip().lower() - if mode == "satellite": - asyncio.run(satellite_mode()) - else: - asyncio.run(station_mode())