| |
"""
Validate control system invariants against simulation results.

Three invariants:
  1. validate-budget: cumulative sacrifice never exceeds max_energy_reduction_pct
  2. validate-gates: no interventions in no-shade windows (May, mornings, low stress)
  3. validate-dose: average intervention offset < 10°, top canopy > 70% sunlit

Usage
-----
    # Validate existing simulation log
    python scripts/validate_control_system.py --log Data/simulation_logs/sim_2025-07-01_2025-07-07.json

    # Run a fresh simulation and validate
    python scripts/validate_control_system.py --start 2025-07-01 --end 2025-07-07

    # Verbose output
    python scripts/validate_control_system.py --log <path> -v

    # Run the validators against synthetic data with deliberate violations
    python scripts/validate_control_system.py --selftest
"""
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import logging |
| import sys |
| from datetime import date, datetime |
| from pathlib import Path |
| from typing import Optional |
|
|
| sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) |
|
|
| from config.settings import ( |
| MAX_ENERGY_REDUCTION_PCT, |
| NO_SHADE_BEFORE_HOUR, |
| NO_SHADE_MONTHS, |
| SEMILLON_TRANSITION_TEMP_C, |
| SHADE_ELIGIBLE_GHI_ABOVE, |
| ) |
|
|
# Module-level logger; the fixed name "validate" is used (rather than
# __name__) so records carry the same name when the file runs as a script.
logger = logging.getLogger("validate")
|
|
|
|
| |
| |
| |
|
|
class ValidationResult:
    """Accumulate the pass/fail state, violations and stats of one invariant.

    A result starts out passing. Each call to :meth:`fail` marks it as failed
    and records one violation message. ``stats`` is a free-form mapping that
    the validators fill in and :meth:`report` prints in insertion order.
    """

    # Maximum number of violation lines printed by report(); any remainder is
    # summarised in a single "... and N more" line.
    MAX_REPORTED_VIOLATIONS = 20

    def __init__(self, name: str, description: str):
        """Create a passing result for the invariant *name*.

        Args:
            name: Short identifier printed in the report header.
            description: Human-readable statement of the invariant.
        """
        self.name = name
        self.description = description
        self.passed = True
        self.violations: list[str] = []
        self.stats: dict = {}

    def fail(self, message: str) -> None:
        """Mark the invariant as failed and record *message* as a violation."""
        self.passed = False
        self.violations.append(message)

    def report(self) -> str:
        """Return a multi-line, human-readable summary of this result.

        The header line is followed by one line per stat and, when there are
        violations, a count line plus at most ``MAX_REPORTED_VIOLATIONS``
        individual messages.
        """
        # NOTE(review): the single leading space on the detail lines is kept
        # exactly as found; it may be collapsed indentation - confirm.
        status = "PASS" if self.passed else "FAIL"
        lines = [f"[{status}] {self.name}: {self.description}"]
        lines.extend(f" {key}: {value}" for key, value in self.stats.items())
        if self.violations:
            limit = self.MAX_REPORTED_VIOLATIONS
            lines.append(f" Violations ({len(self.violations)}):")
            lines.extend(f" - {msg}" for msg in self.violations[:limit])
            hidden = len(self.violations) - limit
            if hidden > 0:
                lines.append(f" ... and {hidden} more")
        return "\n".join(lines)

    def __repr__(self) -> str:
        """Return a compact debugging representation."""
        return (
            f"{type(self).__name__}(name={self.name!r}, passed={self.passed}, "
            f"violations={len(self.violations)})"
        )
|
|
|
|
| |
| |
| |
|
|
def _result_year(record: dict) -> Optional[int]:
    """Return the calendar year of a result's timestamp, or None if unusable."""
    ts = record.get("timestamp")
    if isinstance(ts, (datetime, date)):
        return ts.year
    if isinstance(ts, str) and len(ts) >= 4:
        try:
            return int(ts[:4])
        except ValueError:
            return None
    return None


