From aa17f5717aff503ab134b62c2cbc7fbf4b33b72a Mon Sep 17 00:00:00 2001 From: hyaku0121 Date: Thu, 11 Dec 2025 23:46:21 +0900 Subject: [PATCH 1/8] feat: Implement comprehensive system health checks for #128 --- cortex/health/checks/disk.py | 33 ++++ cortex/health/checks/performance.py | 63 +++++++ cortex/health/checks/security.py | 57 ++++++ cortex/health/checks/updates.py | 56 ++++++ scripts/verify_ubuntu_compatibility.py | 245 +++++++++++++++++++++++++ 5 files changed, 454 insertions(+) create mode 100644 cortex/health/checks/disk.py create mode 100644 cortex/health/checks/performance.py create mode 100644 cortex/health/checks/security.py create mode 100644 cortex/health/checks/updates.py create mode 100644 scripts/verify_ubuntu_compatibility.py diff --git a/cortex/health/checks/disk.py b/cortex/health/checks/disk.py new file mode 100644 index 0000000..21dcc94 --- /dev/null +++ b/cortex/health/checks/disk.py @@ -0,0 +1,33 @@ +import shutil +from ..monitor import HealthCheck, CheckResult + +class DiskCheck(HealthCheck): + def run(self) -> CheckResult: + total, used, free = shutil.disk_usage("/") + # Calculate usage percentage + usage_percent = (used / total) * 100 + + score = 100 + status = "OK" + details = f"{usage_percent:.1f}% Used" + rec = None + + # Scoring logic (Spec compliant) + if usage_percent > 90: + score = 0 + status = "CRITICAL" + rec = "Clean package cache (+50 pts)" + elif usage_percent > 80: + score = 50 + status = "WARNING" + rec = "Clean package cache (+10 pts)" + + return CheckResult( + name="Disk Space", + category="disk", + score=score, + status=status, + details=details, + recommendation=rec, + weight=0.15 # 15% + ) \ No newline at end of file diff --git a/cortex/health/checks/performance.py b/cortex/health/checks/performance.py new file mode 100644 index 0000000..9e5e66f --- /dev/null +++ b/cortex/health/checks/performance.py @@ -0,0 +1,63 @@ +import os +import multiprocessing +from ..monitor import HealthCheck, CheckResult + +class PerformanceCheck(HealthCheck): + def run(self) -> CheckResult: + score = 100 + issues = [] + rec = None + + # 1. Load Average (1min) + try: + load1, _, _ = os.getloadavg() + cores = multiprocessing.cpu_count() + # Load ratio against core count + load_ratio = load1 / cores + + if load_ratio > 1.0: + score -= 50 + issues.append(f"High Load ({load1:.2f})") + rec = "Check top processes" + except Exception: + pass # Skip on Windows etc. + + # 2. Memory Usage (Linux /proc/meminfo) + try: + with open('/proc/meminfo', 'r') as f: + meminfo = {} + for line in f: + parts = line.split(':') + if len(parts) == 2: + meminfo[parts[0].strip()] = int(parts[1].strip().split()[0]) + + if 'MemTotal' in meminfo and 'MemAvailable' in meminfo: + total = meminfo['MemTotal'] + avail = meminfo['MemAvailable'] + used_percent = ((total - avail) / total) * 100 + + if used_percent > 80: + penalty = int(used_percent - 80) + score -= penalty + issues.append(f"High Memory ({used_percent:.0f}%)") + except FileNotFoundError: + pass # Non-Linux systems + + # Summary of results + status = "OK" + if score < 50: + status = "CRITICAL" + elif score < 90: + status = "WARNING" + + details = ", ".join(issues) if issues else "Optimal" + + return CheckResult( + name="System Load", + category="performance", + score=max(0, score), + status=status, + details=details, + recommendation=rec, + weight=0.20 # 20% + ) \ No newline at end of file diff --git a/cortex/health/checks/security.py b/cortex/health/checks/security.py new file mode 100644 index 0000000..7e21afb --- /dev/null +++ b/cortex/health/checks/security.py @@ -0,0 +1,57 @@ +import subprocess +import os +from ..monitor import HealthCheck, CheckResult + +class SecurityCheck(HealthCheck): + def run(self) -> CheckResult: + score = 100 + issues = [] + recommendations = [] + + # 1. Firewall (UFW) Check + ufw_active = False + try: + res = subprocess.run( + ["systemctl", "is-active", "ufw"], + capture_output=True, text=True + ) + # Fix: Use exact match to avoid matching "inactive" which contains "active" + if res.returncode == 0 and res.stdout.strip() == "active": + ufw_active = True + except FileNotFoundError: + pass # Environment without systemctl (e.g., Docker or non-systemd) + + if not ufw_active: + score = 0 # Spec: 0 points if Firewall is inactive + issues.append("Firewall Inactive") + recommendations.append("Enable UFW Firewall") + + # 2. SSH Root Login Check + try: + ssh_config = "/etc/ssh/sshd_config" + if os.path.exists(ssh_config): + with open(ssh_config, 'r') as f: + for line in f: + line = line.strip() + # Check for uncommented PermitRootLogin yes + if line.startswith("PermitRootLogin") and "yes" in line.split(): + score -= 50 + issues.append("Root SSH Allowed") + recommendations.append("Disable SSH Root Login in sshd_config") + break + except PermissionError: + pass # Cannot read config, skip check + + status = "OK" + if score < 50: status = "CRITICAL" + elif score < 100: status = "WARNING" + + return CheckResult( + name="Security Posture", + category="security", + score=max(0, score), + status=status, + details=", ".join(issues) if issues else "Secure", + recommendation=", ".join(recommendations) if recommendations else None, + weight=0.35 + ) \ No newline at end of file diff --git a/cortex/health/checks/updates.py b/cortex/health/checks/updates.py new file mode 100644 index 0000000..27c01cb --- /dev/null +++ b/cortex/health/checks/updates.py @@ -0,0 +1,56 @@ +import subprocess +from ..monitor import HealthCheck, CheckResult + +class UpdateCheck(HealthCheck): + def run(self) -> CheckResult: + score = 100 + pkg_count = 0 + sec_count = 0 + rec = None + + # Parse apt list --upgradable + try: + # Execute safely without pipeline + res = subprocess.run( + ["apt", "list", "--upgradable"], + capture_output=True, text=True + ) + + lines = res.stdout.splitlines() + # Skip first line "Listing..." + for line in lines[1:]: + if line.strip(): + pkg_count += 1 + if "security" in line.lower(): + sec_count += 1 + + # Scoring + score -= (pkg_count * 2) # -2 pts per normal package + score -= (sec_count * 10) # -10 pts per security package + + if pkg_count > 0: + rec = f"Install {pkg_count} updates (+{100-score} pts)" + + except FileNotFoundError: + # Skip on non-apt environments (100 pts) + return CheckResult("Updates", "updates", 100, "SKIP", "apt not found", weight=0.30) + except Exception: + pass # Ignore errors + + status = "OK" + if score < 60: status = "CRITICAL" + elif score < 100: status = "WARNING" + + details = f"{pkg_count} pending" + if sec_count > 0: + details += f" ({sec_count} security)" + + return CheckResult( + name="System Updates", + category="updates", + score=max(0, score), + status=status, + details=details, + recommendation=rec, + weight=0.30 # 30% + ) \ No newline at end of file diff --git a/scripts/verify_ubuntu_compatibility.py b/scripts/verify_ubuntu_compatibility.py new file mode 100644 index 0000000..dca1c0b --- /dev/null +++ b/scripts/verify_ubuntu_compatibility.py @@ -0,0 +1,245 @@ +import subprocess +import os +import sys +import json +import datetime +import shutil + +# File name to store history data +HISTORY_FILE = "security_history.json" + +def load_history(): + """Load past execution history""" + if os.path.exists(HISTORY_FILE): + try: + with open(HISTORY_FILE, 'r') as f: + return json.load(f) + except json.JSONDecodeError: + return [] + return [] + +def save_history(score, status, details): + """Save execution result to history""" + history = load_history() + record = { + "timestamp": datetime.datetime.now().isoformat(), + "score": score, + "status": status, + "details": details + } + history.append(record) + # Keep only the latest 10 records + history = history[-10:] + + with open(HISTORY_FILE, 'w') as f: + json.dump(history, f, indent=4) + + return history + +def show_trend(history): + """Show historical trend (Trend Tracking)""" + print("\n=== šŸ“Š Historical Trend Analysis ===") + if not history: + print(" No historical data available yet.") + return + + scores = [h["score"] for h in history] + avg_score = sum(scores) / len(scores) + last_score = scores[-1] + + print(f" History Count: {len(history)} runs") + print(f" Average Score: {avg_score:.1f}") + print(f" Last Run Score: {last_score}") + + if len(scores) > 1: + prev_score = scores[-2] + diff = last_score - prev_score + if diff > 0: + print(f" Trend: šŸ“ˆ Improved by {diff} points since previous run") + elif diff < 0: + print(f" Trend: šŸ“‰ Dropped by {abs(diff)} points since previous run") + else: + print(f" Trend: āž”ļø Stable") + +def fix_firewall(): + """Enable Firewall (Automated Fix)""" + print("\n [Fixing] Enabling UFW Firewall...") + + # Check if ufw is installed using 'which' or checking path + # (Since sudo is used, we check if we can find ufw path) + if not shutil.which("ufw") and not os.path.exists("/usr/sbin/ufw"): + print(" -> āš ļø UFW is not installed. Cannot enable.") + print(" (Try: sudo apt install ufw)") + return False + + try: + # Depends on execution environment, sudo might be required + subprocess.run(["sudo", "ufw", "enable"], check=True) + print(" -> āœ… Success: Firewall enabled.") + return True + except subprocess.CalledProcessError as e: + print(f" -> āŒ Failed to enable firewall: {e}") + return False + +def fix_ssh_config(config_path): + """Disable SSH Root Login (Automated Fix)""" + print(f"\n [Fixing] Disabling Root Login in {config_path}...") + + # Check if file exists before trying to fix + if not os.path.exists(config_path): + print(f" -> āš ļø Config file not found: {config_path}") + return False + + # 1. Create backup + backup_path = config_path + ".bak." + datetime.datetime.now().strftime("%Y%m%d%H%M%S") + try: + shutil.copy2(config_path, backup_path) + print(f" -> Backup created at: {backup_path}") + except PermissionError: + print(" -> āŒ Failed to create backup (Permission denied). Need sudo?") + return False + + # 2. Rewrite configuration + try: + new_lines = [] + with open(config_path, 'r') as f: + lines = f.readlines() + + fixed = False + for line in lines: + if line.strip().startswith("PermitRootLogin") and "yes" in line: + # Comment out and add disabled setting + new_lines.append(f"# {line.strip()} (Disabled by Auto-Fix)\n") + new_lines.append("PermitRootLogin no\n") + fixed = True + else: + new_lines.append(line) + + if fixed: + with open(config_path, 'w') as f: + f.writelines(new_lines) + print(" -> āœ… Success: sshd_config updated.") + + # Attempt to restart SSH service + print(" -> Restarting sshd service...") + subprocess.run(["sudo", "systemctl", "restart", "ssh"], check=False) + return True + else: + print(" -> No changes needed.") + return True + + except PermissionError: + print(" -> āŒ Failed to write config (Permission denied). Need sudo?") + return False + except Exception as e: + print(f" -> āŒ Error during fix: {e}") + return False + +def verify_security_logic(): + print("=== Ubuntu Security Logic Verification ===") + + # --------------------------------------------------------- + # 1. Firewall (UFW) Check Logic + # --------------------------------------------------------- + print("\n[1] Checking Firewall (UFW)...") + ufw_active = False + ufw_needs_fix = False + try: + print(" Running: systemctl is-active ufw") + res = subprocess.run( + ["systemctl", "is-active", "ufw"], + capture_output=True, text=True + ) + output = res.stdout.strip() + print(f" Output: '{output}'") + + if res.returncode == 0 and output == "active": + ufw_active = True + print(" -> JUDGEMENT: Firewall is ACTIVE (Score: 100)") + else: + print(" -> JUDGEMENT: Firewall is INACTIVE (Score: 0)") + ufw_needs_fix = True + + except FileNotFoundError: + print(" -> ERROR: 'systemctl' command not found.") + except Exception as e: + print(f" -> ERROR: {e}") + + # --------------------------------------------------------- + # 2. SSH Root Login Check Logic + # --------------------------------------------------------- + print("\n[2] Checking SSH Configuration...") + ssh_config = "/etc/ssh/sshd_config" + score_penalty = 0 + ssh_needs_fix = False + + if os.path.exists(ssh_config): + print(f" File found: {ssh_config}") + try: + with open(ssh_config, 'r') as f: + found_risky_setting = False + for line in f: + if line.strip().startswith("PermitRootLogin") and "yes" in line: + print(f" -> FOUND RISKY LINE: {line.strip()}") + score_penalty = 50 + found_risky_setting = True + ssh_needs_fix = True + break + + if not found_risky_setting: + print(" -> No 'PermitRootLogin yes' found (Safe)") + + except PermissionError: + print(" -> ERROR: Permission denied. Try running with 'sudo'.") + else: + print(f" -> WARNING: {ssh_config} does not exist.") + + # --------------------------------------------------------- + # Final Report & History + # --------------------------------------------------------- + print("\n=== Summary ===") + final_score = 100 + if not ufw_active: + final_score = 0 + final_score -= score_penalty + final_score = max(0, final_score) + + status = "OK" + if final_score < 50: status = "CRITICAL" + elif final_score < 100: status = "WARNING" + + print(f"Current Score: {final_score}") + print(f"Status: {status}") + + # --- Trend Tracking --- + print("\n... Saving history ...") + details = [] + if ufw_needs_fix: details.append("Firewall Inactive") + if ssh_needs_fix: details.append("Root SSH Allowed") + + history = save_history(final_score, status, ", ".join(details)) + show_trend(history) + + # --------------------------------------------------------- + # Automated Fixes (Interactive) + # --------------------------------------------------------- + if ufw_needs_fix or ssh_needs_fix: + print("\n=== šŸ› ļø Automated Fixes Available ===") + print("Issues detected that can be automatically fixed.") + user_input = input("Do you want to apply fixes now? (y/n): ").strip().lower() + + if user_input == 'y': + if ufw_needs_fix: + fix_firewall() + if ssh_needs_fix: + fix_ssh_config(ssh_config) + + print("\nāœ… Fixes attempt complete. Please re-run script to verify.") + else: + print("Skipping fixes.") + +if __name__ == "__main__": + # Warn that sudo might be required for execution + if os.geteuid() != 0: + print("NOTE: This script works best with 'sudo' for fixing issues.") + verify_security_logic() \ No newline at end of file From bdfcccf6784edd4eb4f7164a6e6859be81782778 Mon Sep 17 00:00:00 2001 From: hyaku0121 Date: Thu, 11 Dec 2025 23:48:31 +0900 Subject: [PATCH 2/8] feat: Add health monitor core logic, CLI integration, and unit tests --- cortex/cli.py | 79 ++++++++++++++++---- cortex/health/__init__.py | 0 cortex/health/monitor.py | 106 +++++++++++++++++++++++++++ tests/test_health_monitor.py | 137 +++++++++++++++++++++++++++++++++++ 4 files changed, 308 insertions(+), 14 deletions(-) create mode 100644 cortex/health/__init__.py create mode 100644 cortex/health/monitor.py create mode 100644 tests/test_health_monitor.py diff --git a/cortex/cli.py b/cortex/cli.py index 17004c6..c120bca 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -38,7 +38,7 @@ validate_installation_id, ValidationError ) -# Import the new Notification Manager +# Import Notification Manager from cortex.notification_manager import NotificationManager @@ -112,10 +112,9 @@ def _clear_line(self): sys.stdout.write('\r\033[K') sys.stdout.flush() - # --- New Notification Method --- + # --- Notification Method --- def notify(self, args): """Handle notification commands""" - # Addressing CodeRabbit feedback: Handle missing subcommand gracefully if not args.notify_action: self._print_error("Please specify a subcommand (config/enable/disable/dnd/send)") return 1 @@ -132,16 +131,14 @@ def notify(self, args): elif args.notify_action == 'enable': mgr.config["enabled"] = True - # Addressing CodeRabbit feedback: Ideally should use a public method instead of private _save_config, - # but keeping as is for a simple fix (or adding a save method to NotificationManager would be best). - mgr._save_config() + mgr.save_config() self._print_success("Notifications enabled") return 0 elif args.notify_action == 'disable': mgr.config["enabled"] = False - mgr._save_config() - cx_print("Notifications disabled (Critical alerts will still show)", "warning") + mgr.save_config() + self._print_success("Notifications disabled (Critical alerts will still show)") return 0 elif args.notify_action == 'dnd': @@ -149,7 +146,6 @@ def notify(self, args): self._print_error("Please provide start and end times (HH:MM)") return 1 - # Addressing CodeRabbit feedback: Add time format validation try: datetime.strptime(args.start, "%H:%M") datetime.strptime(args.end, "%H:%M") @@ -159,7 +155,7 @@ def notify(self, args): mgr.config["dnd_start"] = args.start mgr.config["dnd_end"] = args.end - mgr._save_config() + mgr.save_config() self._print_success(f"DND Window updated: {args.start} - {args.end}") return 0 @@ -174,7 +170,56 @@ def notify(self, args): else: self._print_error("Unknown notify command") return 1 - # ------------------------------- + + # --- New Health Command --- + def health(self, args): + """Run system health checks and show recommendations""" + from cortex.health.monitor import HealthMonitor + + self._print_status("šŸ”", "Running system health checks...") + monitor = HealthMonitor() + report = monitor.run_all() + + # --- Display Results --- + score = report['total_score'] + + # Color code the score + score_color = "green" + if score < 60: score_color = "red" + elif score < 80: score_color = "yellow" + + console.print() + console.print(f"šŸ“Š [bold]System Health Score:[/bold] [{score_color}]{score}/100[/{score_color}]") + console.print() + + console.print("[bold]Factors:[/bold]") + recommendations = [] + + for res in report['results']: + status_icon = "āœ…" + if res['status'] == 'WARNING': status_icon = "āš ļø " + elif res['status'] == 'CRITICAL': status_icon = "āŒ" + + console.print(f" {status_icon} {res['name']:<15}: {res['score']}/100 ({res['details']})") + + if res['recommendation']: + recommendations.append(res['recommendation']) + + console.print() + + if recommendations: + console.print("[bold]Recommendations:[/bold]") + for i, rec in enumerate(recommendations, 1): + console.print(f" {i}. {rec}") + + console.print() + # Note: Auto-fix logic would go here, prompting user to apply specific commands. + # For this iteration, we display actionable advice. + console.print("[dim]Run suggested commands manually to improve your score.[/dim]") + else: + self._print_success("System is in excellent health! No actions needed.") + + return 0 def install(self, software: str, execute: bool = False, dry_run: bool = False): # Validate input first @@ -543,7 +588,8 @@ def show_rich_help(): table.add_row("install ", "Install software") table.add_row("history", "View history") table.add_row("rollback ", "Undo installation") - table.add_row("notify", "Manage desktop notifications") # Added this line + table.add_row("notify", "Manage desktop notifications") + table.add_row("health", "Check system health score") # Added this line console.print(table) console.print() @@ -598,7 +644,7 @@ def main(): edit_pref_parser.add_argument('key', nargs='?') edit_pref_parser.add_argument('value', nargs='?') - # --- New Notify Command --- + # --- Notify Command --- notify_parser = subparsers.add_parser('notify', help='Manage desktop notifications') notify_subs = notify_parser.add_subparsers(dest='notify_action', help='Notify actions') @@ -615,6 +661,9 @@ def main(): send_parser.add_argument('--title', default='Cortex Notification') send_parser.add_argument('--level', choices=['low', 'normal', 'critical'], default='normal') send_parser.add_argument('--actions', nargs='*', help='Action buttons') + + # --- New Health Command --- + health_parser = subparsers.add_parser('health', help='Check system health score') # -------------------------- args = parser.parse_args() @@ -642,9 +691,11 @@ def main(): return cli.check_pref(key=args.key) elif args.command == 'edit-pref': return cli.edit_pref(action=args.action, key=args.key, value=args.value) - # Handle the new notify command elif args.command == 'notify': return cli.notify(args) + # Handle new command + elif args.command == 'health': + return cli.health(args) else: parser.print_help() return 1 diff --git a/cortex/health/__init__.py b/cortex/health/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cortex/health/monitor.py b/cortex/health/monitor.py new file mode 100644 index 0000000..c85b624 --- /dev/null +++ b/cortex/health/monitor.py @@ -0,0 +1,106 @@ +import json +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from pathlib import Path +from typing import List, Dict, Optional +from rich.console import Console + +console = Console() + +@dataclass +class CheckResult: + """Data class to hold the result of each check""" + name: str # Item name (e.g. "Disk Space") + category: str # Category (security, updates, performance, disk) + score: int # Score 0-100 + status: str # "OK", "WARNING", "CRITICAL" + details: str # Detailed message + recommendation: Optional[str] = None # Recommended action (if any) + weight: float = 1.0 # Weight for weighted average + +class HealthCheck(ABC): + """Base class inherited by all health check modules""" + @abstractmethod + def run(self) -> CheckResult: + pass + +class HealthMonitor: + """ + Main engine for system health monitoring. + """ + def __init__(self): + self.history_file = Path.home() / ".cortex" / "health_history.json" + self.history_file.parent.mkdir(exist_ok=True) + self.checks: List[HealthCheck] = [] + + # Register each check here + # (Import here to prevent circular references) + from .checks.security import SecurityCheck + from .checks.updates import UpdateCheck + from .checks.performance import PerformanceCheck + from .checks.disk import DiskCheck + + self.register_check(SecurityCheck()) + self.register_check(UpdateCheck()) + self.register_check(PerformanceCheck()) + self.register_check(DiskCheck()) + + def register_check(self, check: HealthCheck): + self.checks.append(check) + + def run_all(self) -> Dict: + results = [] + total_weighted_score = 0 + total_weight = 0 + + for check in self.checks: + try: + result = check.run() + results.append(result) + total_weighted_score += result.score * result.weight + total_weight += result.weight + except Exception as e: + console.print(f"[red]Error running check {check.__class__.__name__}: {e}[/red]") + + final_score = 0 + if total_weight > 0: + final_score = int(total_weighted_score / total_weight) + + report = { + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), + "total_score": final_score, + "results": [ + { + "name": r.name, + "category": r.category, + "score": r.score, + "status": r.status, + "details": r.details, + "recommendation": r.recommendation + } + for r in results + ] + } + + self._save_history(report) + return report + + def _save_history(self, report: Dict): + history = [] + if self.history_file.exists(): + try: + with open(self.history_file, 'r') as f: + history = json.load(f) + except json.JSONDecodeError: + pass + + history.append(report) + history = history[-100:] + + with open(self.history_file, 'w') as f: + json.dump(history, f, indent=4) + +if __name__ == "__main__": + # For testing execution + print("HealthMonitor initialized.") \ No newline at end of file diff --git a/tests/test_health_monitor.py b/tests/test_health_monitor.py new file mode 100644 index 0000000..d352f0d --- /dev/null +++ b/tests/test_health_monitor.py @@ -0,0 +1,137 @@ +import unittest +from unittest.mock import patch, MagicMock, mock_open +from cortex.health.monitor import HealthMonitor, CheckResult +from cortex.health.checks.disk import DiskCheck +from cortex.health.checks.performance import PerformanceCheck +from cortex.health.checks.security import SecurityCheck +from cortex.health.checks.updates import UpdateCheck + +class TestDiskCheck(unittest.TestCase): + @patch('shutil.disk_usage') + def test_disk_usage_scoring(self, mock_usage): + # Case 1: Healthy (50% used) -> 100 pts + # total=100, used=50, free=50 + mock_usage.return_value = (100, 50, 50) + check = DiskCheck() + result = check.run() + self.assertEqual(result.score, 100) + self.assertEqual(result.status, "OK") + + # Case 2: Warning (85% used) -> 50 pts + mock_usage.return_value = (100, 85, 15) + result = check.run() + self.assertEqual(result.score, 50) + self.assertEqual(result.status, "WARNING") + + # Case 3: Critical (95% used) -> 0 pts + mock_usage.return_value = (100, 95, 5) + result = check.run() + self.assertEqual(result.score, 0) + self.assertEqual(result.status, "CRITICAL") + +class TestPerformanceCheck(unittest.TestCase): + @patch('os.getloadavg') + @patch('multiprocessing.cpu_count') + def test_load_average(self, mock_cpu, mock_load): + # Case 1: Load OK (Load 2.0 / 4 Cores = 0.5 ratio) + mock_cpu.return_value = 4 + mock_load.return_value = (2.0, 2.0, 2.0) + + # Mock reading /proc/meminfo (Normal case) + mem_data = "MemTotal: 1000 kB\nMemAvailable: 500 kB\n" + with patch('builtins.open', mock_open(read_data=mem_data)): + check = PerformanceCheck() + result = check.run() + self.assertEqual(result.score, 100) # No penalty + + @patch('os.getloadavg') + @patch('multiprocessing.cpu_count') + def test_high_load_penalty(self, mock_cpu, mock_load): + # Case 2: High Load (Load 5.0 / 4 Cores = 1.25 ratio) -> -50 pts + mock_cpu.return_value = 4 + mock_load.return_value = (5.0, 5.0, 5.0) + + # Assume memory is normal + mem_data = "MemTotal: 1000 kB\nMemAvailable: 500 kB\n" + with patch('builtins.open', mock_open(read_data=mem_data)): + check = PerformanceCheck() + result = check.run() + self.assertEqual(result.score, 50) # 100 - 50 = 50 + +class TestSecurityCheck(unittest.TestCase): + @patch('subprocess.run') + def test_ufw_status(self, mock_run): + # Case 1: UFW Inactive -> 0 pts + mock_run.return_value.stdout = "inactive" + mock_run.return_value.returncode = 0 + + check = SecurityCheck() + result = check.run() + self.assertEqual(result.score, 0) + self.assertIn("Firewall Inactive", result.details) + + @patch('subprocess.run') + def test_ufw_active(self, mock_run): + # Case 2: UFW Active -> 100 pts (SSH config is safe by default mock) + mock_run.return_value.stdout = "active" + mock_run.return_value.returncode = 0 + + # Test error handling when sshd_config does not exist + with patch('os.path.exists', return_value=False): + check = SecurityCheck() + result = check.run() + self.assertEqual(result.score, 100) + +class TestUpdateCheck(unittest.TestCase): + @patch('subprocess.run') + def test_apt_updates(self, mock_run): + # Mock output for apt list --upgradable + # Ignore first line, packages start from 2nd line + apt_output = """Listing... Done +package1/stable 1.0.0 amd64 [upgradable from: 0.9.9] +package2/stable 2.0.0 amd64 [upgradable from: 1.9.9] +security-pkg/stable 1.0.1 amd64 [upgradable from: 1.0.0] - Security Update +""" + mock_run.return_value.stdout = apt_output + mock_run.return_value.returncode = 0 + + check = UpdateCheck() + result = check.run() + + # Calculation: + # Total packages: 3 + # Security packages: 1 (line containing "security") + # Penalty: (3 * 2) + (1 * 10) = 6 + 10 = 16 pts + # Expected score: 100 - 16 = 84 pts + + self.assertEqual(result.score, 84) + self.assertIn("3 pending", result.details) + +class TestHealthMonitor(unittest.TestCase): + def test_monitor_aggregation(self): + monitor = HealthMonitor() + # Register mock checks instead of real check classes + + mock_check1 = MagicMock() + mock_check1.run.return_value = CheckResult( + name="Check1", category="test", score=100, status="OK", details="", weight=0.5 + ) + + mock_check2 = MagicMock() + mock_check2.run.return_value = CheckResult( + name="Check2", category="test", score=0, status="CRITICAL", details="", weight=0.5 + ) + + monitor.checks = [mock_check1, mock_check2] + + # Mock history saving to prevent file write + with patch.object(monitor, '_save_history'): + report = monitor.run_all() + + # Weighted average calculation: + # (100 * 0.5) + (0 * 0.5) = 50 / (0.5 + 0.5) = 50 pts + self.assertEqual(report['total_score'], 50) + self.assertEqual(len(report['results']), 2) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file From 95215f78190f6ab1aa343498cf2829e3b982ecf8 Mon Sep 17 00:00:00 2001 From: hyaku0121 Date: Thu, 11 Dec 2025 23:57:14 +0900 Subject: [PATCH 3/8] fix: Add timeouts to subprocess calls to improve reliability --- cortex/health/checks/security.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/cortex/health/checks/security.py b/cortex/health/checks/security.py index 7e21afb..c731319 100644 --- a/cortex/health/checks/security.py +++ b/cortex/health/checks/security.py @@ -11,15 +11,22 @@ def run(self) -> CheckResult: # 1. Firewall (UFW) Check ufw_active = False try: + # Add timeout to prevent hanging (Fixes Reliability Issue) res = subprocess.run( ["systemctl", "is-active", "ufw"], - capture_output=True, text=True + capture_output=True, + text=True, + timeout=5 ) # Fix: Use exact match to avoid matching "inactive" which contains "active" if res.returncode == 0 and res.stdout.strip() == "active": ufw_active = True + except subprocess.TimeoutExpired: + pass # Command timed out, treat as inactive or unavailable except FileNotFoundError: pass # Environment without systemctl (e.g., Docker or non-systemd) + except Exception: + pass # Generic error protection if not ufw_active: score = 0 # Spec: 0 points if Firewall is inactive @@ -41,6 +48,8 @@ def run(self) -> CheckResult: break except PermissionError: pass # Cannot read config, skip check + except Exception: + pass # Generic error protection status = "OK" if score < 50: status = "CRITICAL" From dff20927b17eb18a09dd41c0873358c43debafcd Mon Sep 17 00:00:00 2001 From: hyaku0121 Date: Fri, 12 Dec 2025 00:01:28 +0900 Subject: [PATCH 4/8] refactor: Address code review feedback (docstrings, timeouts, complexity) --- cortex/health/checks/disk.py | 28 ++++++---- cortex/health/checks/security.py | 88 ++++++++++++++++++-------------- cortex/health/checks/updates.py | 49 +++++++++--------- 3 files changed, 92 insertions(+), 73 deletions(-) diff --git a/cortex/health/checks/disk.py b/cortex/health/checks/disk.py index 21dcc94..bd126d7 100644 --- a/cortex/health/checks/disk.py +++ b/cortex/health/checks/disk.py @@ -2,32 +2,38 @@ from ..monitor import HealthCheck, CheckResult class DiskCheck(HealthCheck): + """Check root filesystem disk usage.""" + def run(self) -> CheckResult: - total, used, free = shutil.disk_usage("/") - # Calculate usage percentage + """ + Calculate disk usage percentage. + + Returns: + CheckResult based on usage thresholds. + """ + # Use _ for unused variable (free space) + total, used, _ = shutil.disk_usage("/") usage_percent = (used / total) * 100 score = 100 status = "OK" - details = f"{usage_percent:.1f}% Used" rec = None - - # Scoring logic (Spec compliant) + if usage_percent > 90: score = 0 status = "CRITICAL" - rec = "Clean package cache (+50 pts)" + rec = "Clean up disk space immediately" elif usage_percent > 80: score = 50 status = "WARNING" - rec = "Clean package cache (+10 pts)" - + rec = "Consider cleaning up disk space" + return CheckResult( - name="Disk Space", + name="Disk Usage", category="disk", score=score, status=status, - details=details, + details=f"{usage_percent:.1f}% used", recommendation=rec, - weight=0.15 # 15% + weight=0.20 ) \ No newline at end of file diff --git a/cortex/health/checks/security.py b/cortex/health/checks/security.py index c731319..0b59c67 100644 --- a/cortex/health/checks/security.py +++ b/cortex/health/checks/security.py @@ -3,53 +3,32 @@ from ..monitor import HealthCheck, CheckResult class SecurityCheck(HealthCheck): + """Check security configuration including firewall and SSH settings.""" + def run(self) -> CheckResult: + """ + Run security checks for firewall status and SSH configuration. + + Returns: + CheckResult with security score based on detected issues. + """ score = 100 issues = [] recommendations = [] # 1. Firewall (UFW) Check - ufw_active = False - try: - # Add timeout to prevent hanging (Fixes Reliability Issue) - res = subprocess.run( - ["systemctl", "is-active", "ufw"], - capture_output=True, - text=True, - timeout=5 - ) - # Fix: Use exact match to avoid matching "inactive" which contains "active" - if res.returncode == 0 and res.stdout.strip() == "active": - ufw_active = True - except subprocess.TimeoutExpired: - pass # Command timed out, treat as inactive or unavailable - except FileNotFoundError: - pass # Environment without systemctl (e.g., Docker or non-systemd) - except Exception: - pass # Generic error protection - + ufw_active, ufw_issue, ufw_rec = self._check_firewall() if not ufw_active: - score = 0 # Spec: 0 points if Firewall is inactive - issues.append("Firewall Inactive") - recommendations.append("Enable UFW Firewall") + score = 0 + issues.append(ufw_issue) + recommendations.append(ufw_rec) # 2. SSH Root Login Check - try: - ssh_config = "/etc/ssh/sshd_config" - if os.path.exists(ssh_config): - with open(ssh_config, 'r') as f: - for line in f: - line = line.strip() - # Check for uncommented PermitRootLogin yes - if line.startswith("PermitRootLogin") and "yes" in line.split(): - score -= 50 - issues.append("Root SSH Allowed") - recommendations.append("Disable SSH Root Login in sshd_config") - break - except PermissionError: - pass # Cannot read config, skip check - except Exception: - pass # Generic error protection + ssh_penalty, ssh_issue, ssh_rec = self._check_ssh_root_login() + if ssh_penalty > 0: + score -= ssh_penalty + issues.append(ssh_issue) + recommendations.append(ssh_rec) status = "OK" if score < 50: status = "CRITICAL" @@ -63,4 +42,35 @@ def run(self) -> CheckResult: details=", ".join(issues) if issues else "Secure", recommendation=", ".join(recommendations) if recommendations else None, weight=0.35 - ) \ No newline at end of file + ) + + def _check_firewall(self): + """Check if UFW is active.""" + try: + res = subprocess.run( + ["systemctl", "is-active", "ufw"], + capture_output=True, + text=True, + timeout=10 + ) + if res.returncode == 0 and res.stdout.strip() == "active": + return True, None, None + except (subprocess.TimeoutExpired, FileNotFoundError, Exception): + pass + + return False, "Firewall Inactive", "Enable UFW Firewall" + + def _check_ssh_root_login(self): + """Check for PermitRootLogin yes in sshd_config.""" + try: + ssh_config = "/etc/ssh/sshd_config" + if os.path.exists(ssh_config): + with open(ssh_config, 'r') as f: + for line in f: + line = line.strip() + if line.startswith("PermitRootLogin") and "yes" in line.split(): + return 50, "Root SSH Allowed", "Disable SSH Root Login in sshd_config" + except (PermissionError, Exception): + pass + + return 0, None, None \ No newline at end of file diff --git a/cortex/health/checks/updates.py b/cortex/health/checks/updates.py index 27c01cb..d40b251 100644 --- a/cortex/health/checks/updates.py +++ b/cortex/health/checks/updates.py @@ -2,48 +2,51 @@ from ..monitor import HealthCheck, CheckResult class UpdateCheck(HealthCheck): + """Check for pending system updates and security patches.""" + def run(self) -> CheckResult: + """ + Check for available updates using apt. + + Returns: + CheckResult with score based on pending updates. + """ score = 100 pkg_count = 0 sec_count = 0 - rec = None - # Parse apt list --upgradable try: - # Execute safely without pipeline + # Add timeout to prevent hangs res = subprocess.run( ["apt", "list", "--upgradable"], - capture_output=True, text=True + capture_output=True, + text=True, + timeout=30 ) - lines = res.stdout.splitlines() - # Skip first line "Listing..." + + # apt list output header usually takes first line for line in lines[1:]: if line.strip(): - pkg_count += 1 if "security" in line.lower(): sec_count += 1 + else: + pkg_count += 1 # Scoring - score -= (pkg_count * 2) # -2 pts per normal package - score -= (sec_count * 10) # -10 pts per security package - - if pkg_count > 0: - rec = f"Install {pkg_count} updates (+{100-score} pts)" + score -= (pkg_count * 2) + score -= (sec_count * 10) - except FileNotFoundError: - # Skip on non-apt environments (100 pts) - return CheckResult("Updates", "updates", 100, "SKIP", "apt not found", weight=0.30) except Exception: - pass # Ignore errors + pass status = "OK" - if score < 60: status = "CRITICAL" - elif score < 100: status = "WARNING" + if score < 50: status = "CRITICAL" + elif score < 90: status = "WARNING" - details = f"{pkg_count} pending" - if sec_count > 0: - details += f" ({sec_count} security)" + details = f"{pkg_count} packages, {sec_count} security updates pending" + if pkg_count == 0 and sec_count == 0: + details = "System up to date" return CheckResult( name="System Updates", @@ -51,6 +54,6 @@ def run(self) -> CheckResult: score=max(0, score), status=status, details=details, - recommendation=rec, - weight=0.30 # 30% + recommendation="Run 'apt upgrade'" if score < 100 else None, + weight=0.25 ) \ No newline at end of file From f7a5653d594c87cfdcb32d4cb5d991c1894bd730 Mon Sep 17 00:00:00 2001 From: hyaku0121 Date: Fri, 12 Dec 2025 00:06:21 +0900 Subject: [PATCH 5/8] refactor: Improve security check complexity and SSH parsing logic --- cortex/health/checks/security.py | 68 +++++++++++++++++++++----------- 1 file changed, 45 insertions(+), 23 deletions(-) diff --git a/cortex/health/checks/security.py b/cortex/health/checks/security.py index 0b59c67..0f80fbb 100644 --- a/cortex/health/checks/security.py +++ b/cortex/health/checks/security.py @@ -3,32 +3,41 @@ from ..monitor import HealthCheck, CheckResult class SecurityCheck(HealthCheck): - """Check security configuration including firewall and SSH settings.""" + """ + Checks system security posture including firewall status and SSH configuration. + + Evaluates UFW firewall activity and SSH root login permissions, + returning a weighted score and actionable recommendations. + """ def run(self) -> CheckResult: """ - Run security checks for firewall status and SSH configuration. + Execute security checks and return aggregated results. Returns: - CheckResult with security score based on detected issues. + CheckResult: Security assessment with score (0-100), status, + detected issues, and recommendations. """ score = 100 issues = [] recommendations = [] # 1. Firewall (UFW) Check - ufw_active, ufw_issue, ufw_rec = self._check_firewall() - if not ufw_active: - score = 0 - issues.append(ufw_issue) - recommendations.append(ufw_rec) + # Returns: score_delta (negative for penalty), issues, recommendations + fw_score_delta, fw_issues, fw_recs = self._check_firewall() + + # If firewall is inactive, score becomes 0 immediately per requirements + if fw_score_delta == -100: + score = 0 + + issues.extend(fw_issues) + recommendations.extend(fw_recs) # 2. SSH Root Login Check - ssh_penalty, ssh_issue, ssh_rec = self._check_ssh_root_login() - if ssh_penalty > 0: - score -= ssh_penalty - issues.append(ssh_issue) - recommendations.append(ssh_rec) + ssh_score_delta, ssh_issues, ssh_recs = self._check_ssh_root_login() + score += ssh_score_delta + issues.extend(ssh_issues) + recommendations.extend(ssh_recs) status = "OK" if score < 50: status = "CRITICAL" @@ -44,8 +53,13 @@ def run(self) -> CheckResult: weight=0.35 ) - def _check_firewall(self): - """Check if UFW is active.""" + def _check_firewall(self) -> tuple[int, list[str], list[str]]: + """ + Check if UFW is active. + + Returns: + tuple: (score_delta, issues_list, recommendations_list) + """ try: res = subprocess.run( ["systemctl", "is-active", "ufw"], @@ -54,23 +68,31 @@ def _check_firewall(self): timeout=10 ) if res.returncode == 0 and res.stdout.strip() == "active": - return True, None, None + return 0, [], [] except (subprocess.TimeoutExpired, FileNotFoundError, Exception): pass - return False, "Firewall Inactive", "Enable UFW Firewall" + # Return -100 to signal immediate failure condition + return -100, ["Firewall Inactive"], ["Enable UFW Firewall"] - def _check_ssh_root_login(self): - """Check for PermitRootLogin yes in sshd_config.""" + def _check_ssh_root_login(self) -> tuple[int, list[str], list[str]]: + """ + Check for PermitRootLogin yes in sshd_config. + + Returns: + tuple: (score_delta, issues_list, recommendations_list) + """ try: ssh_config = "/etc/ssh/sshd_config" if os.path.exists(ssh_config): with open(ssh_config, 'r') as f: for line in f: - line = line.strip() - if line.startswith("PermitRootLogin") and "yes" in line.split(): - return 50, "Root SSH Allowed", "Disable SSH Root Login in sshd_config" + parts = line.split() + # Precise check: PermitRootLogin must be the first word, yes the second + # This avoids matching commented lines or "no" followed by comments + if len(parts) >= 2 and parts[0] == "PermitRootLogin" and parts[1] == "yes": + return -50, ["Root SSH Allowed"], ["Disable SSH Root Login in sshd_config"] except (PermissionError, Exception): pass - return 0, None, None \ No newline at end of file + return 0, [], [] \ No newline at end of file From dc4143e7f659e9d184a8f5108e8035ed5586fc58 Mon Sep 17 00:00:00 2001 From: hyaku0121 Date: Fri, 12 Dec 2025 00:11:13 +0900 Subject: [PATCH 6/8] fix: Resolve SonarCloud code smells and reduce complexity --- cortex/health/checks/disk.py | 38 +++++++---- cortex/health/checks/security.py | 11 ++- scripts/verify_ubuntu_compatibility.py | 94 ++++++++++++-------------- 3 files changed, 71 insertions(+), 72 deletions(-) diff --git a/cortex/health/checks/disk.py b/cortex/health/checks/disk.py index bd126d7..631e974 100644 --- a/cortex/health/checks/disk.py +++ b/cortex/health/checks/disk.py @@ -15,25 +15,35 @@ def run(self) -> CheckResult: total, used, _ = shutil.disk_usage("/") usage_percent = (used / total) * 100 - score = 100 - status = "OK" - rec = None - + # Explicit early returns to avoid static analysis confusion if usage_percent > 90: - score = 0 - status = "CRITICAL" - rec = "Clean up disk space immediately" - elif usage_percent > 80: - score = 50 - status = "WARNING" - rec = "Consider cleaning up disk space" + return CheckResult( + name="Disk Usage", + category="disk", + score=0, + status="CRITICAL", + details=f"{usage_percent:.1f}% used", + recommendation="Clean up disk space immediately", + weight=0.20 + ) + if usage_percent > 80: + return CheckResult( + name="Disk Usage", + category="disk", + score=50, + status="WARNING", + details=f"{usage_percent:.1f}% used", + recommendation="Consider cleaning up disk space", + weight=0.20 + ) + return CheckResult( name="Disk Usage", category="disk", - score=score, - status=status, + score=100, + status="OK", details=f"{usage_percent:.1f}% used", - recommendation=rec, + recommendation=None, weight=0.20 ) \ No newline at end of file diff --git a/cortex/health/checks/security.py b/cortex/health/checks/security.py index 0f80fbb..64d594c 100644 --- a/cortex/health/checks/security.py +++ b/cortex/health/checks/security.py @@ -23,10 +23,8 @@ def run(self) -> CheckResult: recommendations = [] # 1. Firewall (UFW) Check - # Returns: score_delta (negative for penalty), issues, recommendations fw_score_delta, fw_issues, fw_recs = self._check_firewall() - # If firewall is inactive, score becomes 0 immediately per requirements if fw_score_delta == -100: score = 0 @@ -69,10 +67,10 @@ def _check_firewall(self) -> tuple[int, list[str], list[str]]: ) if res.returncode == 0 and res.stdout.strip() == "active": return 0, [], [] - except (subprocess.TimeoutExpired, FileNotFoundError, Exception): + except Exception: + # Catch-all is intentional here for robustness against missing systemctl etc. pass - # Return -100 to signal immediate failure condition return -100, ["Firewall Inactive"], ["Enable UFW Firewall"] def _check_ssh_root_login(self) -> tuple[int, list[str], list[str]]: @@ -88,11 +86,10 @@ def _check_ssh_root_login(self) -> tuple[int, list[str], list[str]]: with open(ssh_config, 'r') as f: for line in f: parts = line.split() - # Precise check: PermitRootLogin must be the first word, yes the second - # This avoids matching commented lines or "no" followed by comments if len(parts) >= 2 and parts[0] == "PermitRootLogin" and parts[1] == "yes": return -50, ["Root SSH Allowed"], ["Disable SSH Root Login in sshd_config"] - except (PermissionError, Exception): + except Exception: + # Catch-all is intentional here for file permission issues etc. pass return 0, [], [] \ No newline at end of file diff --git a/scripts/verify_ubuntu_compatibility.py b/scripts/verify_ubuntu_compatibility.py index dca1c0b..1d1beac 100644 --- a/scripts/verify_ubuntu_compatibility.py +++ b/scripts/verify_ubuntu_compatibility.py @@ -4,13 +4,14 @@ import json import datetime import shutil +import pathlib -# File name to store history data -HISTORY_FILE = "security_history.json" +# Use absolute path for history file +HISTORY_FILE = pathlib.Path.home() / ".cortex" / "security_history.json" def load_history(): """Load past execution history""" - if os.path.exists(HISTORY_FILE): + if HISTORY_FILE.exists(): try: with open(HISTORY_FILE, 'r') as f: return json.load(f) @@ -20,6 +21,8 @@ def load_history(): def save_history(score, status, details): """Save execution result to history""" + HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True) + history = load_history() record = { "timestamp": datetime.datetime.now().isoformat(), @@ -28,7 +31,6 @@ def save_history(score, status, details): "details": details } history.append(record) - # Keep only the latest 10 records history = history[-10:] with open(HISTORY_FILE, 'w') as f: @@ -59,25 +61,21 @@ def show_trend(history): elif diff < 0: print(f" Trend: šŸ“‰ Dropped by {abs(diff)} points since previous run") else: - print(f" Trend: āž”ļø Stable") + print(" Trend: āž”ļø Stable") def fix_firewall(): """Enable Firewall (Automated Fix)""" print("\n [Fixing] Enabling UFW Firewall...") - # Check if ufw is installed using 'which' or checking path - # (Since sudo is used, we check if we can find ufw path) if not shutil.which("ufw") and not os.path.exists("/usr/sbin/ufw"): print(" -> āš ļø UFW is not installed. Cannot enable.") - print(" (Try: sudo apt install ufw)") return False try: - # Depends on execution environment, sudo might be required - subprocess.run(["sudo", "ufw", "enable"], check=True) + subprocess.run(["sudo", "ufw", "enable"], check=True, timeout=30) print(" -> āœ… Success: Firewall enabled.") return True - except subprocess.CalledProcessError as e: + except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e: print(f" -> āŒ Failed to enable firewall: {e}") return False @@ -85,12 +83,10 @@ def fix_ssh_config(config_path): """Disable SSH Root Login (Automated Fix)""" print(f"\n [Fixing] Disabling Root Login in {config_path}...") - # Check if file exists before trying to fix if not os.path.exists(config_path): print(f" -> āš ļø Config file not found: {config_path}") return False - # 1. Create backup backup_path = config_path + ".bak." + datetime.datetime.now().strftime("%Y%m%d%H%M%S") try: shutil.copy2(config_path, backup_path) @@ -99,7 +95,6 @@ def fix_ssh_config(config_path): print(" -> āŒ Failed to create backup (Permission denied). Need sudo?") return False - # 2. Rewrite configuration try: new_lines = [] with open(config_path, 'r') as f: @@ -108,7 +103,6 @@ def fix_ssh_config(config_path): fixed = False for line in lines: if line.strip().startswith("PermitRootLogin") and "yes" in line: - # Comment out and add disabled setting new_lines.append(f"# {line.strip()} (Disabled by Auto-Fix)\n") new_lines.append("PermitRootLogin no\n") fixed = True @@ -120,88 +114,89 @@ def fix_ssh_config(config_path): f.writelines(new_lines) print(" -> āœ… Success: sshd_config updated.") - # Attempt to restart SSH service print(" -> Restarting sshd service...") - subprocess.run(["sudo", "systemctl", "restart", "ssh"], check=False) + res = subprocess.run( + ["sudo", "systemctl", "restart", "ssh"], + capture_output=True, text=True, timeout=30 + ) + if res.returncode != 0: + print(f" -> āš ļø SSH restart failed: {res.stderr}") + return True return True else: print(" -> No changes needed.") return True - except PermissionError: - print(" -> āŒ Failed to write config (Permission denied). Need sudo?") - return False except Exception as e: print(f" -> āŒ Error during fix: {e}") return False -def verify_security_logic(): - print("=== Ubuntu Security Logic Verification ===") - - # --------------------------------------------------------- - # 1. Firewall (UFW) Check Logic - # --------------------------------------------------------- +def _check_firewall_status(): + """Helper to check firewall status.""" print("\n[1] Checking Firewall (UFW)...") - ufw_active = False - ufw_needs_fix = False try: print(" Running: systemctl is-active ufw") res = subprocess.run( ["systemctl", "is-active", "ufw"], - capture_output=True, text=True + capture_output=True, text=True, timeout=10 ) output = res.stdout.strip() print(f" Output: '{output}'") if res.returncode == 0 and output == "active": - ufw_active = True print(" -> JUDGEMENT: Firewall is ACTIVE (Score: 100)") + return True else: print(" -> JUDGEMENT: Firewall is INACTIVE (Score: 0)") - ufw_needs_fix = True + return False except FileNotFoundError: print(" -> ERROR: 'systemctl' command not found.") except Exception as e: print(f" -> ERROR: {e}") + return False - # --------------------------------------------------------- - # 2. SSH Root Login Check Logic - # --------------------------------------------------------- +def _check_ssh_status(ssh_config): + """Helper to check SSH status.""" print("\n[2] Checking SSH Configuration...") - ssh_config = "/etc/ssh/sshd_config" score_penalty = 0 - ssh_needs_fix = False + needs_fix = False if os.path.exists(ssh_config): print(f" File found: {ssh_config}") try: with open(ssh_config, 'r') as f: - found_risky_setting = False for line in f: - if line.strip().startswith("PermitRootLogin") and "yes" in line: + parts = line.split() + if len(parts) >= 2 and parts[0] == "PermitRootLogin" and parts[1] == "yes": print(f" -> FOUND RISKY LINE: {line.strip()}") score_penalty = 50 - found_risky_setting = True - ssh_needs_fix = True + needs_fix = True break - if not found_risky_setting: + if not needs_fix: print(" -> No 'PermitRootLogin yes' found (Safe)") except PermissionError: print(" -> ERROR: Permission denied. Try running with 'sudo'.") else: print(f" -> WARNING: {ssh_config} does not exist.") + + return score_penalty, needs_fix - # --------------------------------------------------------- - # Final Report & History - # --------------------------------------------------------- +def verify_security_logic(): + print("=== Ubuntu Security Logic Verification ===") + + ufw_active = _check_firewall_status() + ssh_config = "/etc/ssh/sshd_config" + ssh_penalty, ssh_needs_fix = _check_ssh_status(ssh_config) + + # Final Report print("\n=== Summary ===") final_score = 100 if not ufw_active: final_score = 0 - final_score -= score_penalty + final_score -= ssh_penalty final_score = max(0, final_score) status = "OK" @@ -211,18 +206,17 @@ def verify_security_logic(): print(f"Current Score: {final_score}") print(f"Status: {status}") - # --- Trend Tracking --- + # History print("\n... Saving history ...") details = [] + ufw_needs_fix = not ufw_active if ufw_needs_fix: details.append("Firewall Inactive") if ssh_needs_fix: details.append("Root SSH Allowed") history = save_history(final_score, status, ", ".join(details)) show_trend(history) - # --------------------------------------------------------- - # Automated Fixes (Interactive) - # --------------------------------------------------------- + # Automated Fixes if ufw_needs_fix or ssh_needs_fix: print("\n=== šŸ› ļø Automated Fixes Available ===") print("Issues detected that can be automatically fixed.") @@ -233,13 +227,11 @@ def verify_security_logic(): fix_firewall() if ssh_needs_fix: fix_ssh_config(ssh_config) - print("\nāœ… Fixes attempt complete. Please re-run script to verify.") else: print("Skipping fixes.") if __name__ == "__main__": - # Warn that sudo might be required for execution if os.geteuid() != 0: print("NOTE: This script works best with 'sudo' for fixing issues.") verify_security_logic() \ No newline at end of file From 6ba17150582791a96d209326ed3a7ee4002843e6 Mon Sep 17 00:00:00 2001 From: hyaku0121 Date: Fri, 12 Dec 2025 00:19:09 +0900 Subject: [PATCH 7/8] docs: Add missing docstrings to HealthMonitor public APIs --- cortex/health/monitor.py | 59 ++++++++++++++++++++++++++++------------ 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/cortex/health/monitor.py b/cortex/health/monitor.py index c85b624..7ba95d0 100644 --- a/cortex/health/monitor.py +++ b/cortex/health/monitor.py @@ -1,7 +1,7 @@ import json import time from abc import ABC, abstractmethod -from dataclasses import dataclass, field +from dataclasses import dataclass from pathlib import Path from typing import List, Dict, Optional from rich.console import Console @@ -10,7 +10,7 @@ @dataclass class CheckResult: - """Data class to hold the result of each check""" + """Data class to hold the result of each check.""" name: str # Item name (e.g. "Disk Space") category: str # Category (security, updates, performance, disk) score: int # Score 0-100 @@ -20,40 +20,59 @@ class CheckResult: weight: float = 1.0 # Weight for weighted average class HealthCheck(ABC): - """Base class inherited by all health check modules""" + """Base class inherited by all health check modules.""" + @abstractmethod def run(self) -> CheckResult: + """Execute the check and return a result.""" pass class HealthMonitor: """ Main engine for system health monitoring. + + Manages registration of health checks, execution, score aggregation, + and history persistence. """ def __init__(self): + """Initialize the health monitor and register default checks.""" self.history_file = Path.home() / ".cortex" / "health_history.json" self.history_file.parent.mkdir(exist_ok=True) self.checks: List[HealthCheck] = [] - + # Register each check here # (Import here to prevent circular references) from .checks.security import SecurityCheck from .checks.updates import UpdateCheck from .checks.performance import PerformanceCheck from .checks.disk import DiskCheck - + self.register_check(SecurityCheck()) self.register_check(UpdateCheck()) self.register_check(PerformanceCheck()) self.register_check(DiskCheck()) + + def register_check(self, check: HealthCheck) -> None: + """ + Register a health check instance to be run as part of the monitor. - def register_check(self, check: HealthCheck): + Args: + check (HealthCheck): The check instance to register. + """ self.checks.append(check) def run_all(self) -> Dict: + """ + Run all registered checks and return an aggregated health report. + + Returns: + Dict: A report containing the timestamp, total weighted score, + and a list of individual check results. + """ results = [] total_weighted_score = 0 total_weight = 0 - + for check in self.checks: try: result = check.run() @@ -62,11 +81,11 @@ def run_all(self) -> Dict: total_weight += result.weight except Exception as e: console.print(f"[red]Error running check {check.__class__.__name__}: {e}[/red]") - + final_score = 0 if total_weight > 0: final_score = int(total_weighted_score / total_weight) - + report = { "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), "total_score": final_score, @@ -82,11 +101,17 @@ def run_all(self) -> Dict: for r in results ] } - + self._save_history(report) return report - def _save_history(self, report: Dict): + def _save_history(self, report: Dict) -> None: + """ + Save the current health report to the history JSON file. + + Args: + report (Dict): The health report to save. + """ history = [] if self.history_file.exists(): try: @@ -96,11 +121,11 @@ def _save_history(self, report: Dict): pass history.append(report) + # Keep only the last 100 records history = history[-100:] - with open(self.history_file, 'w') as f: - json.dump(history, f, indent=4) - -if __name__ == "__main__": - # For testing execution - print("HealthMonitor initialized.") \ No newline at end of file + try: + with open(self.history_file, 'w') as f: + json.dump(history, f, indent=4) + except Exception as e: + console.print(f"[yellow]Warning: Could not save health history: {e}[/yellow]") \ No newline at end of file From 618e0759f65587841cbb6387e9b34f50c7c63576 Mon Sep 17 00:00:00 2001 From: hyaku0121 Date: Fri, 12 Dec 2025 00:22:44 +0900 Subject: [PATCH 8/8] fix: Address SonarCloud and CodeRabbit feedback (redundant exceptions, error handling) --- cortex/health/checks/disk.py | 17 ++++- cortex/health/checks/security.py | 109 ++++++++++++------------------- cortex/health/checks/updates.py | 13 +++- 3 files changed, 65 insertions(+), 74 deletions(-) diff --git a/cortex/health/checks/disk.py b/cortex/health/checks/disk.py index 631e974..4b06659 100644 --- a/cortex/health/checks/disk.py +++ b/cortex/health/checks/disk.py @@ -11,9 +11,20 @@ def run(self) -> CheckResult: Returns: CheckResult based on usage thresholds. """ - # Use _ for unused variable (free space) - total, used, _ = shutil.disk_usage("/") - usage_percent = (used / total) * 100 + try: + # Use _ for unused variable (free space) + total, used, _ = shutil.disk_usage("/") + usage_percent = (used / total) * 100 + except Exception as e: + return CheckResult( + name="Disk Usage", + category="disk", + score=0, + status="CRITICAL", + details=f"Check failed: {e}", + recommendation="Check disk mounts and permissions", + weight=0.20 + ) # Explicit early returns to avoid static analysis confusion if usage_percent > 90: diff --git a/cortex/health/checks/security.py b/cortex/health/checks/security.py index 64d594c..c731319 100644 --- a/cortex/health/checks/security.py +++ b/cortex/health/checks/security.py @@ -3,93 +3,64 @@ from ..monitor import HealthCheck, CheckResult class SecurityCheck(HealthCheck): - """ - Checks system security posture including firewall status and SSH configuration. - - Evaluates UFW firewall activity and SSH root login permissions, - returning a weighted score and actionable recommendations. - """ - def run(self) -> CheckResult: - """ - Execute security checks and return aggregated results. - - Returns: - CheckResult: Security assessment with score (0-100), status, - detected issues, and recommendations. - """ score = 100 issues = [] recommendations = [] # 1. Firewall (UFW) Check - fw_score_delta, fw_issues, fw_recs = self._check_firewall() - - if fw_score_delta == -100: - score = 0 - - issues.extend(fw_issues) - recommendations.extend(fw_recs) - - # 2. SSH Root Login Check - ssh_score_delta, ssh_issues, ssh_recs = self._check_ssh_root_login() - score += ssh_score_delta - issues.extend(ssh_issues) - recommendations.extend(ssh_recs) - - status = "OK" - if score < 50: status = "CRITICAL" - elif score < 100: status = "WARNING" - - return CheckResult( - name="Security Posture", - category="security", - score=max(0, score), - status=status, - details=", ".join(issues) if issues else "Secure", - recommendation=", ".join(recommendations) if recommendations else None, - weight=0.35 - ) - - def _check_firewall(self) -> tuple[int, list[str], list[str]]: - """ - Check if UFW is active. - - Returns: - tuple: (score_delta, issues_list, recommendations_list) - """ + ufw_active = False try: + # Add timeout to prevent hanging (Fixes Reliability Issue) res = subprocess.run( ["systemctl", "is-active", "ufw"], capture_output=True, text=True, - timeout=10 + timeout=5 ) + # Fix: Use exact match to avoid matching "inactive" which contains "active" if res.returncode == 0 and res.stdout.strip() == "active": - return 0, [], [] + ufw_active = True + except subprocess.TimeoutExpired: + pass # Command timed out, treat as inactive or unavailable + except FileNotFoundError: + pass # Environment without systemctl (e.g., Docker or non-systemd) except Exception: - # Catch-all is intentional here for robustness against missing systemctl etc. - pass - - return -100, ["Firewall Inactive"], ["Enable UFW Firewall"] + pass # Generic error protection - def _check_ssh_root_login(self) -> tuple[int, list[str], list[str]]: - """ - Check for PermitRootLogin yes in sshd_config. - - Returns: - tuple: (score_delta, issues_list, recommendations_list) - """ + if not ufw_active: + score = 0 # Spec: 0 points if Firewall is inactive + issues.append("Firewall Inactive") + recommendations.append("Enable UFW Firewall") + + # 2. SSH Root Login Check try: ssh_config = "/etc/ssh/sshd_config" if os.path.exists(ssh_config): with open(ssh_config, 'r') as f: for line in f: - parts = line.split() - if len(parts) >= 2 and parts[0] == "PermitRootLogin" and parts[1] == "yes": - return -50, ["Root SSH Allowed"], ["Disable SSH Root Login in sshd_config"] + line = line.strip() + # Check for uncommented PermitRootLogin yes + if line.startswith("PermitRootLogin") and "yes" in line.split(): + score -= 50 + issues.append("Root SSH Allowed") + recommendations.append("Disable SSH Root Login in sshd_config") + break + except PermissionError: + pass # Cannot read config, skip check except Exception: - # Catch-all is intentional here for file permission issues etc. - pass - - return 0, [], [] \ No newline at end of file + pass # Generic error protection + + status = "OK" + if score < 50: status = "CRITICAL" + elif score < 100: status = "WARNING" + + return CheckResult( + name="Security Posture", + category="security", + score=max(0, score), + status=status, + details=", ".join(issues) if issues else "Secure", + recommendation=", ".join(recommendations) if recommendations else None, + weight=0.35 + ) \ No newline at end of file diff --git a/cortex/health/checks/updates.py b/cortex/health/checks/updates.py index d40b251..a38a464 100644 --- a/cortex/health/checks/updates.py +++ b/cortex/health/checks/updates.py @@ -37,8 +37,17 @@ def run(self) -> CheckResult: score -= (pkg_count * 2) score -= (sec_count * 10) - except Exception: - pass + except Exception as e: + # CodeRabbit Suggestion: Return failure state instead of ignoring errors + return CheckResult( + name="System Updates", + category="updates", + score=0, + status="CRITICAL", + details=f"Check failed: {e}", + recommendation="Verify package manager configuration", + weight=0.25 + ) status = "OK" if score < 50: status = "CRITICAL"