diff --git a/cortex/cli.py b/cortex/cli.py index 17004c6..c120bca 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -38,7 +38,7 @@ validate_installation_id, ValidationError ) -# Import the new Notification Manager +# Import Notification Manager from cortex.notification_manager import NotificationManager @@ -112,10 +112,9 @@ def _clear_line(self): sys.stdout.write('\r\033[K') sys.stdout.flush() - # --- New Notification Method --- + # --- Notification Method --- def notify(self, args): """Handle notification commands""" - # Addressing CodeRabbit feedback: Handle missing subcommand gracefully if not args.notify_action: self._print_error("Please specify a subcommand (config/enable/disable/dnd/send)") return 1 @@ -132,16 +131,14 @@ def notify(self, args): elif args.notify_action == 'enable': mgr.config["enabled"] = True - # Addressing CodeRabbit feedback: Ideally should use a public method instead of private _save_config, - # but keeping as is for a simple fix (or adding a save method to NotificationManager would be best). - mgr._save_config() + mgr.save_config() self._print_success("Notifications enabled") return 0 elif args.notify_action == 'disable': mgr.config["enabled"] = False - mgr._save_config() - cx_print("Notifications disabled (Critical alerts will still show)", "warning") + mgr.save_config() + self._print_success("Notifications disabled (Critical alerts will still show)") return 0 elif args.notify_action == 'dnd': @@ -149,7 +146,6 @@ def notify(self, args): self._print_error("Please provide start and end times (HH:MM)") return 1 - # Addressing CodeRabbit feedback: Add time format validation try: datetime.strptime(args.start, "%H:%M") datetime.strptime(args.end, "%H:%M") @@ -159,7 +155,7 @@ def notify(self, args): mgr.config["dnd_start"] = args.start mgr.config["dnd_end"] = args.end - mgr._save_config() + mgr.save_config() self._print_success(f"DND Window updated: {args.start} - {args.end}") return 0 @@ -174,7 +170,56 @@ def notify(self, args): else: self._print_error("Unknown notify command") return 1 - # ------------------------------- + + # --- New Health Command --- + def health(self, args): + """Run system health checks and show recommendations""" + from cortex.health.monitor import HealthMonitor + + self._print_status("šŸ”", "Running system health checks...") + monitor = HealthMonitor() + report = monitor.run_all() + + # --- Display Results --- + score = report['total_score'] + + # Color code the score + score_color = "green" + if score < 60: score_color = "red" + elif score < 80: score_color = "yellow" + + console.print() + console.print(f"šŸ“Š [bold]System Health Score:[/bold] [{score_color}]{score}/100[/{score_color}]") + console.print() + + console.print("[bold]Factors:[/bold]") + recommendations = [] + + for res in report['results']: + status_icon = "āœ…" + if res['status'] == 'WARNING': status_icon = "āš ļø " + elif res['status'] == 'CRITICAL': status_icon = "āŒ" + + console.print(f" {status_icon} {res['name']:<15}: {res['score']}/100 ({res['details']})") + + if res['recommendation']: + recommendations.append(res['recommendation']) + + console.print() + + if recommendations: + console.print("[bold]Recommendations:[/bold]") + for i, rec in enumerate(recommendations, 1): + console.print(f" {i}. {rec}") + + console.print() + # Note: Auto-fix logic would go here, prompting user to apply specific commands. + # For this iteration, we display actionable advice. + console.print("[dim]Run suggested commands manually to improve your score.[/dim]") + else: + self._print_success("System is in excellent health! No actions needed.") + + return 0 def install(self, software: str, execute: bool = False, dry_run: bool = False): # Validate input first @@ -543,7 +588,8 @@ def show_rich_help(): table.add_row("install ", "Install software") table.add_row("history", "View history") table.add_row("rollback ", "Undo installation") - table.add_row("notify", "Manage desktop notifications") # Added this line + table.add_row("notify", "Manage desktop notifications") + table.add_row("health", "Check system health score") # Added this line console.print(table) console.print() @@ -598,7 +644,7 @@ def main(): edit_pref_parser.add_argument('key', nargs='?') edit_pref_parser.add_argument('value', nargs='?') - # --- New Notify Command --- + # --- Notify Command --- notify_parser = subparsers.add_parser('notify', help='Manage desktop notifications') notify_subs = notify_parser.add_subparsers(dest='notify_action', help='Notify actions') @@ -615,6 +661,9 @@ def main(): send_parser.add_argument('--title', default='Cortex Notification') send_parser.add_argument('--level', choices=['low', 'normal', 'critical'], default='normal') send_parser.add_argument('--actions', nargs='*', help='Action buttons') + + # --- New Health Command --- + health_parser = subparsers.add_parser('health', help='Check system health score') # -------------------------- args = parser.parse_args() @@ -642,9 +691,11 @@ def main(): return cli.check_pref(key=args.key) elif args.command == 'edit-pref': return cli.edit_pref(action=args.action, key=args.key, value=args.value) - # Handle the new notify command elif args.command == 'notify': return cli.notify(args) + # Handle new command + elif args.command == 'health': + return cli.health(args) else: parser.print_help() return 1 diff --git a/cortex/health/__init__.py b/cortex/health/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cortex/health/checks/disk.py b/cortex/health/checks/disk.py new file mode 100644 index 0000000..4b06659 --- /dev/null +++ b/cortex/health/checks/disk.py @@ -0,0 +1,60 @@ +import shutil +from ..monitor import HealthCheck, CheckResult + +class DiskCheck(HealthCheck): + """Check root filesystem disk usage.""" + + def run(self) -> CheckResult: + """ + Calculate disk usage percentage. + + Returns: + CheckResult based on usage thresholds. + """ + try: + # Use _ for unused variable (free space) + total, used, _ = shutil.disk_usage("/") + usage_percent = (used / total) * 100 + except Exception as e: + return CheckResult( + name="Disk Usage", + category="disk", + score=0, + status="CRITICAL", + details=f"Check failed: {e}", + recommendation="Check disk mounts and permissions", + weight=0.20 + ) + + # Explicit early returns to avoid static analysis confusion + if usage_percent > 90: + return CheckResult( + name="Disk Usage", + category="disk", + score=0, + status="CRITICAL", + details=f"{usage_percent:.1f}% used", + recommendation="Clean up disk space immediately", + weight=0.20 + ) + + if usage_percent > 80: + return CheckResult( + name="Disk Usage", + category="disk", + score=50, + status="WARNING", + details=f"{usage_percent:.1f}% used", + recommendation="Consider cleaning up disk space", + weight=0.20 + ) + + return CheckResult( + name="Disk Usage", + category="disk", + score=100, + status="OK", + details=f"{usage_percent:.1f}% used", + recommendation=None, + weight=0.20 + ) \ No newline at end of file diff --git a/cortex/health/checks/performance.py b/cortex/health/checks/performance.py new file mode 100644 index 0000000..9e5e66f --- /dev/null +++ b/cortex/health/checks/performance.py @@ -0,0 +1,63 @@ +import os +import multiprocessing +from ..monitor import HealthCheck, CheckResult + +class PerformanceCheck(HealthCheck): + def run(self) -> CheckResult: + score = 100 + issues = [] + rec = None + + # 1. Load Average (1min) + try: + load1, _, _ = os.getloadavg() + cores = multiprocessing.cpu_count() + # Load ratio against core count + load_ratio = load1 / cores + + if load_ratio > 1.0: + score -= 50 + issues.append(f"High Load ({load1:.2f})") + rec = "Check top processes" + except Exception: + pass # Skip on Windows etc. + + # 2. Memory Usage (Linux /proc/meminfo) + try: + with open('/proc/meminfo', 'r') as f: + meminfo = {} + for line in f: + parts = line.split(':') + if len(parts) == 2: + meminfo[parts[0].strip()] = int(parts[1].strip().split()[0]) + + if 'MemTotal' in meminfo and 'MemAvailable' in meminfo: + total = meminfo['MemTotal'] + avail = meminfo['MemAvailable'] + used_percent = ((total - avail) / total) * 100 + + if used_percent > 80: + penalty = int(used_percent - 80) + score -= penalty + issues.append(f"High Memory ({used_percent:.0f}%)") + except FileNotFoundError: + pass # Non-Linux systems + + # Summary of results + status = "OK" + if score < 50: + status = "CRITICAL" + elif score < 90: + status = "WARNING" + + details = ", ".join(issues) if issues else "Optimal" + + return CheckResult( + name="System Load", + category="performance", + score=max(0, score), + status=status, + details=details, + recommendation=rec, + weight=0.20 # 20% + ) \ No newline at end of file diff --git a/cortex/health/checks/security.py b/cortex/health/checks/security.py new file mode 100644 index 0000000..c731319 --- /dev/null +++ b/cortex/health/checks/security.py @@ -0,0 +1,66 @@ +import subprocess +import os +from ..monitor import HealthCheck, CheckResult + +class SecurityCheck(HealthCheck): + def run(self) -> CheckResult: + score = 100 + issues = [] + recommendations = [] + + # 1. Firewall (UFW) Check + ufw_active = False + try: + # Add timeout to prevent hanging (Fixes Reliability Issue) + res = subprocess.run( + ["systemctl", "is-active", "ufw"], + capture_output=True, + text=True, + timeout=5 + ) + # Fix: Use exact match to avoid matching "inactive" which contains "active" + if res.returncode == 0 and res.stdout.strip() == "active": + ufw_active = True + except subprocess.TimeoutExpired: + pass # Command timed out, treat as inactive or unavailable + except FileNotFoundError: + pass # Environment without systemctl (e.g., Docker or non-systemd) + except Exception: + pass # Generic error protection + + if not ufw_active: + score = 0 # Spec: 0 points if Firewall is inactive + issues.append("Firewall Inactive") + recommendations.append("Enable UFW Firewall") + + # 2. SSH Root Login Check + try: + ssh_config = "/etc/ssh/sshd_config" + if os.path.exists(ssh_config): + with open(ssh_config, 'r') as f: + for line in f: + line = line.strip() + # Check for uncommented PermitRootLogin yes + if line.startswith("PermitRootLogin") and "yes" in line.split(): + score -= 50 + issues.append("Root SSH Allowed") + recommendations.append("Disable SSH Root Login in sshd_config") + break + except PermissionError: + pass # Cannot read config, skip check + except Exception: + pass # Generic error protection + + status = "OK" + if score < 50: status = "CRITICAL" + elif score < 100: status = "WARNING" + + return CheckResult( + name="Security Posture", + category="security", + score=max(0, score), + status=status, + details=", ".join(issues) if issues else "Secure", + recommendation=", ".join(recommendations) if recommendations else None, + weight=0.35 + ) \ No newline at end of file diff --git a/cortex/health/checks/updates.py b/cortex/health/checks/updates.py new file mode 100644 index 0000000..a38a464 --- /dev/null +++ b/cortex/health/checks/updates.py @@ -0,0 +1,68 @@ +import subprocess +from ..monitor import HealthCheck, CheckResult + +class UpdateCheck(HealthCheck): + """Check for pending system updates and security patches.""" + + def run(self) -> CheckResult: + """ + Check for available updates using apt. + + Returns: + CheckResult with score based on pending updates. + """ + score = 100 + pkg_count = 0 + sec_count = 0 + + try: + # Add timeout to prevent hangs + res = subprocess.run( + ["apt", "list", "--upgradable"], + capture_output=True, + text=True, + timeout=30 + ) + lines = res.stdout.splitlines() + + # apt list output header usually takes first line + for line in lines[1:]: + if line.strip(): + if "security" in line.lower(): + sec_count += 1 + else: + pkg_count += 1 + + # Scoring + score -= (pkg_count * 2) + score -= (sec_count * 10) + + except Exception as e: + # CodeRabbit Suggestion: Return failure state instead of ignoring errors + return CheckResult( + name="System Updates", + category="updates", + score=0, + status="CRITICAL", + details=f"Check failed: {e}", + recommendation="Verify package manager configuration", + weight=0.25 + ) + + status = "OK" + if score < 50: status = "CRITICAL" + elif score < 90: status = "WARNING" + + details = f"{pkg_count} packages, {sec_count} security updates pending" + if pkg_count == 0 and sec_count == 0: + details = "System up to date" + + return CheckResult( + name="System Updates", + category="updates", + score=max(0, score), + status=status, + details=details, + recommendation="Run 'apt upgrade'" if score < 100 else None, + weight=0.25 + ) \ No newline at end of file diff --git a/cortex/health/monitor.py b/cortex/health/monitor.py new file mode 100644 index 0000000..7ba95d0 --- /dev/null +++ b/cortex/health/monitor.py @@ -0,0 +1,131 @@ +import json +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass +from pathlib import Path +from typing import List, Dict, Optional +from rich.console import Console + +console = Console() + +@dataclass +class CheckResult: + """Data class to hold the result of each check.""" + name: str # Item name (e.g. "Disk Space") + category: str # Category (security, updates, performance, disk) + score: int # Score 0-100 + status: str # "OK", "WARNING", "CRITICAL" + details: str # Detailed message + recommendation: Optional[str] = None # Recommended action (if any) + weight: float = 1.0 # Weight for weighted average + +class HealthCheck(ABC): + """Base class inherited by all health check modules.""" + + @abstractmethod + def run(self) -> CheckResult: + """Execute the check and return a result.""" + pass + +class HealthMonitor: + """ + Main engine for system health monitoring. + + Manages registration of health checks, execution, score aggregation, + and history persistence. + """ + def __init__(self): + """Initialize the health monitor and register default checks.""" + self.history_file = Path.home() / ".cortex" / "health_history.json" + self.history_file.parent.mkdir(exist_ok=True) + self.checks: List[HealthCheck] = [] + + # Register each check here + # (Import here to prevent circular references) + from .checks.security import SecurityCheck + from .checks.updates import UpdateCheck + from .checks.performance import PerformanceCheck + from .checks.disk import DiskCheck + + self.register_check(SecurityCheck()) + self.register_check(UpdateCheck()) + self.register_check(PerformanceCheck()) + self.register_check(DiskCheck()) + + def register_check(self, check: HealthCheck) -> None: + """ + Register a health check instance to be run as part of the monitor. + + Args: + check (HealthCheck): The check instance to register. + """ + self.checks.append(check) + + def run_all(self) -> Dict: + """ + Run all registered checks and return an aggregated health report. + + Returns: + Dict: A report containing the timestamp, total weighted score, + and a list of individual check results. + """ + results = [] + total_weighted_score = 0 + total_weight = 0 + + for check in self.checks: + try: + result = check.run() + results.append(result) + total_weighted_score += result.score * result.weight + total_weight += result.weight + except Exception as e: + console.print(f"[red]Error running check {check.__class__.__name__}: {e}[/red]") + + final_score = 0 + if total_weight > 0: + final_score = int(total_weighted_score / total_weight) + + report = { + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), + "total_score": final_score, + "results": [ + { + "name": r.name, + "category": r.category, + "score": r.score, + "status": r.status, + "details": r.details, + "recommendation": r.recommendation + } + for r in results + ] + } + + self._save_history(report) + return report + + def _save_history(self, report: Dict) -> None: + """ + Save the current health report to the history JSON file. + + Args: + report (Dict): The health report to save. + """ + history = [] + if self.history_file.exists(): + try: + with open(self.history_file, 'r') as f: + history = json.load(f) + except json.JSONDecodeError: + pass + + history.append(report) + # Keep only the last 100 records + history = history[-100:] + + try: + with open(self.history_file, 'w') as f: + json.dump(history, f, indent=4) + except Exception as e: + console.print(f"[yellow]Warning: Could not save health history: {e}[/yellow]") \ No newline at end of file diff --git a/scripts/verify_ubuntu_compatibility.py b/scripts/verify_ubuntu_compatibility.py new file mode 100644 index 0000000..1d1beac --- /dev/null +++ b/scripts/verify_ubuntu_compatibility.py @@ -0,0 +1,237 @@ +import subprocess +import os +import sys +import json +import datetime +import shutil +import pathlib + +# Use absolute path for history file +HISTORY_FILE = pathlib.Path.home() / ".cortex" / "security_history.json" + +def load_history(): + """Load past execution history""" + if HISTORY_FILE.exists(): + try: + with open(HISTORY_FILE, 'r') as f: + return json.load(f) + except json.JSONDecodeError: + return [] + return [] + +def save_history(score, status, details): + """Save execution result to history""" + HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True) + + history = load_history() + record = { + "timestamp": datetime.datetime.now().isoformat(), + "score": score, + "status": status, + "details": details + } + history.append(record) + history = history[-10:] + + with open(HISTORY_FILE, 'w') as f: + json.dump(history, f, indent=4) + + return history + +def show_trend(history): + """Show historical trend (Trend Tracking)""" + print("\n=== šŸ“Š Historical Trend Analysis ===") + if not history: + print(" No historical data available yet.") + return + + scores = [h["score"] for h in history] + avg_score = sum(scores) / len(scores) + last_score = scores[-1] + + print(f" History Count: {len(history)} runs") + print(f" Average Score: {avg_score:.1f}") + print(f" Last Run Score: {last_score}") + + if len(scores) > 1: + prev_score = scores[-2] + diff = last_score - prev_score + if diff > 0: + print(f" Trend: šŸ“ˆ Improved by {diff} points since previous run") + elif diff < 0: + print(f" Trend: šŸ“‰ Dropped by {abs(diff)} points since previous run") + else: + print(" Trend: āž”ļø Stable") + +def fix_firewall(): + """Enable Firewall (Automated Fix)""" + print("\n [Fixing] Enabling UFW Firewall...") + + if not shutil.which("ufw") and not os.path.exists("/usr/sbin/ufw"): + print(" -> āš ļø UFW is not installed. Cannot enable.") + return False + + try: + subprocess.run(["sudo", "ufw", "enable"], check=True, timeout=30) + print(" -> āœ… Success: Firewall enabled.") + return True + except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e: + print(f" -> āŒ Failed to enable firewall: {e}") + return False + +def fix_ssh_config(config_path): + """Disable SSH Root Login (Automated Fix)""" + print(f"\n [Fixing] Disabling Root Login in {config_path}...") + + if not os.path.exists(config_path): + print(f" -> āš ļø Config file not found: {config_path}") + return False + + backup_path = config_path + ".bak." + datetime.datetime.now().strftime("%Y%m%d%H%M%S") + try: + shutil.copy2(config_path, backup_path) + print(f" -> Backup created at: {backup_path}") + except PermissionError: + print(" -> āŒ Failed to create backup (Permission denied). Need sudo?") + return False + + try: + new_lines = [] + with open(config_path, 'r') as f: + lines = f.readlines() + + fixed = False + for line in lines: + if line.strip().startswith("PermitRootLogin") and "yes" in line: + new_lines.append(f"# {line.strip()} (Disabled by Auto-Fix)\n") + new_lines.append("PermitRootLogin no\n") + fixed = True + else: + new_lines.append(line) + + if fixed: + with open(config_path, 'w') as f: + f.writelines(new_lines) + print(" -> āœ… Success: sshd_config updated.") + + print(" -> Restarting sshd service...") + res = subprocess.run( + ["sudo", "systemctl", "restart", "ssh"], + capture_output=True, text=True, timeout=30 + ) + if res.returncode != 0: + print(f" -> āš ļø SSH restart failed: {res.stderr}") + return True + return True + else: + print(" -> No changes needed.") + return True + + except Exception as e: + print(f" -> āŒ Error during fix: {e}") + return False + +def _check_firewall_status(): + """Helper to check firewall status.""" + print("\n[1] Checking Firewall (UFW)...") + try: + print(" Running: systemctl is-active ufw") + res = subprocess.run( + ["systemctl", "is-active", "ufw"], + capture_output=True, text=True, timeout=10 + ) + output = res.stdout.strip() + print(f" Output: '{output}'") + + if res.returncode == 0 and output == "active": + print(" -> JUDGEMENT: Firewall is ACTIVE (Score: 100)") + return True + else: + print(" -> JUDGEMENT: Firewall is INACTIVE (Score: 0)") + return False + + except FileNotFoundError: + print(" -> ERROR: 'systemctl' command not found.") + except Exception as e: + print(f" -> ERROR: {e}") + return False + +def _check_ssh_status(ssh_config): + """Helper to check SSH status.""" + print("\n[2] Checking SSH Configuration...") + score_penalty = 0 + needs_fix = False + + if os.path.exists(ssh_config): + print(f" File found: {ssh_config}") + try: + with open(ssh_config, 'r') as f: + for line in f: + parts = line.split() + if len(parts) >= 2 and parts[0] == "PermitRootLogin" and parts[1] == "yes": + print(f" -> FOUND RISKY LINE: {line.strip()}") + score_penalty = 50 + needs_fix = True + break + + if not needs_fix: + print(" -> No 'PermitRootLogin yes' found (Safe)") + + except PermissionError: + print(" -> ERROR: Permission denied. Try running with 'sudo'.") + else: + print(f" -> WARNING: {ssh_config} does not exist.") + + return score_penalty, needs_fix + +def verify_security_logic(): + print("=== Ubuntu Security Logic Verification ===") + + ufw_active = _check_firewall_status() + ssh_config = "/etc/ssh/sshd_config" + ssh_penalty, ssh_needs_fix = _check_ssh_status(ssh_config) + + # Final Report + print("\n=== Summary ===") + final_score = 100 + if not ufw_active: + final_score = 0 + final_score -= ssh_penalty + final_score = max(0, final_score) + + status = "OK" + if final_score < 50: status = "CRITICAL" + elif final_score < 100: status = "WARNING" + + print(f"Current Score: {final_score}") + print(f"Status: {status}") + + # History + print("\n... Saving history ...") + details = [] + ufw_needs_fix = not ufw_active + if ufw_needs_fix: details.append("Firewall Inactive") + if ssh_needs_fix: details.append("Root SSH Allowed") + + history = save_history(final_score, status, ", ".join(details)) + show_trend(history) + + # Automated Fixes + if ufw_needs_fix or ssh_needs_fix: + print("\n=== šŸ› ļø Automated Fixes Available ===") + print("Issues detected that can be automatically fixed.") + user_input = input("Do you want to apply fixes now? (y/n): ").strip().lower() + + if user_input == 'y': + if ufw_needs_fix: + fix_firewall() + if ssh_needs_fix: + fix_ssh_config(ssh_config) + print("\nāœ… Fixes attempt complete. Please re-run script to verify.") + else: + print("Skipping fixes.") + +if __name__ == "__main__": + if os.geteuid() != 0: + print("NOTE: This script works best with 'sudo' for fixing issues.") + verify_security_logic() \ No newline at end of file diff --git a/tests/test_health_monitor.py b/tests/test_health_monitor.py new file mode 100644 index 0000000..d352f0d --- /dev/null +++ b/tests/test_health_monitor.py @@ -0,0 +1,137 @@ +import unittest +from unittest.mock import patch, MagicMock, mock_open +from cortex.health.monitor import HealthMonitor, CheckResult +from cortex.health.checks.disk import DiskCheck +from cortex.health.checks.performance import PerformanceCheck +from cortex.health.checks.security import SecurityCheck +from cortex.health.checks.updates import UpdateCheck + +class TestDiskCheck(unittest.TestCase): + @patch('shutil.disk_usage') + def test_disk_usage_scoring(self, mock_usage): + # Case 1: Healthy (50% used) -> 100 pts + # total=100, used=50, free=50 + mock_usage.return_value = (100, 50, 50) + check = DiskCheck() + result = check.run() + self.assertEqual(result.score, 100) + self.assertEqual(result.status, "OK") + + # Case 2: Warning (85% used) -> 50 pts + mock_usage.return_value = (100, 85, 15) + result = check.run() + self.assertEqual(result.score, 50) + self.assertEqual(result.status, "WARNING") + + # Case 3: Critical (95% used) -> 0 pts + mock_usage.return_value = (100, 95, 5) + result = check.run() + self.assertEqual(result.score, 0) + self.assertEqual(result.status, "CRITICAL") + +class TestPerformanceCheck(unittest.TestCase): + @patch('os.getloadavg') + @patch('multiprocessing.cpu_count') + def test_load_average(self, mock_cpu, mock_load): + # Case 1: Load OK (Load 2.0 / 4 Cores = 0.5 ratio) + mock_cpu.return_value = 4 + mock_load.return_value = (2.0, 2.0, 2.0) + + # Mock reading /proc/meminfo (Normal case) + mem_data = "MemTotal: 1000 kB\nMemAvailable: 500 kB\n" + with patch('builtins.open', mock_open(read_data=mem_data)): + check = PerformanceCheck() + result = check.run() + self.assertEqual(result.score, 100) # No penalty + + @patch('os.getloadavg') + @patch('multiprocessing.cpu_count') + def test_high_load_penalty(self, mock_cpu, mock_load): + # Case 2: High Load (Load 5.0 / 4 Cores = 1.25 ratio) -> -50 pts + mock_cpu.return_value = 4 + mock_load.return_value = (5.0, 5.0, 5.0) + + # Assume memory is normal + mem_data = "MemTotal: 1000 kB\nMemAvailable: 500 kB\n" + with patch('builtins.open', mock_open(read_data=mem_data)): + check = PerformanceCheck() + result = check.run() + self.assertEqual(result.score, 50) # 100 - 50 = 50 + +class TestSecurityCheck(unittest.TestCase): + @patch('subprocess.run') + def test_ufw_status(self, mock_run): + # Case 1: UFW Inactive -> 0 pts + mock_run.return_value.stdout = "inactive" + mock_run.return_value.returncode = 0 + + check = SecurityCheck() + result = check.run() + self.assertEqual(result.score, 0) + self.assertIn("Firewall Inactive", result.details) + + @patch('subprocess.run') + def test_ufw_active(self, mock_run): + # Case 2: UFW Active -> 100 pts (SSH config is safe by default mock) + mock_run.return_value.stdout = "active" + mock_run.return_value.returncode = 0 + + # Test error handling when sshd_config does not exist + with patch('os.path.exists', return_value=False): + check = SecurityCheck() + result = check.run() + self.assertEqual(result.score, 100) + +class TestUpdateCheck(unittest.TestCase): + @patch('subprocess.run') + def test_apt_updates(self, mock_run): + # Mock output for apt list --upgradable + # Ignore first line, packages start from 2nd line + apt_output = """Listing... Done +package1/stable 1.0.0 amd64 [upgradable from: 0.9.9] +package2/stable 2.0.0 amd64 [upgradable from: 1.9.9] +security-pkg/stable 1.0.1 amd64 [upgradable from: 1.0.0] - Security Update +""" + mock_run.return_value.stdout = apt_output + mock_run.return_value.returncode = 0 + + check = UpdateCheck() + result = check.run() + + # Calculation: + # Total packages: 3 + # Security packages: 1 (line containing "security") + # Penalty: (3 * 2) + (1 * 10) = 6 + 10 = 16 pts + # Expected score: 100 - 16 = 84 pts + + self.assertEqual(result.score, 84) + self.assertIn("3 pending", result.details) + +class TestHealthMonitor(unittest.TestCase): + def test_monitor_aggregation(self): + monitor = HealthMonitor() + # Register mock checks instead of real check classes + + mock_check1 = MagicMock() + mock_check1.run.return_value = CheckResult( + name="Check1", category="test", score=100, status="OK", details="", weight=0.5 + ) + + mock_check2 = MagicMock() + mock_check2.run.return_value = CheckResult( + name="Check2", category="test", score=0, status="CRITICAL", details="", weight=0.5 + ) + + monitor.checks = [mock_check1, mock_check2] + + # Mock history saving to prevent file write + with patch.object(monitor, '_save_history'): + report = monitor.run_all() + + # Weighted average calculation: + # (100 * 0.5) + (0 * 0.5) = 50 / (0.5 + 0.5) = 50 pts + self.assertEqual(report['total_score'], 50) + self.assertEqual(len(report['results']), 2) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file