46 lines
1.3 KiB
Python
46 lines
1.3 KiB
Python
import logging
|
|
import os
|
|
|
|
from gunicorn.glogging import Logger
|
|
|
|
# Addresses treated as trusted proxies when resolving the client IP from
# X-Forwarded-For. Read once at import time from the comma-separated
# TRUSTED_PROXIES environment variable; surrounding whitespace is stripped
# and blank entries are discarded.
TRUSTED_PROXIES = set(
    filter(None, map(str.strip, os.environ.get("TRUSTED_PROXIES", "").split(",")))
)
|
|
|
|
|
|
class HealthCheckFilter(logging.Filter):
    """Logging filter that drops records whose message mentions ``kube-probe``."""

    def filter(self, record):
        """Return False (suppress) when the formatted message contains "kube-probe".

        Any other record is allowed through by returning True.
        """
        is_probe = "kube-probe" in record.getMessage()
        return not is_probe
|
|
|
|
|
|
class CustomLogger(Logger):
    """Gunicorn logger that filters probe requests and exposes the client IP.

    On top of the base ``Logger`` it attaches ``HealthCheckFilter`` to the
    access log and adds a ``{client-ip}e`` atom that can be referenced from
    ``access_log_format``.
    """

    def setup(self, cfg):
        """Run the base setup, then make sure the access log has a HealthCheckFilter."""
        super().setup(cfg)
        # Install the filter at most once: a fresh instance per setup() call
        # would otherwise pile up on the access logger.
        already_installed = any(
            isinstance(existing, HealthCheckFilter)
            for existing in self.access_log.filters
        )
        if not already_installed:
            self.access_log.addFilter(HealthCheckFilter())

    def atoms(self, resp, req, environ, request_time):
        """Return the base access-log atoms plus the ``{client-ip}e`` entry."""
        atoms = super().atoms(resp, req, environ, request_time)
        atoms["{client-ip}e"] = self._get_client_ip(environ)
        return atoms

    @staticmethod
    def _get_client_ip(environ):
        """Resolve the client address to log for a request.

        Args:
            environ: WSGI environ mapping; ``REMOTE_ADDR`` and
                ``HTTP_X_FORWARDED_FOR`` are read from it.

        Returns:
            ``REMOTE_ADDR`` (or ``"-"`` when absent) if X-Forwarded-For is
            missing or holds no usable entry; otherwise the right-most
            forwarded address that is not in ``TRUSTED_PROXIES``; if every
            forwarded address is trusted, the left-most one.
        """
        remote_addr = environ.get("REMOTE_ADDR", "-")
        xff = environ.get("HTTP_X_FORWARDED_FOR", "")

        # Discard blank entries (whitespace-only header, trailing or doubled
        # commas) so an empty string is never reported as the client IP.
        ips = [ip for ip in (part.strip() for part in xff.split(",")) if ip]
        if not ips:
            return remote_addr

        # NOTE(review): the header is honoured even when REMOTE_ADDR itself is
        # not a trusted proxy, so a directly connected client can choose the
        # logged address. Confirm all traffic arrives through a proxy that
        # appends to X-Forwarded-For.

        # Walk the chain from right to left, skipping trusted proxies.
        for ip in reversed(ips):
            if ip not in TRUSTED_PROXIES:
                return ip

        # All IPs in the chain are trusted; fall back to the leftmost.
        return ips[0]
|
|
|
|
|
|
# Settings picked up by gunicorn (presumably this module is loaded as its
# config file): use the custom logger class, and start each access-log line
# with the client IP atom that CustomLogger.atoms() provides.
logger_class = CustomLogger
access_log_format = (
    '%({client-ip}e)s %(l)s %(u)s %(t)s '
    '"%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
)
|