proxy-pool/tests/integration/test_source_routes.py

88 lines
2.5 KiB
Python

import pytest
from httpx import ASGITransport, AsyncClient
from proxy_pool.app import create_app
@pytest.fixture
async def client():
app = create_app()
async with app.router.lifespan_context(app):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
class TestSourceRoutes:
async def test_create_source(self, client):
response = await client.post(
"/sources",
json={
"url": "https://example.com/proxies.txt",
"parser_name": "plaintext",
},
)
assert response.status_code == 201
data = response.json()
assert data["url"] == "https://example.com/proxies.txt"
assert data["parser_name"] == "plaintext"
assert data["is_active"] is True
assert data["id"] is not None
async def test_create_source_invalid_parser(self, client):
response = await client.post(
"/sources",
json={
"url": "https://example.com/proxies.txt",
"parser_name": "nonexistent",
},
)
assert response.status_code == 422
async def test_list_sources(self, client):
await client.post(
"/sources",
json={
"url": "https://example.com/list1.txt",
"parser_name": "plaintext",
},
)
await client.post(
"/sources",
json={
"url": "https://example.com/list2.txt",
"parser_name": "plaintext",
},
)
response = await client.get("/sources")
assert response.status_code == 200
data = response.json()
assert len(data) >= 2
async def test_get_source_not_found(self, client):
response = await client.get(
"/sources/00000000-0000-0000-0000-000000000000"
)
assert response.status_code == 404
async def test_delete_source(self, client):
create_response = await client.post(
"/sources",
json={
"url": "https://example.com/delete-me.txt",
"parser_name": "plaintext",
},
)
source_id = create_response.json()["id"]
delete_response = await client.delete(f"/sources/{source_id}")
assert delete_response.status_code == 204
get_response = await client.get(f"/sources/{source_id}")
assert get_response.status_code == 404