def validate_budget(results: list[dict]) -> ValidationResult:
    """Verify cumulative energy sacrifice never exceeds the annual ceiling.

    For every calendar year present in *results*, the ``energy_cost_kwh`` of
    each planned intervention (``plan_offset_deg`` > 0 and no
    ``live_override``) is accumulated in list order and compared with the
    ``total_budget_kWh`` returned by ``EnergyBudgetPlanner.compute_annual_plan``.

    Args:
        results: Simulation tick records (dicts). Timestamps may be ISO
            strings or date/datetime objects; records whose timestamp yields
            no year are ignored.

    Returns:
        A ValidationResult with per-year stats. It fails when no record has a
        usable timestamp or when the running total exceeds the year's budget
        at any point.
    """
    vr = ValidationResult(
        "validate-budget",
        f"Cumulative sacrifice <= {MAX_ENERGY_REDUCTION_PCT}% of annual potential",
    )

    # Imported lazily, as in the original: only this validator needs it.
    from src.energy_budget import EnergyBudgetPlanner
    planner = EnergyBudgetPlanner()

    # Group records by year in a single pass. Using one extractor for both
    # discovery and filtering means a None or datetime timestamp can no
    # longer crash the per-year filter.
    by_year: dict[int, list[dict]] = {}
    for r in results:
        year = _result_year(r)
        if year is not None:
            by_year.setdefault(year, []).append(r)
    if not by_year:
        vr.fail("No valid timestamps in results")
        return vr

    for year in sorted(by_year):
        annual = planner.compute_annual_plan(year)
        total_potential = annual["total_potential_kWh"]
        total_budget = annual["total_budget_kWh"]

        cumulative = 0.0
        peak_cumulative = 0.0
        peak_slot = ""

        for r in by_year[year]:
            # Live overrides are excluded from the planned-sacrifice budget.
            if r.get("live_override"):
                continue
            offset = r.get("plan_offset_deg", 0) or 0
            if offset <= 0:
                continue
            cumulative += r.get("energy_cost_kwh", 0) or 0
            if cumulative > peak_cumulative:
                peak_cumulative = cumulative
                peak_slot = r.get("timestamp", "?")

        pct_used = (cumulative / total_potential * 100) if total_potential > 0 else 0

        vr.stats[f"{year}_potential_kWh"] = f"{total_potential:.2f}"
        vr.stats[f"{year}_budget_kWh"] = f"{total_budget:.2f}"
        vr.stats[f"{year}_spent_kWh"] = f"{cumulative:.4f}"
        vr.stats[f"{year}_pct_used"] = f"{pct_used:.3f}%"
        vr.stats[f"{year}_peak_at"] = peak_slot

        # "Never exceeds" is a statement about the running total, so test the
        # peak rather than the final value. The two are equal whenever all
        # costs are non-negative.
        if peak_cumulative > total_budget:
            peak_pct = (
                (peak_cumulative / total_potential * 100) if total_potential > 0 else 0
            )
            vr.fail(
                f"{year}: spent {peak_cumulative:.4f} kWh > budget {total_budget:.2f} kWh "
                f"({peak_pct:.2f}% > {MAX_ENERGY_REDUCTION_PCT}%)"
            )

    return vr
|
|
|
|
| |
| |
| |
|
|
def _parse_tick_time(ts: object) -> Optional[datetime]:
    """Return *ts* as a datetime, or None when it cannot be interpreted."""
    if isinstance(ts, datetime):
        return ts
    if not isinstance(ts, str):
        return None
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11
    # on; normalise it so such logs are not skipped on older interpreters.
    if ts.endswith("Z"):
        ts = ts[:-1] + "+00:00"
    try:
        return datetime.fromisoformat(ts)
    except ValueError:
        return None


