2309e9f43a
Five-service streaming platform: auth, catalogue, streaming, ingest, thumbnailer. Includes React frontend served by nginx, NATS JetStream event bus, aiobotocore async S3, PyAV video metadata + thumbnail extraction, service-to-service JWT auth, and a full unit + e2e test suite.
51 lines
1.3 KiB
Python
51 lines
1.3 KiB
Python
from collections.abc import AsyncGenerator
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
|
|
|
from streamstack.core.db import Base
|
|
|
|
|
|
@pytest.fixture(scope="function")
async def db_engine():
    """Yield an async engine bound to a fresh in-memory SQLite database.

    All tables registered on ``Base.metadata`` are created before the engine
    is handed to the test. The engine is disposed afterwards, including when
    schema creation itself fails, so a broken setup does not leak the
    engine's connection pool.
    """
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    try:
        # ``create_all`` is a synchronous API, so it is run through
        # ``run_sync`` on the async connection.
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        yield engine
    finally:
        # Runs on normal teardown and on setup failure alike.
        await engine.dispose()
|
|
|
|
|
|
@pytest.fixture(scope="function")
async def db_session(db_engine) -> AsyncGenerator[AsyncSession, None]:
    """Yield an ``AsyncSession`` bound to the ``db_engine`` fixture.

    The session is created with ``expire_on_commit=False`` and is closed by
    its context manager once the test has finished.
    """
    make_session = async_sessionmaker(db_engine, expire_on_commit=False)
    async with make_session() as active_session:
        yield active_session
|
|
|
|
|
|
@pytest.fixture
def mock_nats():
    """Return an ``AsyncMock`` standing in for a NATS client.

    Awaiting ``jetstream()`` on the returned client resolves to a JetStream
    mock, whose ``key_value``, ``create_key_value`` and ``find_key_value``
    calls all resolve to one shared key-value mock.
    """
    # NOTE(review): because the client is an AsyncMock, ``jetstream()``
    # returns an awaitable rather than the context directly - confirm this
    # matches how the code under test obtains the JetStream context.
    kv_store = AsyncMock()
    jetstream = AsyncMock()
    for accessor in ("key_value", "create_key_value", "find_key_value"):
        getattr(jetstream, accessor).return_value = kv_store

    client = AsyncMock()
    client.jetstream.return_value = jetstream
    return client
|
|
|
|
|
|
@pytest.fixture
def mock_s3_client():
    """Return a ``MagicMock`` S3 client with a canned ``head_object`` reply.

    ``head_object(...)`` returns a dict reporting a 1 MiB object with
    content type ``video/mp4``; every other attribute is a default mock.
    """
    # NOTE(review): ``head_object`` is a plain (non-awaitable) mock here -
    # confirm callers do not ``await`` it directly.
    head_response = {
        "ContentLength": 1024 * 1024,
        "ContentType": "video/mp4",
    }
    s3 = MagicMock()
    s3.head_object.return_value = head_response
    return s3
|
|
|
|
|
|
@pytest.fixture
def s3_content() -> bytes:
    """Return 19,000 bytes of placeholder data to use as a fake video body."""
    chunk = b"fake video content "
    repeats = 1000
    return chunk * repeats
|