stratoflights/stratoflights_api/tests.py
2025-08-15 02:28:53 +09:00

62 lines
1.8 KiB
Python

from django.test import TestCase
import json
import pytest
from channels.testing import WebsocketCommunicator
from django.contrib.auth import get_user_model
from rest_framework.authtoken.models import Token
from stratoflights.asgi import application
from api.models import TelemetryPacket, Satellite
User = get_user_model()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
async def test_satellite_consumer_read_only():
satellite = Satellite.objects.create(name="TestSat")
communicator = WebsocketCommunicator(
application,
f"/ws/{satellite.id}/satellite/"
)
connected, _ = await communicator.connect()
assert connected
# Отправка данных в режиме read-only
await communicator.send_json_to({
"temperature": 42
})
# Данных в БД не должно появиться
assert TelemetryPacket.objects.count() == 0
await communicator.disconnect()
@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
async def test_station_consumer_with_auth():
satellite = Satellite.objects.create(name="TestSat")
user = User.objects.create_user(username="station1", password="pass")
token = Token.objects.create(user=user)
communicator = WebsocketCommunicator(
application,
f"/ws/2db8e0cc-ea56-4e13-88d3-c248ef40cd67/station/?token=9f04ae202ea380915dd80ffe34151f5921897251"
)
connected, _ = await communicator.connect()
assert connected
# Отправляем данные
await communicator.send_json_to({
"temperature": 99
})
# Должен появиться один пакет телеметрии
packets = list(TelemetryPacket.objects.all())
assert len(packets) == 1
assert packets[0].temperature == 99
assert packets[0].satellite == satellite
await communicator.disconnect()