def validate_gates(results: list[dict]) -> ValidationResult:
    """Verify no interventions occur in prohibited windows.

    An intervention is a record with ``plan_offset_deg`` > 0 and no
    ``live_override``. Each one is checked against four gates: month in
    ``NO_SHADE_MONTHS``, hour below ``NO_SHADE_BEFORE_HOUR``, ``air_temp_c``
    below ``SEMILLON_TRANSITION_TEMP_C`` and ``ghi_w_m2`` below
    ``SHADE_ELIGIBLE_GHI_ABOVE``. A record may violate several gates.

    Args:
        results: Simulation tick records (dicts). Timestamps may be ISO
            strings or datetime objects.

    Returns:
        A ValidationResult with one violation per breached gate and per-gate
        counters in ``stats``. ``unparsed_timestamps`` is added only when at
        least one intervention had a timestamp that could not be read.
    """
    vr = ValidationResult(
        "validate-gates",
        "No interventions in May, before 10:00, or when temp < transition / GHI < threshold",
    )

    total_interventions = 0
    may_violations = 0
    morning_violations = 0
    temp_violations = 0
    ghi_violations = 0
    unparsed_timestamps = 0

    for r in results:
        offset = r.get("plan_offset_deg", 0) or 0
        # Only planned interventions are subject to the gates.
        if offset <= 0 or r.get("live_override"):
            continue

        total_interventions += 1
        ts = r.get("timestamp", "")
        dt = _parse_tick_time(ts)

        if dt is None:
            # The calendar gates cannot be evaluated, but the temperature and
            # GHI gates below do not need a timestamp, so keep going instead
            # of silently dropping the whole record.
            unparsed_timestamps += 1
            logger.warning(
                "Cannot parse timestamp %r; month/hour gates skipped for this tick", ts
            )
        else:
            # NOTE(review): month/hour are read from the timestamp as stored,
            # with no timezone conversion - confirm the gate constants are in
            # the same timezone as the log.
            if dt.month in NO_SHADE_MONTHS:
                may_violations += 1
                vr.fail(f"May intervention at {ts}: offset={offset}°")

            if dt.hour < NO_SHADE_BEFORE_HOUR:
                morning_violations += 1
                vr.fail(f"Morning intervention at {ts} (hour={dt.hour}): offset={offset}°")

        temp = r.get("air_temp_c")
        if temp is not None and temp < SEMILLON_TRANSITION_TEMP_C:
            temp_violations += 1
            vr.fail(
                f"Low-temp intervention at {ts}: temp={temp:.1f}°C < "
                f"{SEMILLON_TRANSITION_TEMP_C}°C, offset={offset}°"
            )

        ghi = r.get("ghi_w_m2")
        if ghi is not None and ghi < SHADE_ELIGIBLE_GHI_ABOVE:
            ghi_violations += 1
            vr.fail(
                f"Low-GHI intervention at {ts}: GHI={ghi:.0f} < "
                f"{SHADE_ELIGIBLE_GHI_ABOVE}, offset={offset}°"
            )

    vr.stats["total_interventions"] = total_interventions
    vr.stats["may_violations"] = may_violations
    vr.stats["morning_violations"] = morning_violations
    vr.stats["temp_violations"] = temp_violations
    vr.stats["ghi_violations"] = ghi_violations
    if unparsed_timestamps:
        vr.stats["unparsed_timestamps"] = unparsed_timestamps

    return vr
|
|
|
|
| |
| |
| |
|
|
def validate_dose(results: list[dict]) -> ValidationResult:
    """Verify that planned interventions are small and infrequent.

    Collects ``plan_offset_deg`` for every record with a positive offset and
    no ``live_override``, then applies three checks: average offset < 10°,
    maximum offset <= 20°, and intervention rate (interventions / all
    records) <= 30%.

    NOTE(review): the module docstring also lists a "top canopy > 70% sunlit"
    criterion; no such check is performed here.

    Args:
        results: Simulation tick records (dicts).

    Returns:
        A ValidationResult with offset statistics and a histogram in
        ``stats``. With no interventions it passes and carries only a note.
    """
    # Local import keeps this block self-contained (stdlib only).
    from statistics import median

    vr = ValidationResult(
        "validate-dose",
        "Average intervention offset < 10°; interventions are minimum-dose",
    )

    offsets = []
    for r in results:
        offset = r.get("plan_offset_deg", 0) or 0
        if offset > 0 and not r.get("live_override"):
            offsets.append(offset)

    if not offsets:
        vr.stats["intervention_count"] = 0
        vr.stats["note"] = "No interventions to validate"
        return vr

    avg_offset = sum(offsets) / len(offsets)
    max_offset = max(offsets)
    # offsets is non-empty here, so results is too: no zero-division guard.
    intervention_rate = len(offsets) / len(results) * 100

    vr.stats["intervention_count"] = len(offsets)
    vr.stats["intervention_rate"] = f"{intervention_rate:.1f}%"
    vr.stats["avg_offset_deg"] = f"{avg_offset:.1f}°"
    vr.stats["max_offset_deg"] = f"{max_offset:.0f}°"
    # True median: for an even count this averages the two middle values
    # instead of reporting the upper one.
    vr.stats["median_offset_deg"] = f"{median(offsets):.0f}°"

    # Histogram over half-open brackets (lo, hi].
    brackets = [(0, 5), (5, 10), (10, 15), (15, 20), (20, 60)]
    for lo, hi in brackets:
        vr.stats[f"offsets_{lo}-{hi}deg"] = sum(1 for o in offsets if lo < o <= hi)
    # Offsets beyond the last bracket used to disappear from the histogram;
    # surface them, but only when present so normal reports are unchanged.
    top = brackets[-1][1]
    above_top = sum(1 for o in offsets if o > top)
    if above_top:
        vr.stats[f"offsets_>{top}deg"] = above_top

    if avg_offset >= 10.0:
        vr.fail(
            f"Average offset {avg_offset:.1f}° >= 10° limit "
            f"(should be minimum-dose interventions)"
        )

    if max_offset > 20:
        vr.fail(
            f"Max offset {max_offset:.0f}° > 20° "
            f"(unusually large for minimum-dose strategy)"
        )

    if intervention_rate > 30:
        vr.fail(
            f"Intervention rate {intervention_rate:.1f}% > 30% "
            f"(interventions should be rare, not the norm)"
        )

    return vr
