Files
reactbin/api/tests/unit/test_rate_limiter.py
agatha 9021f4816a Fix: Prefer X-Real-IP over XFF[0] in get_client_ip to close spoof bypass
XFF[0] is attacker-controllable; a crafted X-Forwarded-For header could
attribute login failures to a victim IP, triggering their lockout while
the attacker accumulates none. ingress-nginx sets X-Real-IP via its
realip module using an authoritative CIDR allowlist and overwrites any
client-supplied value, making it spoof-resistant. Fallback to XFF[0]
is retained for defence in depth but now emits a warning if reached.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 17:52:05 +00:00

106 lines
3.3 KiB
Python

import ipaddress
from unittest.mock import MagicMock
from starlette.requests import Request
from app.auth.rate_limiter import LoginRateLimiter, get_client_ip
# ---------------------------------------------------------------------------
# LoginRateLimiter tests
# ---------------------------------------------------------------------------
def make_limiter():
return LoginRateLimiter(max_failures=3, window_seconds=60, cooldown_seconds=300)
def test_not_blocked_initially():
assert make_limiter().is_blocked("1.2.3.4") is False
def test_blocked_after_threshold():
limiter = make_limiter()
for _ in range(3):
limiter.record_failure("1.2.3.4")
assert limiter.is_blocked("1.2.3.4") is True
def test_success_clears_failures():
limiter = make_limiter()
limiter.record_failure("1.2.3.4")
limiter.record_failure("1.2.3.4")
limiter.record_success("1.2.3.4")
assert limiter.is_blocked("1.2.3.4") is False
def test_ips_are_isolated():
limiter = make_limiter()
for _ in range(3):
limiter.record_failure("1.1.1.1")
assert limiter.is_blocked("2.2.2.2") is False
def test_window_resets_after_expiry():
import time
limiter = LoginRateLimiter(max_failures=3, window_seconds=0, cooldown_seconds=300)
limiter.record_failure("1.2.3.4")
limiter.record_failure("1.2.3.4")
time.sleep(0.01)
limiter.record_failure("1.2.3.4")
# window expired — counter reset on third call, so failures = 1, not 3
assert limiter.is_blocked("1.2.3.4") is False
def test_log_warning_on_lockout(caplog):
import logging
limiter = make_limiter()
with caplog.at_level(logging.WARNING, logger="app.auth.rate_limiter"):
for _ in range(3):
limiter.record_failure("5.6.7.8")
assert "Login blocked" in caplog.text
assert "5.6.7.8" in caplog.text
# ---------------------------------------------------------------------------
# get_client_ip tests
# ---------------------------------------------------------------------------
def make_request(peer: str, headers: dict) -> MagicMock:
req = MagicMock(spec=Request)
req.client.host = peer
req.headers = headers
return req
def test_get_client_ip_no_trusted_networks_returns_peer():
req = make_request("203.0.113.1", {"X-Forwarded-For": "10.0.0.1"})
assert get_client_ip(req, []) == "203.0.113.1"
def test_get_client_ip_trusted_peer_uses_real_ip():
req = make_request("10.0.0.1", {"X-Real-IP": "203.0.113.9"})
nets = [ipaddress.ip_network("10.0.0.0/8")]
assert get_client_ip(req, nets) == "203.0.113.9"
def test_get_client_ip_real_ip_wins_over_xff():
# Regression: spoofed XFF must not override nginx-set X-Real-IP.
req = make_request("10.0.0.1", {"X-Real-IP": "203.0.113.9", "X-Forwarded-For": "1.2.3.4"})
nets = [ipaddress.ip_network("10.0.0.0/8")]
assert get_client_ip(req, nets) == "203.0.113.9"
def test_get_client_ip_untrusted_peer_ignores_xff():
req = make_request("8.8.8.8", {"X-Forwarded-For": "203.0.113.5"})
nets = [ipaddress.ip_network("10.0.0.0/8")]
assert get_client_ip(req, nets) == "8.8.8.8"
def test_get_client_ip_trusted_peer_falls_back_to_xff_when_no_real_ip():
req = make_request("10.0.0.1", {"X-Forwarded-For": "203.0.113.5"})
nets = [ipaddress.ip_network("10.0.0.0/8")]
assert get_client_ip(req, nets) == "203.0.113.5"