"""Tests for the stream-token helpers, run against a mocked nats.py client."""

import time
from unittest.mock import AsyncMock, MagicMock

import pytest

from streamstack.streaming.tokens import (
    TOKEN_TTL_SECONDS,
    create_stream_token,
    resolve_stream_token,
)


def _stored_entry(raw):
    """Return a stand-in KV entry whose ``value`` attribute is *raw* bytes."""
    return MagicMock(value=raw)


@pytest.fixture
def mock_nc():
    """Provide a ``(client, bucket)`` pair of mocks.

    ``client.jetstream()`` returns a mocked JetStream context whose awaited
    ``key_value(...)`` call yields ``bucket``, so tests can program and
    inspect ``bucket.get`` / ``bucket.put`` / ``bucket.delete`` directly.
    """
    bucket = AsyncMock()
    jetstream_ctx = AsyncMock()
    jetstream_ctx.key_value = AsyncMock(return_value=bucket)

    client = AsyncMock()
    # jetstream() is a synchronous call in nats.py
    client.jetstream = MagicMock(return_value=jetstream_ctx)
    return client, bucket


@pytest.mark.asyncio
async def test_create_returns_token(mock_nc):
    """Creating a token returns it and stores exactly one entry keyed by it."""
    client, bucket = mock_nc

    token = await create_stream_token(client, "media-uuid", "user-123")

    assert len(token) > 20
    bucket.put.assert_called_once()
    # The first positional argument of put() is the key.
    assert bucket.put.call_args.args[0] == token


@pytest.mark.asyncio
async def test_resolve_valid_token(mock_nc):
    """An entry stamped with the current time resolves to (media, user)."""
    client, bucket = mock_nc
    issued_at = int(time.time())
    bucket.get.return_value = _stored_entry(
        f"media-uuid|user-123|{issued_at}".encode()
    )

    resolved = await resolve_stream_token(client, "some-token")

    assert resolved == ("media-uuid", "user-123")


@pytest.mark.asyncio
async def test_resolve_expired_token(mock_nc):
    """An entry stamped just past the TTL resolves to None and is deleted."""
    client, bucket = mock_nc
    # One second older than the TTL allows.
    stale_stamp = int(time.time()) - TOKEN_TTL_SECONDS - 1
    bucket.get.return_value = _stored_entry(
        f"media-uuid|user-123|{stale_stamp}".encode()
    )

    resolved = await resolve_stream_token(client, "some-token")

    assert resolved is None
    bucket.delete.assert_called_once_with("some-token")


@pytest.mark.asyncio
async def test_resolve_missing_token(mock_nc):
    """A failing bucket lookup is reported as None rather than raised."""
    client, bucket = mock_nc
    bucket.get.side_effect = Exception("not found")

    resolved = await resolve_stream_token(client, "nonexistent")

    assert resolved is None


@pytest.mark.asyncio
async def test_resolve_malformed_token(mock_nc):
    """A stored value without the expected separators resolves to None."""
    client, bucket = mock_nc
    bucket.get.return_value = _stored_entry(b"bad-data")

    resolved = await resolve_stream_token(client, "some-token")

    assert resolved is None