Files
unkinben 2309e9f43a Initial commit — StreamStack v1
Five-service streaming platform: auth, catalogue, streaming, ingest, thumbnailer.
Includes React frontend served by nginx, NATS JetStream event bus, aiobotocore
async S3, PyAV video metadata + thumbnail extraction, service-to-service JWT auth,
and a full unit + e2e test suite.
2026-05-04 22:16:39 +10:00

51 lines
1.3 KiB
Python

from collections.abc import AsyncGenerator
from unittest.mock import AsyncMock, MagicMock
import pytest
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from streamstack.core.db import Base
@pytest.fixture(scope="function")
async def db_engine():
    """Yield an async engine backed by a fresh in-memory SQLite database.

    Every table registered on ``Base.metadata`` is created before the engine
    is handed to the test; the engine is disposed once the test is done.
    """
    memory_engine = create_async_engine("sqlite+aiosqlite:///:memory:")

    # create_all is a synchronous API, so it is driven through run_sync on
    # the async connection.
    async with memory_engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)

    yield memory_engine

    # Teardown: release the engine's connection pool.
    await memory_engine.dispose()
@pytest.fixture(scope="function")
async def db_session(db_engine) -> AsyncGenerator[AsyncSession, None]:
    """Yield an ``AsyncSession`` bound to the per-test ``db_engine``.

    The session is built with ``expire_on_commit=False`` and is closed when
    the surrounding ``async with`` exits after the test.
    """
    async with async_sessionmaker(
        db_engine,
        expire_on_commit=False,
    )() as active_session:
        yield active_session
@pytest.fixture
def mock_nats():
    """Return a mocked NATS client wired to a mocked JetStream and KV store.

    ``jetstream`` on the client resolves to one shared JetStream mock, and
    ``key_value``, ``create_key_value`` and ``find_key_value`` on that mock
    all resolve to the same key-value mock.
    """
    client = AsyncMock()
    jetstream = AsyncMock()
    kv_store = AsyncMock()

    # NOTE(review): attributes of an AsyncMock are AsyncMocks themselves, so
    # calling ``client.jetstream()`` produces a coroutine that must be awaited
    # to obtain ``jetstream`` -- confirm the code under test awaits it.
    client.jetstream.return_value = jetstream

    # Every way of obtaining a KV bucket hands back the same mock.
    for accessor in ("key_value", "create_key_value", "find_key_value"):
        getattr(jetstream, accessor).return_value = kv_store

    return client
@pytest.fixture
def mock_s3_client():
    """Return a ``MagicMock`` S3 client with a canned ``head_object`` reply.

    ``head_object`` reports a 1 MiB object with content type ``video/mp4``.
    """
    head_response = {
        "ContentLength": 1024 * 1024,
        "ContentType": "video/mp4",
    }

    # NOTE(review): this is a plain MagicMock, so ``head_object(...)`` returns
    # the dict directly and is not awaitable -- confirm that matches how the
    # callers use the client.
    s3 = MagicMock()
    s3.head_object.return_value = head_response
    return s3
@pytest.fixture
def s3_content() -> bytes:
    """Return a placeholder byte payload standing in for video data.

    The payload is a 19-byte marker repeated 1000 times (19,000 bytes).
    """
    chunk = b"fake video content "
    repetitions = 1000
    return chunk * repetitions