# Sistema de Apoio ao Processo Legislativo (SAPL)

"""
RateLimitMiddleware — cross-pod rate limiting backed by shared Redis.
Decision flow (per request):
1. Known bot UA? → 429
2. IP in blocked set? → 429
3. Authenticated user?
a. User blocked? → 429
b. Suspicious hdrs? → 429
c. User rate ≥ 120? → SET RL_USER_BLOCKED, 429
4. Anonymous:
a. Suspicious hdrs? → 429
b. IP rate ≥ 35/min? → SET RL_IP_BLOCKED, 429
c. NS/IP window hit? → SET RL_IP_BLOCKED, 429
When RATELIMIT_DRY_RUN=True, block decisions are only logged and the request
is passed through (no 429); counters and block markers are still written.
Degrades gracefully to non-atomic counting when Redis is unavailable.
_NAMESPACE is settings.POD_NAMESPACE, resolved once at startup:
- K8s: start.sh reads the k8s namespace from the Downward API env var
or the service-account namespace file, writes it to .env as POD_NAMESPACE.
- Bare-metal / VM / docker-compose: defaults to the machine hostname
(socket.gethostbyname_ex result computed in settings.py).
Since a deployment serves exactly one tenant, this is a startup constant —
no per-request lookup is needed or correct.
"""
import hashlib
import logging
import time
from django.conf import settings
from django.core.cache import caches
from django.http import HttpResponse
# Dedicated logger for this module; block decisions are emitted at WARNING.
logger = logging.getLogger('sapl.ratelimit')
# ---------------------------------------------------------------------------
# Tenant namespace — resolved once at startup from settings.POD_NAMESPACE.
# On K8s: the k8s namespace (e.g. "patobranco-pr"), set by start.sh.
# On bare-metal / VM / docker-compose: the machine hostname (default).
# The value is read when this module is imported and is substituted into the
# {ns} placeholder of the namespaced key templates.
# ---------------------------------------------------------------------------
_NAMESPACE = settings.POD_NAMESPACE
# ---------------------------------------------------------------------------
# Redis key templates — module-level constants, never inline strings
# Placeholders: {ip} = masked client IP, {ns} = tenant namespace (_NAMESPACE),
# {uid} = user primary key as a string, {bucket} = integer time-window index.
# The RL_IP_* templates have no {ns} placeholder, so those keys depend on the
# IP alone.
# ---------------------------------------------------------------------------
RL_IP_REQUESTS = 'rl:ip:{ip}:reqs'  # anonymous request counter per IP
RL_IP_BLOCKED = 'rl:ip:{ip}:blocked'  # block marker per IP
RL_USER_REQUESTS = 'rl:{ns}:user:{uid}:reqs'  # authenticated request counter per user
RL_USER_BLOCKED = 'rl:{ns}:user:{uid}:blocked'  # block marker per user
RL_NS_WINDOW = 'rl:{ns}:ip:{ip}:w:{bucket}'  # per-namespace/IP counter for one time bucket
# ---------------------------------------------------------------------------
# Bot UA fragments
# RateLimitMiddleware._evaluate blocks a request when any of these fragments
# occurs in the User-Agent header; both sides are lowercased before the
# substring test, so the casing used here does not matter.
# ---------------------------------------------------------------------------
BOT_UA_FRAGMENTS = [
'GPTBot',
'ClaudeBot',
'PerplexityBot',
'Bytespider',
'AhrefsBot',
'meta-externalagent',
'OAI-SearchBot',
'bingbot',
'SERankingBacklinksBot',
'Chrome/98.0.4758', # known scraper impersonating an old Chrome
]
# Lua script executed through EVAL by RateLimitMiddleware._incr_with_ttl.
# KEYS[1] is the counter key and ARGV[1] the TTL passed by the caller. The
# script increments the counter, sets the expiry only when the increment
# produced 1 (i.e. the key was just created), and returns the new count.
_INCR_LUA = """
local n = redis.call('INCR', KEYS[1])
if n == 1 then redis.call('EXPIRE', KEYS[1], ARGV[1]) end
return n
"""
def _sha256(s):
    """Return the hexadecimal SHA-256 digest of the string *s*.

    The text is converted to bytes with ``str.encode()`` (UTF-8 by default)
    before hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(s.encode())
    return hasher.hexdigest()
def get_client_ip(request):
    """
    Return the client IP for *request*, masked with django-ratelimit's ip_mask.

    ip_mask collapses an IPv6 address to its subnet (presumably /64, the
    django-ratelimit default — confirm against RATELIMIT_IPV6_MASK) so that
    per-address rotation inside one subnet maps to a single key.

    Candidates are tried in this order, and the first one that is non-empty
    and accepted by ip_mask wins:

    1. the first entry of X-Forwarded-For,
    2. X-Real-IP (for nginx setups that use that header instead),
    3. REMOTE_ADDR,
    4. the literal '0.0.0.0'.

    An empty or malformed candidate is skipped rather than raised, so a
    garbage header sent by a client cannot turn into an unhandled exception
    inside the middleware.

    Canonical source — imported from here by other SAPL modules.

    NOTE(review): the first X-Forwarded-For entry is only trustworthy when the
    fronting proxy overwrites the header; if clients can reach the app with
    their own value they choose their rate-limit key — confirm the proxy setup.
    """
    from ratelimit.core import ip_mask

    meta = request.META
    forwarded_for = meta.get('HTTP_X_FORWARDED_FOR') or ''
    candidates = (
        forwarded_for.split(',')[0],
        meta.get('HTTP_X_REAL_IP'),
        meta.get('REMOTE_ADDR'),
        '0.0.0.0',
    )
    for candidate in candidates:
        address = (candidate or '').strip()
        if not address:
            continue
        try:
            return ip_mask(address)
        except ValueError:
            # ip_mask parses the address via the ipaddress module, which
            # rejects malformed input with ValueError; try the next source.
            continue
    # Only reachable if even the literal fallback was rejected by ip_mask.
    return '0.0.0.0'
def ratelimit_ip(group, request):
    """
    Key callable for django-ratelimit decorators: the masked client IP.

    *group* is accepted only because the decorator passes it; it is not used.
    """
    client_ip = get_client_ip(request)
    return client_ip
def smart_key(group, request):
    """
    Key callable for @ratelimit decorators that distinguishes logged-in users.

    A request carrying an authenticated user is keyed by the user's primary
    key (as a string), so people sharing one NAT address are counted
    separately. Every other request is keyed by the masked client IP via
    ratelimit_ip.
    """
    user = getattr(request, 'user', None)
    # Guard clause: no user object, or an anonymous one, falls back to the IP.
    if user is None or not user.is_authenticated:
        return ratelimit_ip(group, request)
    return str(user.pk)
def smart_rate(group, request):
    """
    Rate callable for @ratelimit decorators that distinguishes logged-in users.

    Yields settings.RATE_LIMITER_RATE_AUTHENTICATED when the request carries
    an authenticated user and settings.RATE_LIMITER_RATE otherwise. These are
    the same two settings RateLimitMiddleware parses for its thresholds, so
    view-level and middleware-level limits stay aligned.
    """
    user = getattr(request, 'user', None)
    is_logged_in = user is not None and user.is_authenticated
    return (
        settings.RATE_LIMITER_RATE_AUTHENTICATED
        if is_logged_in
        else settings.RATE_LIMITER_RATE
    )
def _is_suspicious_headers(request):
    """
    Return True when the request has neither Accept-Language nor Accept.

    Real browsers send both headers; bots frequently omit them. A header that
    is present but empty counts as missing. One missing header alone is not
    treated as suspicious.
    """
    headers = request.META
    has_language = bool(headers.get('HTTP_ACCEPT_LANGUAGE'))
    has_accept = bool(headers.get('HTTP_ACCEPT'))
    return not (has_language or has_accept)
def _parse_rate(rate_str):
    """
    Parse a rate string such as '35/m' or '100/5m' into (count, seconds).

    The part after the slash is an optional integer multiplier followed by a
    unit: 's' (second), 'm' (minute), 'h' (hour) or 'd' (day), matched
    case-insensitively. This is the format django-ratelimit accepts, so the
    same settings value yields the same window here and in the decorators.

    A period whose unit is not recognised falls back to 60 seconds, and a
    multiplier of 0 is treated as 1 so the window is never zero-length.

    Raises ValueError when *rate_str* does not contain exactly one '/' or
    when the count is not an integer.
    """
    count_text, period = rate_str.split('/')
    count = int(count_text)

    period = period.lower()
    # Split '5m' into multiplier '5' and unit 'm'; a bare 'm' has no multiplier.
    unit = period.lstrip('0123456789')
    multiplier_text = period[:len(period) - len(unit)]

    unit_seconds = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}.get(unit)
    if unit_seconds is None:
        return count, 60

    multiplier = int(multiplier_text) if multiplier_text else 1
    return count, unit_seconds * (multiplier or 1)
class RateLimitMiddleware:
    """
    Django middleware that answers abusive requests with an empty HTTP 429.

    Each request is evaluated into a decision dict ({'action', 'ip'} plus
    'reason' for blocks). Block decisions are always logged; the 429 is only
    returned when dry-run is off, otherwise the request proceeds normally.
    """

    BLOCK_TTL = 300  # seconds an IP/user stays blocked after threshold breach

    def __init__(self, get_response):
        """Read the rate-limit settings once and bind the 'ratelimit' cache."""
        self.get_response = get_response
        self.dry_run = settings.RATELIMIT_DRY_RUN
        anon_limit = _parse_rate(settings.RATE_LIMITER_RATE)
        auth_limit = _parse_rate(settings.RATE_LIMITER_RATE_AUTHENTICATED)
        self.anon_threshold, self.anon_window = anon_limit
        self.auth_threshold, self.auth_window = auth_limit
        # NOTE(review): membership is tested against the masked IP returned by
        # get_client_ip, so IPv6 entries presumably have to be listed in their
        # masked form — confirm.
        self.whitelist = set(settings.RATE_LIMIT_WHITELIST_IPS)
        self._rl_cache = caches['ratelimit']

    def __call__(self, request):
        """Evaluate *request*; log a block and, unless dry-run, return 429."""
        verdict = self._evaluate(request)
        blocked = verdict['action'] == 'block'
        if blocked:
            logger.warning(
                'ratelimit_block reason=%s ip=%s path=%s dry_run=%s namespace=%s',
                verdict['reason'],
                verdict['ip'],
                request.path,
                self.dry_run,
                _NAMESPACE,
                extra={'ua': request.META.get('HTTP_USER_AGENT', '')},
            )
        if blocked and not self.dry_run:
            return HttpResponse(status=429)
        return self.get_response(request)

    # ------------------------------------------------------------------
    # Decision builders
    # ------------------------------------------------------------------
    @staticmethod
    def _allow(ip):
        """Build the decision dict for a request that may proceed."""
        return {'action': 'pass', 'ip': ip}

    @staticmethod
    def _deny(reason, ip):
        """Build the decision dict for a request that should be blocked."""
        return {'action': 'block', 'reason': reason, 'ip': ip}

    # ------------------------------------------------------------------
    # Evaluation
    # ------------------------------------------------------------------
    def _evaluate(self, request):
        """Run the checks shared by all requests, then branch on authentication."""
        ip = get_client_ip(request)
        if ip in self.whitelist:
            return self._allow(ip)

        # Check 1: known bad UA (case-insensitive substring match)
        ua_lowered = request.META.get('HTTP_USER_AGENT', '').lower()
        if any(fragment.lower() in ua_lowered for fragment in BOT_UA_FRAGMENTS):
            return self._deny('known_ua', ip)

        # Check 2: IP already blocked
        if self._rl_cache.get(RL_IP_BLOCKED.format(ip=ip)):
            return self._deny('ip_blocked', ip)

        user = getattr(request, 'user', None)
        if user is None or not user.is_authenticated:
            return self._evaluate_anonymous(request, ip)
        return self._evaluate_authenticated(request, ip)

    def _evaluate_authenticated(self, request, ip):
        """Apply the per-user checks; the counter is keyed by namespace and user pk."""
        uid = str(request.user.pk)
        blocked_key = RL_USER_BLOCKED.format(ns=_NAMESPACE, uid=uid)

        # Check 3a: user already blocked
        if self._rl_cache.get(blocked_key):
            return self._deny('user_blocked', ip)

        # Check 3b: suspicious headers
        if _is_suspicious_headers(request):
            return self._deny('suspicious_headers_auth', ip)

        # Check 3c: authenticated request rate
        hits = self._incr_with_ttl(
            RL_USER_REQUESTS.format(ns=_NAMESPACE, uid=uid), ttl=self.auth_window
        )
        if hits < self.auth_threshold:
            return self._allow(ip)
        self._rl_cache.set(blocked_key, 1, timeout=self.BLOCK_TTL)
        return self._deny('auth_user_rate', ip)

    def _evaluate_anonymous(self, request, ip):
        """Apply the per-IP checks; either counter reaching the threshold blocks the IP."""
        # Check 4a: suspicious headers
        if _is_suspicious_headers(request):
            return self._deny('suspicious_headers', ip)

        window = self.anon_window
        limit = self.anon_threshold
        ip_blocked_key = RL_IP_BLOCKED.format(ip=ip)

        # Check 4b: IP request rate
        hits = self._incr_with_ttl(RL_IP_REQUESTS.format(ip=ip), ttl=window)
        if hits >= limit:
            self._rl_cache.set(ip_blocked_key, 1, timeout=self.BLOCK_TTL)
            return self._deny('ip_rate', ip)

        # Check 4c: per-namespace/IP/window (catches UA rotators behind NAT)
        bucket = int(time.time() // window)
        window_key = RL_NS_WINDOW.format(ns=_NAMESPACE, ip=ip, bucket=bucket)
        # The bucket key lives for two windows, so it outlasts its own bucket.
        hits = self._incr_with_ttl(window_key, ttl=window * 2)
        if hits >= limit:
            self._rl_cache.set(ip_blocked_key, 1, timeout=self.BLOCK_TTL)
            return self._deny('ua_rotation', ip)

        return self._allow(ip)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def _incr_with_ttl(self, key, ttl):
        """
        Increment the counter at *key* and return its new value.

        The preferred path runs the _INCR_LUA script on the 'ratelimit' Redis
        connection, which increments and sets the expiry in one atomic step.
        If anything on that path raises (django_redis missing, no Redis
        connection, ...), the counter is instead read and rewritten through
        the cache API. That fallback is not atomic and rewrites the timeout
        on every call — correct enough for logging.
        """
        try:
            from django_redis import get_redis_connection
            return get_redis_connection('ratelimit').eval(_INCR_LUA, 1, key, ttl)
        except Exception:
            previous = self._rl_cache.get(key) or 0
            updated = previous + 1
            self._rl_cache.set(key, updated, timeout=ttl)
            return updated