|
|
|
|
| |
| |
| |
|
|
def load_results(log_path: Path) -> list[dict]:
    """Load simulation results from a JSON log file.

    Args:
        log_path: Path of the JSON file to read.

    Returns:
        The decoded top-level JSON array.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the content is not valid JSON (``json.JSONDecodeError``
            is a ``ValueError`` subclass) or its top level is not an array.
    """
    # Read as UTF-8 explicitly instead of relying on the platform's locale
    # encoding.
    with open(log_path, encoding="utf-8") as f:
        data = json.load(f)
    # Fail here with a clear message rather than deep inside a validator.
    if not isinstance(data, list):
        raise ValueError(
            f"{log_path}: expected a JSON array of tick results, "
            f"got {type(data).__name__}"
        )
    return data
|
|
|
|
def run_simulation(start: date, end: date) -> list[dict]:
    """Run a fresh simulation over [start, end] and return its results.

    Args:
        start: First simulated date.
        end: Last simulated date; must not precede *start*.

    Returns:
        Whatever ``ControlSimulation(start, end).run()`` returns.

    Raises:
        ValueError: If *end* is earlier than *start*.
    """
    # Reject an inverted range up front instead of validating the output of a
    # meaningless run.
    if end < start:
        raise ValueError(f"end date {end} precedes start date {start}")

    # Imported lazily so validating an existing log does not load the
    # simulation module.
    from scripts.run_control_simulation import ControlSimulation
    sim = ControlSimulation(start, end)
    return sim.run()
|
|
|
|
def _iso_date_arg(value: str) -> date:
    """argparse type converter: parse YYYY-MM-DD into a date."""
    try:
        return date.fromisoformat(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError(
            f"invalid date {value!r} (expected YYYY-MM-DD)"
        ) from exc


def main(argv: Optional[list[str]] = None) -> None:
    """Command-line entry point: load or simulate results, then validate.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Exits with status 0 when every invariant passes and 1 when any fails or
    the log file is missing or unreadable. Usage errors exit with 2 via
    argparse.
    """
    parser = argparse.ArgumentParser(
        description="Validate control system invariants."
    )
    parser.add_argument(
        "--log", type=str, default=None,
        help="Path to existing simulation log JSON",
    )
    # argparse applies `type` to string defaults as well, so args.start and
    # args.end are always date objects.
    parser.add_argument(
        "--start", type=_iso_date_arg, default="2025-07-01",
        help="Start date for fresh simulation (if no --log)",
    )
    parser.add_argument(
        "--end", type=_iso_date_arg, default="2025-07-07",
        help="End date for fresh simulation (if no --log)",
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true",
        help="Enable debug logging",
    )
    args = parser.parse_args(argv)

    level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(name)-15s %(levelname)-7s %(message)s",
        datefmt="%H:%M:%S",
    )

    if args.log:
        log_path = Path(args.log)
        if not log_path.exists():
            print(f"Error: log file not found: {log_path}", file=sys.stderr)
            sys.exit(1)
        print(f"Loading results from: {log_path}")
        try:
            results = load_results(log_path)
        except (OSError, ValueError) as exc:
            # json.JSONDecodeError is a ValueError; report it without a traceback.
            print(f"Error: could not load {log_path}: {exc}", file=sys.stderr)
            sys.exit(1)
    else:
        if args.end < args.start:
            parser.error("--end must not be earlier than --start")
        print(f"Running simulation: {args.start} → {args.end}")
        results = run_simulation(args.start, args.end)

    print(f"\nValidating {len(results)} tick results...\n")

    validations = [
        validate_budget(results),
        validate_gates(results),
        validate_dose(results),
    ]

    for v in validations:
        print(v.report())
        print()

    n_fail = sum(1 for v in validations if not v.passed)
    n_pass = len(validations) - n_fail
    print(f"{'='*60}")
    print(f"Results: {n_pass} passed, {n_fail} failed out of {len(validations)}")
    print(f"{'='*60}")

    sys.exit(0 if n_fail == 0 else 1)
