test: add unit tests for checker pipeline execution

This commit is contained in:
agatha 2026-03-15 15:27:21 -04:00
parent 1f75867e7a
commit b33d7130f8

80
tests/unit/test_tasks.py Normal file
View File

@ -0,0 +1,80 @@
import uuid
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock
import pytest
from proxy_pool.plugins.protocols import CheckContext, CheckResult
from proxy_pool.worker.tasks_validate import _run_single_check
class TestRunSingleCheck:
async def test_successful_check(self):
checker = MagicMock()
checker.name = "test"
checker.timeout = 5.0
checker.check = AsyncMock(
return_value=CheckResult(passed=True, detail="ok")
)
proxy = MagicMock()
proxy.ip = "1.2.3.4"
proxy.port = 8080
proxy.protocol = MagicMock(value="http")
context = CheckContext(
started_at=datetime.now(),
http_client=None,
)
result = await _run_single_check(checker, proxy, context)
assert result.passed is True
assert result.detail == "ok"
async def test_timeout_returns_failed(self):
async def slow_check(*args, **kwargs):
import asyncio
await asyncio.sleep(10)
checker = MagicMock()
checker.name = "slow"
checker.timeout = 0.1
checker.check = slow_check
proxy = MagicMock()
proxy.ip = "1.2.3.4"
proxy.port = 8080
proxy.protocol = MagicMock(value="http")
context = CheckContext(
started_at=datetime.now(),
http_client=None,
)
result = await _run_single_check(checker, proxy, context)
assert result.passed is False
assert "timed out" in result.detail
async def test_exception_returns_failed(self):
checker = MagicMock()
checker.name = "broken"
checker.timeout = 5.0
checker.check = AsyncMock(side_effect=ConnectionError("boom"))
proxy = MagicMock()
proxy.ip = "1.2.3.4"
proxy.port = 8080
proxy.protocol = MagicMock(value="http")
context = CheckContext(
started_at=datetime.now(),
http_client=None,
)
result = await _run_single_check(checker, proxy, context)
assert result.passed is False
assert "ConnectionError" in result.detail
assert "boom" in result.detail