refactor: centralize test fixtures with transactional cleanup
This commit is contained in:
parent
c215b8e45f
commit
0614770e17
@ -2,12 +2,31 @@ import asyncio
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import (
|
||||
AsyncSession,
|
||||
async_sessionmaker,
|
||||
create_async_engine,
|
||||
)
|
||||
|
||||
from proxy_pool.config import Settings, DatabaseSettings, RedisSettings
|
||||
from proxy_pool.app import create_app
|
||||
from proxy_pool.config import DatabaseSettings, Settings, RedisSettings
|
||||
from proxy_pool.db.base import Base
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def client() -> AsyncGenerator[AsyncClient, None]:
|
||||
app = create_app()
|
||||
async with app.router.lifespan_context(app):
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(
|
||||
transport=transport,
|
||||
base_url="http://test",
|
||||
) as client:
|
||||
yield client
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def event_loop():
|
||||
loop = asyncio.new_event_loop()
|
||||
@ -35,17 +54,18 @@ def test_settings() -> Settings:
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
async def engine(test_settings: Settings):
|
||||
engine = create_async_engine(test_settings.db.url)
|
||||
yield engine
|
||||
await engine.dispose()
|
||||
eng = create_async_engine(test_settings.db.url)
|
||||
yield eng
|
||||
await eng.dispose()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def db_session(engine) -> AsyncGenerator[AsyncSession, None]:
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
async with engine.connect() as conn:
|
||||
transaction = await conn.begin()
|
||||
session = AsyncSession(bind=conn, expire_on_commit=False)
|
||||
|
||||
session_factory = async_sessionmaker(engine, expire_on_commit=False)
|
||||
async with session_factory() as session:
|
||||
yield session
|
||||
await session.rollback()
|
||||
|
||||
await session.close()
|
||||
await transaction.rollback()
|
||||
@ -8,15 +8,6 @@ from proxy_pool.app import create_app
|
||||
from proxy_pool.proxy.models import Proxy, ProxyProtocol, ProxySource, ProxyStatus
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def client():
|
||||
app = create_app()
|
||||
async with app.router.lifespan_context(app):
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
||||
yield client
|
||||
|
||||
|
||||
async def register_user(client):
|
||||
email = f"test-{uuid.uuid4().hex[:8]}@example.com"
|
||||
resp = await client.post(
|
||||
|
||||
@ -4,15 +4,6 @@ from httpx import ASGITransport, AsyncClient
|
||||
from proxy_pool.app import create_app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def client():
|
||||
app = create_app()
|
||||
async with app.router.lifespan_context(app):
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
||||
yield client
|
||||
|
||||
|
||||
async def register_user(client, email=None):
|
||||
"""Helper to register and return (user_data, raw_api_key)."""
|
||||
import uuid
|
||||
|
||||
@ -9,15 +9,6 @@ from proxy_pool.app import create_app
|
||||
from proxy_pool.proxy.models import Proxy, ProxyProtocol, ProxySource, ProxyStatus
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def client():
|
||||
app = create_app()
|
||||
async with app.router.lifespan_context(app):
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
||||
yield client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def seeded_client(client):
|
||||
source_resp = await client.post(
|
||||
|
||||
@ -4,15 +4,6 @@ from httpx import ASGITransport, AsyncClient
|
||||
from proxy_pool.app import create_app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def client():
|
||||
app = create_app()
|
||||
async with app.router.lifespan_context(app):
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as client:
|
||||
yield client
|
||||
|
||||
|
||||
class TestSourceRoutes:
|
||||
async def test_create_source(self, client):
|
||||
response = await client.post(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user