|
|
|
|
def selftest():
    """Run validators against synthetic data with deliberate violations.

    Builds five ticks (a no-shade-month intervention, a morning one, a
    low-temperature one, an oversized one and a clean non-intervention) and
    checks that validate-gates and validate-dose fail while validate-budget
    passes.

    Raises:
        AssertionError: If a validator does not behave as expected. The
            checks are explicit raises rather than ``assert`` statements so
            they still run under ``python -O``.
    """

    def expect(condition: bool, message: str) -> None:
        # Explicit raise: `assert` statements are removed under -O.
        if not condition:
            raise AssertionError(message)

    print("Running self-test with synthetic violations...\n")

    # Intervention in May.
    may_tick = {
        "timestamp": "2025-05-15T12:00:00+00:00",
        "slot_index": 48,
        "plan_offset_deg": 10.0,
        "live_override": False,
        "air_temp_c": 35.0,
        "ghi_w_m2": 800.0,
        "energy_cost_kwh": 0.5,
    }
    # Intervention at 07:00.
    morning_tick = {
        "timestamp": "2025-07-10T07:00:00+00:00",
        "slot_index": 28,
        "plan_offset_deg": 5.0,
        "live_override": False,
        "air_temp_c": 32.0,
        "ghi_w_m2": 500.0,
        "energy_cost_kwh": 0.1,
    }
    # Intervention at 22 °C air temperature.
    cold_tick = {
        "timestamp": "2025-07-10T13:00:00+00:00",
        "slot_index": 52,
        "plan_offset_deg": 8.0,
        "live_override": False,
        "air_temp_c": 22.0,
        "ghi_w_m2": 700.0,
        "energy_cost_kwh": 0.2,
    }
    # Oversized 25° offset.
    big_tick = {
        "timestamp": "2025-07-10T14:00:00+00:00",
        "slot_index": 56,
        "plan_offset_deg": 25.0,
        "live_override": False,
        "air_temp_c": 38.0,
        "ghi_w_m2": 900.0,
        "energy_cost_kwh": 1.0,
    }
    # No intervention at all (offset 0).
    normal_tick = {
        "timestamp": "2025-07-10T11:00:00+00:00",
        "slot_index": 44,
        "plan_offset_deg": 0.0,
        "live_override": False,
        "air_temp_c": 33.0,
        "ghi_w_m2": 800.0,
        "energy_cost_kwh": 0.0,
    }

    test_results = [may_tick, morning_tick, cold_tick, big_tick, normal_tick]

    v_gates = validate_gates(test_results)
    expect(not v_gates.passed, "validate-gates should FAIL on synthetic data")
    for key in ("may_violations", "morning_violations", "temp_violations"):
        expect(
            v_gates.stats[key] == 1,
            f"validate-gates: expected {key} == 1, got {v_gates.stats[key]}",
        )
    print(f" validate-gates: correctly caught {len(v_gates.violations)} violations")

    v_dose = validate_dose(test_results)
    expect(not v_dose.passed, "validate-dose should FAIL (avg > 10° and max > 20°)")
    print(f" validate-dose: correctly caught {len(v_dose.violations)} violations")

    v_budget = validate_budget(test_results)
    expect(v_budget.passed, "validate-budget should PASS (small amounts)")
    print(" validate-budget: correctly passed (small spend)")

    print("\nSelf-test PASSED: all validators correctly detect violations.\n")
|
|
|
|
if __name__ == "__main__":
    # --selftest is dispatched here, before argparse runs, because main()'s
    # parser does not declare that flag.
    if "--selftest" in sys.argv:
        selftest()
    else:
        main()
|
|