import logging import os from gunicorn.glogging import Logger TRUSTED_PROXIES = { ip.strip() for ip in os.environ.get("TRUSTED_PROXIES", "").split(",") if ip.strip() } class HealthCheckFilter(logging.Filter): def filter(self, record): message = record.getMessage() return "kube-probe" not in message class CustomLogger(Logger): def atoms(self, resp, req, environ, request_time): atoms = super().atoms(resp, req, environ, request_time) atoms["{client-ip}e"] = self._get_client_ip(environ) return atoms @staticmethod def _get_client_ip(environ): remote_addr = environ.get("REMOTE_ADDR", "-") xff = environ.get("HTTP_X_FORWARDED_FOR", "") if not xff: return remote_addr # Walk the chain from right to left, skipping trusted proxies ips = [ip.strip() for ip in xff.split(",")] for ip in reversed(ips): if ip not in TRUSTED_PROXIES: return ip # All IPs in the chain are trusted; fall back to the leftmost return ips[0] logger_class = CustomLogger access_log_format = '%({client-ip}e)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"' logconfig_dict = { "version": 1, "disable_existing_loggers": False, "filters": { "health_check": { "()": HealthCheckFilter, }, }, "handlers": { "console": { "class": "logging.StreamHandler", "stream": "ext://sys.stderr", }, "access_console": { "class": "logging.StreamHandler", "filters": ["health_check"], "stream": "ext://sys.stdout", }, }, "root": { "level": "INFO", "handlers": ["console"], }, "loggers": { "gunicorn.error": { "level": "INFO", "handlers": ["console"], "propagate": False, }, "gunicorn.access": { "level": "INFO", "handlers": ["access_console"], "propagate": False, }, }, }