"""WebSocket tests for the satellite (read-only) and station consumers."""
import json

import pytest
from channels.db import database_sync_to_async
from channels.testing import WebsocketCommunicator
from django.contrib.auth import get_user_model
from django.test import TestCase
from rest_framework.authtoken.models import Token

from api.models import Satellite, TelemetryPacket
from stratoflights.asgi import application

User = get_user_model()


@database_sync_to_async
def _fetch_packets():
    """Return every TelemetryPacket with its satellite pre-loaded.

    ``select_related`` loads the related satellite in the same query, so the
    caller can read ``packet.satellite`` from a coroutine without triggering
    a lazy (synchronous) database hit.
    """
    return list(TelemetryPacket.objects.select_related("satellite"))


@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
async def test_satellite_consumer_read_only():
    """Data sent to the satellite endpoint must not be stored."""
    # The ORM is async-unsafe: every call made from this coroutine has to be
    # routed through a worker thread.
    satellite = await database_sync_to_async(Satellite.objects.create)(
        name="TestSat"
    )

    communicator = WebsocketCommunicator(
        application,
        f"/ws/{satellite.id}/satellite/",
    )
    connected, _ = await communicator.connect()
    assert connected

    # Send data over the read-only endpoint.
    await communicator.send_json_to({"temperature": 42})

    # Disconnect first: it waits for the consumer to finish, so the frame
    # above has been handled by the time the database is inspected.
    await communicator.disconnect()

    # Nothing must have been written to the database.
    packet_count = await database_sync_to_async(TelemetryPacket.objects.count)()
    assert packet_count == 0


@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
async def test_station_consumer_with_auth():
    """An authenticated station stores exactly one telemetry packet."""
    satellite = await database_sync_to_async(Satellite.objects.create)(
        name="TestSat"
    )
    user = await database_sync_to_async(User.objects.create_user)(
        username="station1", password="pass"
    )
    token = await database_sync_to_async(Token.objects.create)(user=user)

    # Build the URL from the objects created above rather than from fixed
    # literals, so the connection targets this test's satellite and token.
    communicator = WebsocketCommunicator(
        application,
        f"/ws/{satellite.id}/station/?token={token.key}",
    )
    connected, _ = await communicator.connect()
    assert connected

    # Send the telemetry data.
    await communicator.send_json_to({"temperature": 99})

    # Disconnect first: it waits for the consumer to finish, so the frame
    # above has been handled by the time the database is inspected.
    await communicator.disconnect()

    # Exactly one telemetry packet must have been created.
    packets = await _fetch_packets()
    assert len(packets) == 1
    assert packets[0].temperature == 99
    assert packets[0].satellite == satellite