#!/usr/bin/env python3 """ ESPHome Gitea Sync Service This service synchronizes ESPHome device configurations between Gitea and ESPHome. The actual compilation and upload operations are handled by the ESPHome container. Provides: 1. File watching for detecting YAML changes 2. RESTful API for device management and sync operations 3. Webhook endpoints for Gitea integration """ import os import time import logging import subprocess from pathlib import Path from threading import Thread, Lock from datetime import datetime from flask import Flask, request, jsonify from watchdog.observers import Observer from watchdog.observers.polling import PollingObserver from watchdog.events import FileSystemEventHandler # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) app = Flask(__name__) # Configuration ESPHOME_CONFIG_DIR = os.environ.get('ESPHOME_CONFIG_DIR', '/config') DEBOUNCE_SECONDS = int(os.environ.get('DEBOUNCE_SECONDS', '5')) POLLING_INTERVAL = float(os.environ.get('POLLING_INTERVAL', '1.0')) # Seconds between polls USE_POLLING = os.environ.get('USE_POLLING', 'true').lower() == 'true' # Use polling for Docker compatibility # Gitea configuration GITEA_URL = os.environ.get('GITEA_URL', '') GITEA_REPO = os.environ.get('GITEA_REPO', '') GITEA_TOKEN = os.environ.get('GITEA_TOKEN', '') GITEA_BRANCH = os.environ.get('GITEA_BRANCH', 'main') AUTO_PUSH = os.environ.get('AUTO_PUSH', 'false').lower() == 'true' # Git user configuration GIT_USER_NAME = os.environ.get('GIT_USER_NAME', 'ESPHome Sync Service') GIT_USER_EMAIL = os.environ.get('GIT_USER_EMAIL', 'esphome-sync@localhost') # Track ongoing operations to prevent duplicate triggers operation_lock = Lock() pending_operations = {} class ESPHomeFileHandler(FileSystemEventHandler): """Handles file system events for ESPHome YAML files""" def __init__(self): self.last_modified = {} def _handle_file_change(self, event): """Common handler for file modifications and creations""" if event.is_directory: return # Only watch YAML files if not event.src_path.endswith(('.yaml', '.yml')): return # Debounce - ignore rapid repeated events now = time.time() if event.src_path in self.last_modified: if now - self.last_modified[event.src_path] < DEBOUNCE_SECONDS: logger.debug(f"Debouncing {event.src_path} (too soon)") return self.last_modified[event.src_path] = now logger.info(f"Detected change in {event.src_path}") # Trigger git push if AUTO_PUSH is enabled device_path = Path(event.src_path) # Check if this is a device config file directly in the config directory if device_path.parent == Path(ESPHOME_CONFIG_DIR): device_name = device_path.stem # Get filename without extension logger.info(f"Change detected in device: {device_name}") if AUTO_PUSH: logger.info(f"AUTO_PUSH enabled, pushing changes to Gitea") Thread(target=git_push, args=(f"Auto-sync: {device_name}.yaml changed",)).start() else: logger.info(f"AUTO_PUSH disabled, skipping push to Gitea") else: logger.debug(f"Ignoring file outside config directory: {event.src_path}") def on_modified(self, event): """Handle file modification events""" self._handle_file_change(event) def on_created(self, event): """Handle file creation events""" self._handle_file_change(event) def on_deleted(self, event): """Handle file deletion events""" if event.is_directory: return # Only watch YAML files if not event.src_path.endswith(('.yaml', '.yml')): return logger.info(f"Detected deletion of {event.src_path}") # Trigger git push if AUTO_PUSH is enabled device_path = Path(event.src_path) # Check if this is a device config file directly in the config directory if device_path.parent == Path(ESPHOME_CONFIG_DIR): device_name = device_path.stem # Get filename without extension logger.info(f"Deletion detected for device: {device_name}") if AUTO_PUSH: logger.info(f"AUTO_PUSH enabled, pushing deletion to Gitea") Thread(target=git_push, args=(f"Auto-sync: {device_name}.yaml deleted",)).start() else: logger.info(f"AUTO_PUSH disabled, skipping push to Gitea") else: logger.debug(f"Ignoring file outside config directory: {event.src_path}") def on_moved(self, event): """Handle file move/rename events""" if event.is_directory: return # Only watch YAML files if not event.src_path.endswith(('.yaml', '.yml')) and not event.dest_path.endswith(('.yaml', '.yml')): return src_path = Path(event.src_path) dest_path = Path(event.dest_path) config_dir = Path(ESPHOME_CONFIG_DIR) src_in_config = src_path.parent == config_dir dest_in_config = dest_path.parent == config_dir logger.info(f"Detected move from {event.src_path} to {event.dest_path}") if src_in_config and dest_in_config: # Rename within config directory - treat as modification device_name = dest_path.stem logger.info(f"Rename detected: {src_path.name} -> {dest_path.name}") if AUTO_PUSH: logger.info(f"AUTO_PUSH enabled, pushing rename to Gitea") Thread(target=git_push, args=(f"Auto-sync: Renamed {src_path.name} to {dest_path.name}",)).start() else: logger.info(f"AUTO_PUSH disabled, skipping push to Gitea") elif src_in_config and not dest_in_config: # Moved OUT of config directory (e.g., to archive folder) - treat as deletion device_name = src_path.stem logger.info(f"Device moved to archive/subdirectory: {device_name}") if AUTO_PUSH: logger.info(f"AUTO_PUSH enabled, pushing archived file as deletion to Gitea") Thread(target=git_push, args=(f"Auto-sync: {src_path.name} archived",)).start() else: logger.info(f"AUTO_PUSH disabled, skipping push to Gitea") elif not src_in_config and dest_in_config: # Moved INTO config directory - treat as creation device_name = dest_path.stem logger.info(f"Device moved from subdirectory to config root: {device_name}") if AUTO_PUSH: logger.info(f"AUTO_PUSH enabled, pushing new file to Gitea") Thread(target=git_push, args=(f"Auto-sync: {dest_path.name} restored from archive",)).start() else: logger.info(f"AUTO_PUSH disabled, skipping push to Gitea") def find_device_config(device_name): """Find the YAML config for a given device""" # Look for .yaml directly in the config directory config_path = Path(ESPHOME_CONFIG_DIR) / device_name if config_path.suffix not in ['.yaml', '.yml']: config_path = config_path.with_suffix('.yaml') if not config_path.exists(): raise FileNotFoundError(f"Config not found for device: {device_name}") return str(config_path) # Git Operations def run_git_command(args, cwd=None): """Execute a git command and return the result""" if cwd is None: cwd = ESPHOME_CONFIG_DIR cmd = ['git'] + args logger.info(f"Running git command in {cwd}: {' '.join(cmd)}") try: result = subprocess.run( cmd, cwd=cwd, capture_output=True, text=True, timeout=60 ) if result.returncode != 0: logger.error(f"Git command failed: {result.stderr}") return {'success': False, 'error': result.stderr, 'stdout': result.stdout} return {'success': True, 'stdout': result.stdout, 'stderr': result.stderr} except subprocess.TimeoutExpired: return {'success': False, 'error': 'Git command timed out'} except Exception as e: return {'success': False, 'error': str(e)} def is_git_repo(): """Check if the config directory is a git repository""" git_dir = Path(ESPHOME_CONFIG_DIR) / '.git' return git_dir.exists() and git_dir.is_dir() def get_git_remote_url(): """Build the git remote URL with authentication token""" if not GITEA_URL or not GITEA_REPO: return None # Parse URL and inject token # Format: https://token@gitea.com/user/repo.git url = GITEA_URL.rstrip('/') if url.startswith('https://'): url = url.replace('https://', f'https://{GITEA_TOKEN}@', 1) elif url.startswith('http://'): url = url.replace('http://', f'http://{GITEA_TOKEN}@', 1) return f"{url}/{GITEA_REPO}.git" def git_clone(): """Clone the Gitea repository to the config directory""" if is_git_repo(): logger.info("Config directory is already a git repository") return {'success': True, 'message': 'Already a git repository'} remote_url = get_git_remote_url() if not remote_url: return {'success': False, 'error': 'Gitea URL, repo, or token not configured'} logger.info(f"Cloning repository from {GITEA_URL}/{GITEA_REPO}") # Clone into a temporary directory, then move contents parent_dir = Path(ESPHOME_CONFIG_DIR).parent temp_dir = parent_dir / 'temp_clone' try: # Clone to temp directory result = run_git_command(['clone', '-b', GITEA_BRANCH, remote_url, str(temp_dir)], cwd=parent_dir) if not result['success']: return result # Move contents to config directory import shutil for item in temp_dir.iterdir(): dest = Path(ESPHOME_CONFIG_DIR) / item.name if dest.exists(): if dest.is_dir(): shutil.rmtree(dest) else: dest.unlink() shutil.move(str(item), str(dest)) # Remove temp directory temp_dir.rmdir() # Configure git user run_git_command(['config', 'user.name', GIT_USER_NAME]) run_git_command(['config', 'user.email', GIT_USER_EMAIL]) logger.info("Repository cloned successfully") return {'success': True, 'message': 'Repository cloned'} except Exception as e: logger.exception(f"Error during git clone: {e}") return {'success': False, 'error': str(e)} def git_pull(): """Pull latest changes from Gitea""" # Ensure safe.directory is set before git operations run_git_command(['config', '--global', '--add', 'safe.directory', ESPHOME_CONFIG_DIR], cwd='/tmp') # Ensure git user is configured (needed for merge commits) run_git_command(['config', 'user.name', GIT_USER_NAME]) run_git_command(['config', 'user.email', GIT_USER_EMAIL]) if not is_git_repo(): logger.warning("Not a git repository, attempting to clone") return git_clone() logger.info("Pulling latest changes from Gitea") # Fetch and pull result = run_git_command(['fetch', 'origin', GITEA_BRANCH]) if not result['success']: return result result = run_git_command(['pull', 'origin', GITEA_BRANCH]) if result['success']: logger.info("Git pull completed successfully") return result def git_push(commit_message="Auto-sync from ESPHome"): """Push local changes to Gitea""" # Ensure safe.directory is set before git operations run_git_command(['config', '--global', '--add', 'safe.directory', ESPHOME_CONFIG_DIR], cwd='/tmp') # Ensure git user is configured run_git_command(['config', 'user.name', GIT_USER_NAME]) run_git_command(['config', 'user.email', GIT_USER_EMAIL]) if not is_git_repo(): return {'success': False, 'error': 'Not a git repository'} logger.info("Pushing changes to Gitea") # Debug: Check status before adding status_before = run_git_command(['status', '--porcelain']) logger.info(f"Git status BEFORE add: {status_before.get('stdout', '').strip()}") # Add all changes result = run_git_command(['add', '-A']) if not result['success']: logger.error(f"Git add failed: {result.get('error')}") return result logger.info(f"Git add output: {result.get('stdout', '').strip()}") # Check if there are changes to commit status_result = run_git_command(['status', '--porcelain']) logger.info(f"Git status AFTER add: {status_result.get('stdout', '').strip()}") if status_result['success'] and not status_result['stdout'].strip(): logger.info("No changes to commit") return {'success': True, 'message': 'No changes to commit'} # Commit result = run_git_command(['commit', '-m', commit_message]) if not result['success']: return result # Push result = run_git_command(['push', 'origin', GITEA_BRANCH]) if result['success']: logger.info("Changes pushed to Gitea successfully") return result def initialize_git_repo(): """Initialize or verify git repository on startup""" logger.info("Initializing git repository...") # Mark the config directory as safe to prevent "dubious ownership" errors # Run from /tmp to avoid git checking the dubious repository run_git_command(['config', '--global', '--add', 'safe.directory', ESPHOME_CONFIG_DIR], cwd='/tmp') if not GITEA_URL or not GITEA_REPO or not GITEA_TOKEN: logger.warning("Gitea configuration incomplete, skipping git initialization") return if not is_git_repo(): logger.info("Config directory is not a git repository, cloning...") result = git_clone() if not result['success']: logger.error(f"Failed to clone repository: {result.get('error')}") else: logger.info("Config directory is already a git repository") # Ensure git user is configured run_git_command(['config', 'user.name', GIT_USER_NAME]) run_git_command(['config', 'user.email', GIT_USER_EMAIL]) # Pull latest changes logger.info("Pulling latest changes on startup...") result = git_pull() if not result['success']: logger.error(f"Failed to pull changes: {result.get('error')}") # REST API Endpoints @app.route('/health', methods=['GET']) def health_check(): """Health check endpoint""" return jsonify({ 'status': 'healthy', 'timestamp': datetime.utcnow().isoformat(), 'config_dir': ESPHOME_CONFIG_DIR, 'service': 'gitea-sync' }) @app.route('/devices', methods=['GET']) def list_devices(): """List all available devices""" config_dir = Path(ESPHOME_CONFIG_DIR) devices = [] # Find all .yaml/.yml files directly in the config directory for item in config_dir.iterdir(): if item.is_file() and item.suffix in ['.yaml', '.yml']: devices.append({ 'name': item.stem, # Device name is the filename without extension 'config_path': str(item), 'last_modified': datetime.fromtimestamp(item.stat().st_mtime).isoformat() }) return jsonify({'devices': devices}) @app.route('/webhook/gitea', methods=['POST']) def gitea_webhook(): """Webhook endpoint for Gitea push events""" logger.info("Received Gitea webhook") try: # Parse Gitea webhook payload payload = request.json if not payload: return jsonify({'error': 'No payload received'}), 400 # Log the event event_type = request.headers.get('X-Gitea-Event', 'unknown') logger.info(f"Gitea event type: {event_type}") # For push events, pull the changes if event_type == 'push': result = git_pull() if result['success']: return jsonify({'status': 'success', 'message': 'Changes pulled successfully'}), 200 else: return jsonify({'status': 'error', 'error': result.get('error')}), 500 return jsonify({'status': 'ignored', 'message': f'Event type {event_type} not handled'}), 200 except Exception as e: logger.exception(f"Error processing Gitea webhook: {e}") return jsonify({'error': str(e)}), 500 @app.route('/sync/pull', methods=['POST']) def manual_pull(): """Manually trigger a git pull""" logger.info("Manual git pull triggered") try: result = git_pull() if result['success']: return jsonify({'status': 'success', 'message': 'Changes pulled successfully'}), 200 else: return jsonify({'status': 'error', 'error': result.get('error')}), 500 except Exception as e: logger.exception(f"Error during manual pull: {e}") return jsonify({'error': str(e)}), 500 @app.route('/sync/push', methods=['POST']) def manual_push(): """Manually trigger a git push""" logger.info("Manual git push triggered") try: commit_message = request.json.get('message', 'Manual sync from ESPHome') if request.json else 'Manual sync from ESPHome' result = git_push(commit_message) if result['success']: return jsonify({'status': 'success', 'message': result.get('message', 'Changes pushed successfully')}), 200 else: return jsonify({'status': 'error', 'error': result.get('error')}), 500 except Exception as e: logger.exception(f"Error during manual push: {e}") return jsonify({'error': str(e)}), 500 def start_file_watcher(): """Start the file system watcher""" logger.info(f"Initializing file watcher for directory: {ESPHOME_CONFIG_DIR}") logger.info(f"Watching for .yaml and .yml files with {DEBOUNCE_SECONDS}s debounce") logger.info(f"AUTO_PUSH is {'enabled' if AUTO_PUSH else 'disabled'}") # Select observer type based on configuration if USE_POLLING: logger.info(f"Using PollingObserver (interval: {POLLING_INTERVAL}s) for Docker bind mount compatibility") else: logger.info("Using native filesystem observer (inotify)") # Verify the directory exists config_path = Path(ESPHOME_CONFIG_DIR) if not config_path.exists(): logger.error(f"Config directory does not exist: {ESPHOME_CONFIG_DIR}") return if not config_path.is_dir(): logger.error(f"Config path is not a directory: {ESPHOME_CONFIG_DIR}") return event_handler = ESPHomeFileHandler() # Use PollingObserver for Docker compatibility, or native Observer for local development if USE_POLLING: observer = PollingObserver(timeout=POLLING_INTERVAL) else: observer = Observer() observer.schedule(event_handler, ESPHOME_CONFIG_DIR, recursive=True) observer.start() logger.info(f"File watcher started successfully on {ESPHOME_CONFIG_DIR}") # Log existing YAML files being watched yaml_files = list(config_path.glob('*.yaml')) + list(config_path.glob('*.yml')) logger.info(f"Currently watching {len(yaml_files)} YAML files: {[f.name for f in yaml_files]}") try: while True: time.sleep(1) except KeyboardInterrupt: logger.info("File watcher interrupted, stopping...") observer.stop() observer.join() logger.info("File watcher stopped") if __name__ == '__main__': logger.info("Starting ESPHome Gitea Sync Service") logger.info(f"Config directory: {ESPHOME_CONFIG_DIR}") logger.info(f"Debounce seconds: {DEBOUNCE_SECONDS}") logger.info(f"Auto-push: {AUTO_PUSH}") # Initialize git repository initialize_git_repo() # Start file watcher in a separate thread watcher_thread = Thread(target=start_file_watcher, daemon=True) watcher_thread.start() # Start Flask app app.run(host='0.0.0.0', port=5000, debug=False)