Files
streamstack/tests/core/test_auth.py
T
unkinben 2309e9f43a Initial commit — StreamStack v1
Five-service streaming platform: auth, catalogue, streaming, ingest, thumbnailer.
Includes React frontend served by nginx, NATS JetStream event bus, aiobotocore
async S3, PyAV video metadata + thumbnail extraction, service-to-service JWT auth,
and a full unit + e2e test suite.
2026-05-04 22:16:39 +10:00

61 lines
2.1 KiB
Python

import subprocess
from unittest.mock import patch
import pytest
from streamstack.core.auth import create_access_token, create_refresh_token, decode_token
@pytest.fixture(scope="module")
def rsa_key_pair(tmp_path_factory):
    """Generate a throwaway 2048-bit RSA key pair with the ``openssl`` CLI.

    The keys are written as ``private.pem`` and ``public.pem`` inside a
    temporary directory obtained from ``tmp_path_factory``.  The fixture is
    module-scoped, so the pair is generated once and reused by every test in
    this module that requests it.

    Returns:
        A ``(private_key_path, public_key_path)`` tuple of strings.

    Raises:
        subprocess.CalledProcessError: if either ``openssl`` invocation exits
            with a non-zero status (``check=True``).
    """
    key_dir = tmp_path_factory.mktemp("keys")
    private_path = str(key_dir / "private.pem")
    public_path = str(key_dir / "public.pem")

    # Order matters: the private key must exist before the public key can be
    # extracted from it.
    openssl_commands = (
        ["openssl", "genrsa", "-out", private_path, "2048"],
        ["openssl", "rsa", "-in", private_path, "-pubout", "-out", public_path],
    )
    for argv in openssl_commands:
        subprocess.run(argv, check=True, capture_output=True)

    return private_path, public_path
def test_access_token_roundtrip(rsa_key_pair):
    """An access token survives a create -> decode round trip.

    The token is created and decoded under separately patched
    ``streamstack.core.auth.settings`` objects; the decoded payload must
    carry the subject, email and roles that were passed in, plus ``jti`` and
    ``exp`` entries.
    """
    private_path, public_path = rsa_key_pair

    # Signing side: the mock settings carry both keys and the expiry.
    with patch("streamstack.core.auth.settings") as signing_settings:
        signing_settings.configure_mock(
            jwt_private_key_path=private_path,
            jwt_public_key_path=public_path,
            jwt_algorithm="RS256",
            jwt_expire_minutes=30,
        )
        token = create_access_token("user-123", "test@example.com", ["viewer"])

    # Verifying side: only the public key and algorithm are configured.
    with patch("streamstack.core.auth.settings") as verifying_settings:
        verifying_settings.configure_mock(
            jwt_public_key_path=public_path,
            jwt_algorithm="RS256",
        )
        claims = decode_token(token)

    expected_claims = {
        "sub": "user-123",
        "email": "test@example.com",
        "roles": ["viewer"],
    }
    for claim_name, claim_value in expected_claims.items():
        assert claims[claim_name] == claim_value

    # These entries only need to be present; their values are not pinned.
    for required_key in ("jti", "exp"):
        assert required_key in claims
def test_refresh_token_has_type(rsa_key_pair):
    """A decoded refresh token is tagged ``type == "refresh"``.

    The token is created for ``"user-123"`` and decoded under separately
    patched ``streamstack.core.auth.settings`` objects; the payload must
    carry the ``"refresh"`` type marker and the original subject.
    """
    private_path, public_path = rsa_key_pair

    # Signing side: the mock settings carry both keys and both expiry values.
    with patch("streamstack.core.auth.settings") as signing_settings:
        signing_settings.configure_mock(
            jwt_private_key_path=private_path,
            jwt_public_key_path=public_path,
            jwt_algorithm="RS256",
            jwt_expire_minutes=30,
            jwt_refresh_expire_days=7,
        )
        token = create_refresh_token("user-123")

    # Verifying side: only the public key and algorithm are configured.
    with patch("streamstack.core.auth.settings") as verifying_settings:
        verifying_settings.configure_mock(
            jwt_public_key_path=public_path,
            jwt_algorithm="RS256",
        )
        claims = decode_token(token)

    expected_claims = {"type": "refresh", "sub": "user-123"}
    for claim_name, claim_value in expected_claims.items():
        assert claims[claim_name] == claim_value