@ -7,6 +7,8 @@ middleware instance or the fallback non-atomic path is exercised via the
mock cache .
mock cache .
"""
"""
import time
import pytest
import pytest
from unittest . mock import MagicMock , patch
from unittest . mock import MagicMock , patch
from django . test import RequestFactory
from django . test import RequestFactory
@ -27,6 +29,7 @@ from sapl.middleware.ratelimit import (
RL_API_IP_REQUESTS ,
RL_API_IP_REQUESTS ,
RL_INDEX_BLOCKED_IPS ,
RL_INDEX_BLOCKED_IPS ,
RL_IP_BLOCKED ,
RL_IP_BLOCKED ,
RL_IP_PREFIX_BLOCKLIST ,
RL_USER_BLOCKED ,
RL_USER_BLOCKED ,
smart_key ,
smart_key ,
smart_rate ,
smart_rate ,
@ -100,6 +103,7 @@ def _make_middleware(
mock_settings . API_QUOTA_DAILY = 999999
mock_settings . API_QUOTA_DAILY = 999999
mock_settings . API_QUOTA_WEEKLY = 999999
mock_settings . API_QUOTA_WEEKLY = 999999
mock_settings . RATE_LIMITER_UA_BLOCKLIST_REFRESH = 60
mock_settings . RATE_LIMITER_UA_BLOCKLIST_REFRESH = 60
mock_settings . RATE_LIMITER_IP_PREFIX_BLOCKLIST_REFRESH = 60
mock_settings . API_RATE_LIMIT_ENABLED = api_rate_limit_enabled
mock_settings . API_RATE_LIMIT_ENABLED = api_rate_limit_enabled
mock_settings . API_RATE_LIMIT_THRESHOLD = api_threshold
mock_settings . API_RATE_LIMIT_THRESHOLD = api_threshold
mock_settings . API_RATE_LIMIT_WINDOW_SECONDS = api_window
mock_settings . API_RATE_LIMIT_WINDOW_SECONDS = api_window
@ -273,6 +277,101 @@ def test_index_shard_distributes_across_shards():
assert shards_seen == { ' 0 ' , ' 1 ' , ' 2 ' }
assert shards_seen == { ' 0 ' , ' 1 ' , ' 2 ' }
# ---------------------------------------------------------------------------
# Check 0 — IP-prefix blocklist (operator-curated SET, dot-anchored matching)
# ---------------------------------------------------------------------------
@pytest . fixture
def _seed_prefix_blocklist ( ) :
"""
Seed RateLimitMiddleware . _ip_prefix_blocklist for the test and restore the
previous class - level state afterwards ( it ' s shared across instances, like
the UA blocklist ) .
"""
saved_list = RateLimitMiddleware . _ip_prefix_blocklist
saved_fetched_at = RateLimitMiddleware . _ip_prefix_blocklist_fetched_at
def _seed ( prefixes ) :
RateLimitMiddleware . _ip_prefix_blocklist = set ( prefixes )
RateLimitMiddleware . _ip_prefix_blocklist_fetched_at = time . time ( )
yield _seed
RateLimitMiddleware . _ip_prefix_blocklist = saved_list
RateLimitMiddleware . _ip_prefix_blocklist_fetched_at = saved_fetched_at
def test_is_ip_prefix_blocked_matches_dot_boundary ( _seed_prefix_blocklist ) :
mw , _ = _make_middleware ( )
_seed_prefix_blocklist ( [ ' 103.124.225 ' ] )
assert mw . _is_ip_prefix_blocked ( ' 103.124.225.7 ' ) is True
# must not match on raw substring — only on a full-octet boundary
assert mw . _is_ip_prefix_blocked ( ' 103.124.2250.1 ' ) is False
assert mw . _is_ip_prefix_blocked ( ' 103.124.2255 ' ) is False
def test_is_ip_prefix_blocked_exact_match ( _seed_prefix_blocklist ) :
mw , _ = _make_middleware ( )
_seed_prefix_blocklist ( [ ' 103.124.225 ' ] )
assert mw . _is_ip_prefix_blocked ( ' 103.124.225 ' ) is True
def test_is_ip_prefix_blocked_full_address_matches_only_exactly ( _seed_prefix_blocklist ) :
mw , _ = _make_middleware ( )
_seed_prefix_blocklist ( [ ' 103.124.225.7 ' ] )
assert mw . _is_ip_prefix_blocked ( ' 103.124.225.7 ' ) is True
# a full dotted-quad entry must not be treated as a prefix of a longer string
assert mw . _is_ip_prefix_blocked ( ' 103.124.225.70 ' ) is False
def test_is_ip_prefix_blocked_trailing_dot_in_stored_prefix ( _seed_prefix_blocklist ) :
mw , _ = _make_middleware ( )
_seed_prefix_blocklist ( [ ' 103.124.225. ' ] )
# anchoring must not double the dot ('103.124.225..') and break the match
assert mw . _is_ip_prefix_blocked ( ' 103.124.225.7 ' ) is True
def test_is_ip_prefix_blocked_empty_list_passes ( _seed_prefix_blocklist ) :
mw , _ = _make_middleware ( )
_seed_prefix_blocklist ( [ ] )
assert mw . _is_ip_prefix_blocked ( ' 1.2.3.4 ' ) is False
def test_is_ip_prefix_blocked_no_match_passes ( _seed_prefix_blocklist ) :
mw , _ = _make_middleware ( )
_seed_prefix_blocklist ( [ ' 103.124.225 ' , ' 45.177.154 ' ] )
assert mw . _is_ip_prefix_blocked ( ' 1.2.3.4 ' ) is False
def test_evaluate_blocks_on_ip_prefix_before_other_checks ( _seed_prefix_blocklist ) :
mw , _ = _make_middleware ( )
_seed_prefix_blocklist ( [ ' 1.2.3 ' ] )
result = mw . _evaluate ( _anon_req ( ip = ' 1.2.3.4 ' , ua = ' GPTBot ' ) )
# would otherwise match the known_ua check — prefix block must win
assert result == { ' action ' : ' block ' , ' reason ' : ' ip_prefix_blocked ' , ' ip ' : ' 1.2.3.4 ' }
def test_evaluate_passes_through_when_ip_not_in_prefix_list ( _seed_prefix_blocklist ) :
mw , _ = _make_middleware ( )
_seed_prefix_blocklist ( [ ' 9.9.9 ' ] )
mw . _incr_with_ttl = MagicMock ( return_value = 1 )
result = mw . _evaluate ( _anon_req ( ip = ' 1.2.3.4 ' ) )
assert result [ ' action ' ] == ' pass '
def test_refresh_ip_prefix_blocklist_populates_set ( _seed_prefix_blocklist ) :
mw , _ = _make_middleware ( )
_seed_prefix_blocklist ( [ ] ) # start empty so the refresh result is observable
mock_client = MagicMock ( )
mock_client . smembers . return_value = { b ' 103.124.225 ' , b ' 45.177.154 ' }
with patch ( ' django_redis.get_redis_connection ' , return_value = mock_client ) :
mw . _refresh_ip_prefix_blocklist ( )
mock_client . smembers . assert_called_once_with ( RL_IP_PREFIX_BLOCKLIST )
assert RateLimitMiddleware . _ip_prefix_blocklist == { ' 103.124.225 ' , ' 45.177.154 ' }
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Check 1 — known bot User-Agent
# Check 1 — known bot User-Agent
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
@ -548,6 +647,38 @@ def test_api_malicious_origin_is_not_same_origin():
mw . _incr_with_ttl . assert_called_once ( )
mw . _incr_with_ttl . assert_called_once ( )
# ---------------------------------------------------------------------------
# _handle_api — Check 3a: IP-prefix block (operator-curated deny list)
# ---------------------------------------------------------------------------
def test_api_blocks_on_ip_prefix_before_quota_checks ( _seed_prefix_blocklist ) :
mw , mock_cache = _make_middleware ( )
_seed_prefix_blocklist ( [ ' 1.2.3 ' ] )
mw . _check_api_quota = MagicMock ( return_value = None )
mw . _incr_with_ttl = MagicMock ( )
request = _api_req ( ip = ' 1.2.3.4 ' )
response = mw ( request )
mw . get_response . assert_not_called ( )
mw . _check_api_quota . assert_not_called ( )
mw . _incr_with_ttl . assert_not_called ( )
assert response . status_code == 429
assert response [ ' X-RateLimit-Reason ' ] == ' ip_prefix_blocked '
def test_api_passes_through_when_ip_not_in_prefix_list ( _seed_prefix_blocklist ) :
mw , _ = _make_middleware ( api_threshold = 999 )
_seed_prefix_blocklist ( [ ' 9.9.9 ' ] )
mw . _check_api_quota = MagicMock ( return_value = None )
mw . _incr_with_ttl = MagicMock ( return_value = 1 )
request = _api_req ( ip = ' 1.2.3.4 ' )
mw ( request )
mw . get_response . assert_called_once_with ( request )
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# _handle_api — rate limiting and block key isolation
# _handle_api — rate limiting and block key isolation
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------