Source code for simulation.engine

"""Core simulation engine for the electric port simulator."""

import time
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Union

from config import Settings, SimulationMode
from database import DatabaseManager
from forecasting import PortForecaster
from models import BoatState, ChargerState, Port, Trip
from optimization import BaseOptimizer
from simulation.trip_manager import TripManager
from weather import OpenMeteoClient


class SimulationEngine:
    """Main simulation engine for the electric port.

    Advances the port model in fixed timesteps of ``settings.timestep``
    seconds, starting at ``start_date`` and covering ``days`` days. Each step
    handles trips, PV production, charger assignment, BESS and boat charging,
    then stores measurements. Daily tasks (weather fetch, trip assignment,
    energy forecast and optional optimization) run at midnight.
    """

    def __init__(
        self,
        port: Port,
        settings: Settings,
        db_manager: DatabaseManager,
        start_date: Optional[Union[datetime, str]] = None,
        days: int = 1,
        trips_directory: str = "assets/trips",
    ):
        """
        Initialize the simulation engine.

        Args:
            port: Port instance (contains boats and chargers)
            settings: Simulation settings
            db_manager: Database manager
            start_date: Simulation start date as datetime or string
                "YYYY-MM-DD" (default: today at midnight UTC)
            days: Number of days to simulate (max 7)
            trips_directory: Directory containing trip CSV files

        Raises:
            ValueError: If the optimizer is enabled and the number of boats
                differs from the number of chargers.
        """
        self.port = port
        self.settings = settings
        self.db_manager = db_manager

        # Set start date to today at midnight UTC if not provided
        if start_date is None:
            now = datetime.now(timezone.utc)
            # Built without tzinfo, like the value produced by strptime below.
            self.start_date = datetime(now.year, now.month, now.day, 0, 0, 0)
        elif isinstance(start_date, str):
            # Handle string dates (e.g., "2025-09-01")
            self.start_date = datetime.strptime(start_date, "%Y-%m-%d")
        else:
            self.start_date = start_date

        # Cap days at 7
        self.days = min(days, 7)

        # Calculate total simulation time in seconds
        self.total_duration = self.days * 24 * 3600

        # Current simulation datetime
        self.current_datetime = self.start_date

        # Track boat-charger assignments
        self.boat_charger_map = {}  # {boat_name: charger_name}

        # Trip schedule from settings: (start_hour, slot_number)
        self.trip_schedule = self.settings.trip_schedule

        # Initialize trip manager
        print(f"\nLoading trips from {trips_directory}...")
        self.trip_manager = TripManager(trips_directory)

        # Track active trips: {boat_name: (trip, start_datetime, elapsed_seconds)}
        self.active_trips: Dict[str, tuple[Trip, datetime, float]] = {}

        # Track delayed trips (boats waiting for sufficient SOC): {boat_name: trip}
        self.delayed_trips: Dict[str, Trip] = {}

        # Track last date we assigned trips (to assign at midnight)
        self.last_assignment_date: Optional[str] = None

        # Track last date we fetched weather forecast
        self.last_weather_fetch_date: Optional[str] = None

        # Track last date we generated energy forecast
        self.last_energy_forecast_date: Optional[str] = None

        # Initialize forecaster
        self.forecaster = PortForecaster(
            port, db_manager, settings.timestep, settings.trip_schedule
        )

        # Initialize optimizer (if enabled)
        self.optimizer = None
        self.use_optimizer = settings.use_optimizer

        # Pre-assign boats to chargers 1:1 (required when optimizer is enabled)
        self.boat_charger_assignments = {}  # {boat_name: charger_index}

        if self.use_optimizer:
            num_boats = len(self.port.boats)
            num_chargers = len(self.port.chargers)
            if num_boats != num_chargers:
                raise ValueError(
                    f"Optimizer requires n_boats == n_chargers, "
                    f"got {num_boats} boats and {num_chargers} chargers"
                )

            # Fixed 1:1 mapping: boat i -> charger i
            for i, boat in enumerate(self.port.boats):
                self.boat_charger_assignments[boat.name] = i

            self.optimizer = BaseOptimizer(
                port,
                db_manager,
                settings.timestep,
                self.boat_charger_assignments,
            )
            pairs = ", ".join(
                f"{b} → {self.port.chargers[c].name}"
                for b, c in self.boat_charger_assignments.items()
            )
            print(f" Pre-assignments: {pairs}")

        # Store latest forecasts for optimizer
        self.latest_energy_forecasts = []

        # Track boats with energy shortfalls (for priority charging)
        self.boats_with_shortfalls = set()  # {boat_name}

        # Initialize weather client and fetch forecast if PV systems present
        self.weather_client = None
        self.weather_forecast = {}  # {timestamp_str: {metric: value}}
        self.forecast_loaded = False

        if self.port.pv_systems:
            print(
                f"\nInitializing weather data for {len(self.port.pv_systems)} PV system(s)..."
            )
            self.weather_client = OpenMeteoClient(self.port.lat, self.port.lon)
            self._load_weather_forecast()
            # Mark initial fetch date so the first midnight step does not
            # fetch the same day again.
            self.last_weather_fetch_date = self.start_date.strftime("%Y-%m-%d")

    def run(self):
        """Run the simulation based on the configured mode.

        Prints a summary banner, then runs ``_run_batch`` when the mode is
        ``SimulationMode.BATCH`` and ``_run_realtime`` otherwise.
        """
        print("\n" + "=" * 60)
        print("Starting Simulation")
        print("=" * 60)
        print(f"Port: {self.port.name}")
        print(f"Start: {self.start_date.strftime('%Y-%m-%d %H:%M:%S')} UTC")
        print(f"Duration: {self.days} day(s)")
        print(f"Timestep: {self.settings.timestep}s")
        print(f"Mode: {self.settings.mode.value}")
        print(f"Boats: {len(self.port.boats)}")
        print(f"Chargers: {len(self.port.chargers)}")
        print("=" * 60 + "\n")

        if self.settings.mode == SimulationMode.BATCH:
            self._run_batch()
        else:
            self._run_realtime()

    def _run_batch(self):
        """Run simulation in batch mode (all timesteps at once)."""
        timestep_count = int(self.total_duration / self.settings.timestep)
        print(f"Running {timestep_count} timesteps...\n")

        for step in range(timestep_count):
            self._simulate_timestep()

            # Progress indicator every 100 steps or at midnight. The midnight
            # test uses hour AND minute (same test as _simulate_timestep);
            # checking the hour alone would print on every step of hour 0.
            at_midnight = (
                self.current_datetime.hour == 0
                and self.current_datetime.minute == 0
            )
            if step % 100 == 0 or at_midnight:
                print(
                    f"[{step}/{timestep_count}] {self.current_datetime.strftime('%Y-%m-%d %H:%M:%S')} UTC"
                )

            # Advance time
            self.current_datetime += timedelta(seconds=self.settings.timestep)

        print(f"\n✓ Simulation completed: {timestep_count} timesteps")

    def _run_realtime(self):
        """Run simulation in real-time mode.

        After each simulated step, sleeps for whatever is left of
        ``settings.timestep`` seconds once the step's own run time is
        subtracted.
        """
        print("Running in real-time mode...\n")
        timestep_count = int(self.total_duration / self.settings.timestep)
        start_real_time = time.time()

        for step in range(timestep_count):
            step_start = time.time()
            self._simulate_timestep()

            # Print status
            if step % 10 == 0:
                print(
                    f"[{step}/{timestep_count}] {self.current_datetime.strftime('%Y-%m-%d %H:%M:%S')} UTC"
                )

            # Advance time
            self.current_datetime += timedelta(seconds=self.settings.timestep)

            # Wait for the timestep duration
            elapsed = time.time() - step_start
            sleep_time = self.settings.timestep - elapsed
            if sleep_time > 0:
                time.sleep(sleep_time)

        total_real_time = time.time() - start_real_time
        print(
            f"\n✓ Simulation completed: {timestep_count} timesteps in {total_real_time:.1f}s"
        )

    def _simulate_timestep(self):
        """Simulate a single timestep.

        Daily tasks (steps 0-2) run only when the current time is 00:00;
        steps 3-8 run on every timestep, in the order listed below.
        """
        # Check if it's midnight (00:00)
        is_midnight = (
            self.current_datetime.hour == 0 and self.current_datetime.minute == 0
        )

        if is_midnight:
            # 0. Fetch weather forecast daily at midnight (00:00)
            self._fetch_weather()
            # 1. Assign daily trips at midnight (00:00):
            self._assign_daily_trips()
            # 2. Generate energy forecast at midnight (00:00):
            self._generate_energy_forecast()

        # 3. Check and handle trip schedules
        self._handle_trips()
        # 4. Update PV production based on weather
        self._update_pv_production()
        # 5. Check which boats need charging and assign to chargers
        self._assign_boats_to_chargers()
        # 6. Update BESS (after charger assignment to see correct load)
        self._update_bess()
        # 7. Update charging for boats
        self._update_charging()
        # 8. Save measurements to database
        self._save_measurements()

    def _fetch_weather(self):
        """Fetch weather forecast daily at midnight (00:00).

        Does nothing when no weather client is configured or when a forecast
        was already fetched for the current date.
        """
        if not self.weather_client:
            return

        current_date_str = self.current_datetime.strftime("%Y-%m-%d")

        # Check if we haven't fetched for this date yet
        if self.last_weather_fetch_date != current_date_str:
            print(f"\n 🌀️ Fetching weather forecast for {current_date_str}")

            # Calculate remaining days in simulation from current date
            days_remaining = (
                self.start_date + timedelta(days=self.days) - self.current_datetime
            ).days
            days_to_fetch = min(days_remaining, 7)  # OpenMeteo provides up to 7 days

            if days_to_fetch > 0:
                # Fetch forecast from current date
                self._load_weather_forecast(
                    start_from=self.current_datetime, days=days_to_fetch
                )
                self.last_weather_fetch_date = current_date_str
                print(f" ✓ Weather data updated for next {days_to_fetch} day(s)")
            print()

    def _generate_energy_forecast(self):
        """Generate energy consumption/production forecast at midnight (00:00).

        Runs at most once per date: collects each boat's trips for the day,
        asks the forecaster for the daily forecast, stores, saves and prints
        it, then runs the optimizer when one is enabled.
        """
        current_date_str = self.current_datetime.strftime("%Y-%m-%d")

        # Check if we haven't forecasted for this date yet
        if self.last_energy_forecast_date != current_date_str:
            print(f" 📊 Generating energy forecast for {current_date_str}")

            # Get trip assignments for today
            trip_assignments = {}
            for boat in self.port.boats:
                trips = self.trip_manager.get_trips_for_date(
                    boat.name, self.current_datetime
                )
                trip_assignments[boat.name] = trips

            # Generate 24-hour forecast
            forecasts = self.forecaster.generate_daily_forecast(
                self.current_datetime, trip_assignments
            )

            # Store forecasts for optimizer
            self.latest_energy_forecasts = forecasts

            # Save to database
            self.forecaster.save_forecasts_to_db(forecasts)

            # Print summary
            self.forecaster.print_forecast_summary(forecasts)

            # Run optimization if enabled
            if self.use_optimizer and self.optimizer:
                self._run_optimization()

            self.last_energy_forecast_date = current_date_str
            print()

    def _run_optimization(self):
        """
        Run optimization to create optimal schedules for chargers and BESS.

        Uses the most recently stored energy forecasts and saves the
        resulting schedules to the database.
        """
        print(" 🎯 Running optimization...")

        # Run optimization
        result = self.optimizer.optimize_daily_schedule(
            self.current_datetime, self.latest_energy_forecasts
        )

        # Save schedules to database
        self.optimizer.save_schedules_to_db(result)
        print(" ✓ Schedules saved to database")

    def _handle_energy_shortfalls(self, result, trip_assignments: Dict[str, List]):
        """
        Handle cases where optimizer cannot meet energy requirements (graceful degradation).

        When energy shortfalls are detected, this function:
        1. Logs warnings about which boats are affected
        2. Attempts to maximize charging for affected boats
        3. Updates schedules to use maximum available power

        Args:
            result: OptimizationResult with energy_shortfalls
            trip_assignments: Trip assignments per boat
        """
        if not result.energy_shortfalls:
            return

        print(
            f" ⚠️ Handling energy shortfalls for {len(result.energy_shortfalls)} boat(s)..."
        )

        # Update tracking of boats with shortfalls
        self.boats_with_shortfalls = set(result.energy_shortfalls.keys())

        # For each boat with shortfall, try to maximize charging
        for boat_name, shortfall_kwh in result.energy_shortfalls.items():
            boat = next((b for b in self.port.boats if b.name == boat_name), None)
            if boat is None:
                # A shortfall for a boat this port does not know cannot be
                # acted on; report it instead of failing with StopIteration.
                print(
                    f" ⚠️ Shortfall reported for unknown boat {boat_name!r}; skipping"
                )
                continue

            shortfall_pct = (shortfall_kwh / boat.battery_capacity) * 100
            print(
                f" {boat_name}: {shortfall_kwh:.2f} kWh shortfall ({shortfall_pct:.1f}% of battery)"
            )

            # Calculate remaining trips and energy needed
            trips = trip_assignments.get(boat_name, [])
            if trips:
                total_trip_energy = sum(
                    trip.estimate_energy_required(boat.k) for trip in trips
                )
                current_energy = boat.soc * boat.battery_capacity
                energy_needed = total_trip_energy + (
                    boat.battery_capacity - current_energy
                )
                print(f" Current SOC: {boat.soc:.1%}")
                print(f" Energy needed: {energy_needed:.2f} kWh")
                print(f" Energy available: {current_energy:.2f} kWh")
                print(f" Shortfall: {shortfall_kwh:.2f} kWh")

            # Override schedules to use maximum power for this boat when available
            # This maximizes charging priority for boats with shortfalls
            self._override_schedules_for_shortfall_boat(boat_name, result)

            if trips:
                print(
                    f" ⚠️ Warning: {len(trips)} trip(s) may be delayed or cancelled"
                )
            print(
                f" Action: Maximizing charging priority for {boat_name}"
            )

        # Note: The charger assignment logic will prioritize boats with lower SOC,
        # and boats with shortfalls are now marked for maximum power charging.
def _assign_daily_trips(self): """Assign trips for all boats at midnight (00:00).""" current_date_str = self.current_datetime.strftime("%Y-%m-%d") # Check if we haven't assigned for this date yet if self.last_assignment_date != current_date_str: # Clear any delayed trips from previous day if self.delayed_trips: print( f" πŸ—‘οΈ Clearing {len(self.delayed_trips)} delayed trip(s) from previous day" ) self.delayed_trips.clear() print(f" πŸ“… Assigning trips for {current_date_str}") weekday_name = self.current_datetime.strftime("%A") print(f" Day: {weekday_name}") for boat in self.port.boats: trips = self.trip_manager.assign_daily_trips( boat.name, self.current_datetime ) if trips: trip_names = [t.route_name for t in trips] print(f" {boat.name}: {len(trips)} trip(s) - {trip_names}") else: print(f" {boat.name}: No trips (rest day)") self.last_assignment_date = current_date_str print() def _handle_trips(self): """Handle boat trips based on schedule.""" current_hour = self.current_datetime.hour current_minute = self.current_datetime.minute for boat in self.port.boats: boat_name = boat.name # Check if boat is currently on a trip if boat_name in self.active_trips: trip, start_time, elapsed = self.active_trips[boat_name] # Update elapsed time elapsed += self.settings.timestep # Check if trip is complete if elapsed >= trip.duration: # Discharge for final partial timestep [elapsed - timestep, duration] self._discharge_boat_on_trip(boat, trip, elapsed) # Trip completed, return to port boat.state = BoatState.IDLE print( f" ← {boat.name} returned from {trip.route_name} at {self.current_datetime.strftime('%H:%M')}, SOC={boat.soc:.1%}" ) del self.active_trips[boat_name] else: # Still on trip, discharge battery (same segment-based energy as requirement) self._discharge_boat_on_trip(boat, trip, elapsed) self.active_trips[boat_name] = (trip, start_time, elapsed) continue # Check for delayed trips (boats waiting for sufficient charge) if boat_name in self.delayed_trips: trip = 
self.delayed_trips[boat_name] # Don't start trips after 6 PM (18:00) if current_hour >= 18: # Cancel delayed trip if it's past 6 PM if current_hour == 18 and current_minute < ( self.settings.timestep / 60 ): print( f" ❌ {boat.name} cancelled DELAYED {trip.route_name} - too late (after 6 PM)" ) del self.delayed_trips[boat_name] continue estimated_energy = trip.estimate_energy_required(boat.k) required_soc = estimated_energy / boat.battery_capacity # Check if boat now has enough charge (tolerance for float precision) if boat.soc >= (int(required_soc * 100) / 100): # Disconnect from charger if connected if boat_name in self.boat_charger_map: charger_name = self.boat_charger_map[boat_name] charger = next( c for c in self.port.chargers if c.name == charger_name ) charger.state = ChargerState.IDLE charger.power = 0.0 charger.connected_boat = None del self.boat_charger_map[boat_name] # Start delayed trip self.active_trips[boat_name] = ( trip, self.current_datetime, 0.0, ) boat.state = BoatState.SAILING print( f" β†’ {boat.name} starting DELAYED {trip.route_name} at {self.current_datetime.strftime('%H:%M')}, " f"SOC={boat.soc:.1%} (needed {required_soc:.1%})" ) del self.delayed_trips[boat_name] continue # Check if it's time to start a new trip for start_hour, slot in self.trip_schedule: # Start trip at the scheduled hour (check within the timestep window) if current_hour == start_hour and current_minute < ( self.settings.timestep / 60 ): # Get assigned trip for this slot trip = self.trip_manager.get_trip_for_slot( boat_name, self.current_datetime, slot ) if trip is None: continue # No trip assigned for this slot # Estimate energy required for trip estimated_energy = trip.estimate_energy_required(boat.k) required_soc = estimated_energy / boat.battery_capacity # Check if boat has enough charge (tolerance for float precision) if boat.soc >= (int(required_soc * 100) / 100): # Disconnect from charger if connected if boat_name in self.boat_charger_map: charger_name = 
self.boat_charger_map[boat_name] charger = next( c for c in self.port.chargers if c.name == charger_name ) charger.state = ChargerState.IDLE charger.power = 0.0 charger.connected_boat = None del self.boat_charger_map[boat_name] # Start trip self.active_trips[boat_name] = ( trip, self.current_datetime, 0.0, ) boat.state = BoatState.SAILING print( f" β†’ {boat.name} starting {trip.route_name} at {self.current_datetime.strftime('%H:%M')}, " f"SOC={boat.soc:.1%} (need {required_soc:.1%})" ) break else: # Not enough charge - delay the trip self.delayed_trips[boat_name] = trip print( f" ⏸️ {boat.name} delaying {trip.route_name} - insufficient charge " f"(has {boat.soc:.1%}, needs {required_soc:.1%}). Will depart when ready." ) break def _discharge_boat_on_trip(self, boat, trip: Trip, elapsed_seconds: float): """Discharge boat battery using the same segment-based energy as the requirement estimate.""" # Energy for the period that just passed: [elapsed - timestep, elapsed], capped at trip end start_elapsed = max(0.0, elapsed_seconds - self.settings.timestep) end_elapsed = min(elapsed_seconds, trip.duration) if start_elapsed >= end_elapsed: return energy_consumed = trip.get_energy_between(start_elapsed, end_elapsed, boat.k) soc_decrease = energy_consumed / boat.battery_capacity boat.soc = max(0, boat.soc - soc_decrease) def _assign_boats_to_chargers(self): """Assign boats that need charging to available chargers.""" # Check if we're using optimizer schedules if self.use_optimizer: self._assign_boats_to_chargers_with_schedule() elif ( hasattr(self.settings, "power_limit_mode") and self.settings.power_limit_mode ): self._assign_boats_to_chargers_power_limited() else: self._assign_boats_to_chargers_default() def _get_current_power_usage(self) -> float: """Calculate current total power being used by chargers.""" return sum( c.power for c in self.port.chargers if c.state == ChargerState.CHARGING ) def _get_available_power(self): used_power = self._get_current_power_usage() 
pv_production = sum(pv.current_production for pv in self.port.pv_systems) if self.use_optimizer: bess_available = 0.0 else: bess_available = sum( bess.get_max_discharge_power_available(self.settings.timestep) for bess in self.port.bess_systems ) return self.port.contracted_power + pv_production + bess_available - used_power def _assign_boats_to_chargers_with_schedule(self): """ Assign boats to chargers following the optimizer's CHARGER power schedules. Uses the pre-assigned boatβ†’charger mapping to determine each boat's charging power. Each boat can only connect to ONE charger at a time (physical constraint). IMPORTANT: Updates self.boat_charger_map which _update_charging() uses. """ now = self.current_datetime ts_str = now.strftime("%Y-%m-%d %H:%M:%S") # ------------------------------------------------------------------ # Load scheduled CHARGER powers for THIS timestep # ------------------------------------------------------------------ scheduled_charger_power = {} # charger_name -> power_kW power_setpoint_met = self.db_manager.get_metric_id("power_setpoint") for charger in self.port.chargers: src = self.db_manager.get_or_create_source(charger.name, "charger") rows = self.db_manager.get_records( table="scheduling", source_id=src, metric_id=power_setpoint_met, start_time=ts_str, end_time=ts_str, ) scheduled_charger_power[charger.name] = ( float(rows[0]["value"]) if rows else 0.0 ) # ------------------------------------------------------------------ # Derive BOAT powers from charger schedules using pre-assignments # boat_charger_assignments: {boat_name: charger_index} # ------------------------------------------------------------------ scheduled_boat_power = {} for boat_name, charger_idx in self.boat_charger_assignments.items(): charger_name = self.port.chargers[charger_idx].name scheduled_boat_power[boat_name] = scheduled_charger_power.get( charger_name, 0.0 ) # DEBUG: Print scheduled powers if now.hour < 18 and now.minute % 15 == 0: print( f"\n [DEBUG {ts_str}] 
Scheduled charger powers: {scheduled_charger_power}" ) print(f" [DEBUG {ts_str}] Derived boat powers: {scheduled_boat_power}") # ------------------------------------------------------------------ # Build current charger ↔ boat mapping from charger state (one-to-one) # ------------------------------------------------------------------ charger_to_boat = {} boat_to_charger = {} for charger in self.port.chargers: if charger.connected_boat: charger_to_boat[charger.name] = charger.connected_boat boat_to_charger[charger.connected_boat] = charger.name # DEBUG: Print current assignments if now.hour < 18 and now.minute % 15 == 0: currently_charging = [ ( boat_name, charger_name, next(b for b in self.port.boats if b.name == boat_name).soc, ) for boat_name, charger_name in boat_to_charger.items() ] print(f" [DEBUG] Currently charging: {currently_charging}") # ------------------------------------------------------------------ # Force disconnect boats that are already fully charged # ------------------------------------------------------------------ for boat_name, charger_name in list(boat_to_charger.items()): boat = next(b for b in self.port.boats if b.name == boat_name) if boat.soc >= 0.99: charger = next(c for c in self.port.chargers if c.name == charger_name) if now.hour < 18 and now.minute % 15 == 0: print(f" [DEBUG] Disconnecting {boat_name} - fully charged") charger.state = ChargerState.IDLE charger.power = 0.0 charger.connected_boat = None boat.state = BoatState.IDLE boat_to_charger.pop(boat_name, None) charger_to_boat.pop(charger_name, None) self.boat_charger_map.pop(boat_name, None) # ------------------------------------------------------------------ # Process each boat according to its scheduled power # ------------------------------------------------------------------ releases_made = [] assignments_made = [] for boat in self.port.boats: scheduled_power = scheduled_boat_power.get(boat.name, 0.0) # Skip boats that are sailing if boat.state == BoatState.SAILING: 
continue # Skip fully charged boats if boat.soc >= 0.99: continue # ------------------------------------------------------------------ # Case 1: Boat should NOT be charging (scheduled power = 0) # ------------------------------------------------------------------ if scheduled_power <= 0.01: if boat.name in boat_to_charger: charger_name = boat_to_charger[boat.name] charger = next( c for c in self.port.chargers if c.name == charger_name ) releases_made.append((boat.name, charger_name)) charger.state = ChargerState.IDLE charger.power = 0.0 charger.connected_boat = None boat.state = BoatState.IDLE boat_to_charger.pop(boat.name, None) charger_to_boat.pop(charger_name, None) self.boat_charger_map.pop(boat.name, None) continue # ------------------------------------------------------------------ # Case 2: Boat SHOULD be charging (scheduled power > 0) # ------------------------------------------------------------------ if boat.name in boat_to_charger: # Already connected - just update power charger_name = boat_to_charger[boat.name] charger = next(c for c in self.port.chargers if c.name == charger_name) old_power = charger.power new_power = min(scheduled_power, charger.max_power) charger.power = new_power charger.state = ChargerState.CHARGING boat.state = BoatState.CHARGING if ( now.hour < 18 and now.minute % 15 == 0 and abs(old_power - new_power) > 0.1 ): print( f" [DEBUG] Updated {boat.name} @ {charger.name}: {old_power:.1f}β†’{new_power:.1f} kW" ) else: # Use pre-assigned charger (1:1 mapping) assigned_idx = self.boat_charger_assignments.get(boat.name) if assigned_idx is not None: assigned_charger = self.port.chargers[assigned_idx] # Connect boat to its pre-assigned charger new_power = min(scheduled_power, assigned_charger.max_power) assigned_charger.connected_boat = boat.name assigned_charger.state = ChargerState.CHARGING assigned_charger.power = new_power boat.state = BoatState.CHARGING charger_to_boat[assigned_charger.name] = boat.name boat_to_charger[boat.name] = 
assigned_charger.name self.boat_charger_map[boat.name] = assigned_charger.name assignments_made.append( (boat.name, assigned_charger.name, new_power) ) else: if now.hour < 18 and now.minute % 15 == 0: print( f" [DEBUG] WARNING: No pre-assignment for {boat.name} (needs {scheduled_power:.1f} kW)" ) # DEBUG: Print releases and new assignments if now.hour < 18 and now.minute % 15 == 0: if releases_made: print(f" [DEBUG] Released: {releases_made}") if assignments_made: print(f" [DEBUG] New assignments: {assignments_made}") # DEBUG: Print final state if now.hour < 18 and now.minute % 15 == 0: total_power = sum( c.power for c in self.port.chargers if c.state == ChargerState.CHARGING ) active_chargers = [ (c.name, c.power, c.connected_boat) for c in self.port.chargers if c.state == ChargerState.CHARGING ] print(f" [DEBUG] Active chargers: {active_chargers}") print( f" [DEBUG] Total power: {total_power:.1f} kW / {self.port.contracted_power} kW contracted" ) def _override_schedules_for_shortfall_boat(self, boat_name: str, result): """ Override schedules to maximize charging for a boat with energy shortfall. This updates the schedules in the database to use maximum power when the boat is available (not sailing). 
Args: boat_name: Name of boat with shortfall result: OptimizationResult with schedules """ # Find which charger is assigned to this boat (if any) assigned_charger = None if boat_name in self.boat_charger_map: charger_name = self.boat_charger_map[boat_name] assigned_charger = next( c for c in self.port.chargers if c.name == charger_name ) # Update schedules to use maximum power for this boat's charger # when boat is available if assigned_charger and assigned_charger.name in result.charger_schedules: updated_schedules = [] charger_schedule = result.charger_schedules[assigned_charger.name] for timestamp, power in charger_schedule: # Check if boat is available at this timestamp # Find corresponding forecast forecast = next( ( f for f in self.latest_energy_forecasts if f.timestamp == timestamp ), None, ) if forecast and forecast.boat_available.get(boat_name, 0) == 1: # Boat is available - use maximum power updated_schedules.append((timestamp, assigned_charger.max_power)) else: # Boat is sailing or forecast not found - keep original schedule updated_schedules.append((timestamp, power)) # Update the schedule in result result.charger_schedules[assigned_charger.name] = updated_schedules def _assign_boats_to_chargers_default(self): """ Assign boats to chargers using first-come-first-served (FCFS) logic. 
Default behavior without optimizer: - First come, first served: boats with delayed trips get priority, then boats are served in arrival order (list order) - Use max power available on each charger - NO contracted power limit - boats charge freely at max charger power - This allows measuring the impact of unlimited charging on results """ # Get boats that need charging (not sailing, not fully charged) boats_needing_charge = [ b for b in self.port.boats if b.state != BoatState.SAILING and b.soc < 0.99 and b.name not in self.boat_charger_map ] # FCFS ordering: prioritize boats with delayed trips, then maintain list order # (no SOC-based sorting - pure first-come-first-served) boats_needing_charge.sort( key=lambda b: (0 if b.name in self.delayed_trips else 1) ) # Get available chargers available_chargers = [ c for c in self.port.chargers if c.state == ChargerState.IDLE ] # Assign boats to chargers (FCFS with max power, no contracted power limit) for boat in boats_needing_charge: if not available_chargers: # No chargers available - boat idles waiting break charger = available_chargers.pop(0) # Assign boat to charger at max power (no power limit check) self.boat_charger_map[boat.name] = charger.name boat.state = BoatState.CHARGING charger.state = ChargerState.CHARGING charger.power = charger.max_power charger.connected_boat = boat.name # Log charging start if self.current_datetime.minute % 15 == 0: priority_note = ( " (priority - delayed trip)" if boat.name in self.delayed_trips else "" ) print( f" ⚑ {boat.name} started charging at {charger.name}, SOC={boat.soc:.1%}" ) def _assign_boats_to_chargers_power_limited(self): """ Assign boats to chargers with power limiting to respect contracted power. 
Baseline behavior with power limit enforcement: - First come, first served: boats with delayed trips get priority, then boats are served in arrival order (list order) - Enforces contracted power limit by capping total charging power - Distributes available power proportionally when limit is hit - This provides a baseline to compare against optimizer performance """ # Get boats that need charging (not sailing, not fully charged) boats_needing_charge = [ b for b in self.port.boats if b.state != BoatState.SAILING and b.soc < 0.99 and b.name not in self.boat_charger_map ] # FCFS ordering: prioritize boats with delayed trips, then maintain list order boats_needing_charge.sort( key=lambda b: (0 if b.name in self.delayed_trips else 1) ) # Get available chargers available_chargers = [ c for c in self.port.chargers if c.state == ChargerState.IDLE ] # Calculate available power (contracted power - current usage) current_power_used = self._get_current_power_usage() pv_production = sum(pv.current_production for pv in self.port.pv_systems) available_power = ( self.port.contracted_power + pv_production - current_power_used ) # Assign boats to chargers (FCFS with power limit enforcement) for boat in boats_needing_charge: if not available_chargers: # No chargers available - boat idles waiting break if available_power <= 0.1: # No power available - cannot charge more boats break charger = available_chargers.pop(0) # Assign boat to charger, respecting power limit # Use minimum of charger max power and available power charge_power = min(charger.max_power, available_power) self.boat_charger_map[boat.name] = charger.name boat.state = BoatState.CHARGING charger.state = ChargerState.CHARGING charger.power = charge_power charger.connected_boat = boat.name # Update available power available_power -= charge_power # Log charging start if self.current_datetime.minute % 15 == 0: priority_note = ( " (priority - delayed trip)" if boat.name in self.delayed_trips else "" ) power_note = ( f" 
[{charge_power:.1f}kW]" if charge_power < charger.max_power else "" ) print( f" ⚑ {boat.name} started charging at {charger.name}{priority_note}{power_note}, SOC={boat.soc:.1%}" ) # If power limit is hit, proportionally reduce charging power for already-charging boats # This ensures we don't exceed the limit total_power_used = self._get_current_power_usage() max_total_power = self.port.contracted_power + pv_production if ( total_power_used > max_total_power + 0.1 ): # Small tolerance for floating point # Need to reduce power proportionally scale_factor = max_total_power / total_power_used for boat_name, charger_name in self.boat_charger_map.items(): charger = next(c for c in self.port.chargers if c.name == charger_name) if charger.state == ChargerState.CHARGING: charger.power = charger.power * scale_factor def _update_charging(self): """Update battery charge for boats that are charging.""" for boat_name, charger_name in list(self.boat_charger_map.items()): boat = next(b for b in self.port.boats if b.name == boat_name) charger = next(c for c in self.port.chargers if c.name == charger_name) # Calculate energy delivered to battery in this timestep effective_power = charger.effective_power # kW energy_delivered = (effective_power * self.settings.timestep) / 3600 # kWh # Update boat SOC soc_increase = energy_delivered / boat.battery_capacity boat.soc = min(1.0, boat.soc + soc_increase) # If boat is fully charged, disconnect if boat.soc >= 0.99: print( f" βœ“ {boat.name} fully charged at {self.current_datetime.strftime('%H:%M')}" ) boat.state = BoatState.IDLE charger.state = ChargerState.IDLE charger.power = 0.0 charger.connected_boat = None del self.boat_charger_map[boat_name] def _load_weather_forecast( self, start_from: Optional[datetime] = None, days: Optional[int] = None ): """Load weather forecast from Open-Meteo and save to database. 
Args: start_from: Starting date for forecast (default: simulation start_date) days: Number of days to fetch (default: simulation days, max 7) """ if not self.weather_client: return # Use provided values or defaults fetch_start = start_from if start_from else self.start_date fetch_days = min(days if days else self.days, 7) # Cap at 7 days print(" Fetching weather forecast from Open-Meteo...") forecast_data = self.weather_client.fetch_forecast(fetch_start, fetch_days) if not forecast_data or "timestamps" not in forecast_data: print(" ⚠️ Failed to fetch weather forecast") return timestamps = forecast_data["timestamps"] print(f" βœ“ Received {len(timestamps)} hours of forecast data") # Save to database forecasts = [] openmeteo_src = self.db_manager.get_or_create_source("openmeteo", "weather") for i, ts in enumerate(timestamps): ts_str = ts.strftime("%Y-%m-%d %H:%M:%S") # Save each metric for metric, values in forecast_data.items(): if metric == "timestamps": continue if i < len(values) and values[i] is not None: metric_id = self.db_manager.get_metric_id(metric) forecasts.append( (ts_str, openmeteo_src, metric_id, str(float(values[i]))) ) # Store in memory for quick access if i < len(timestamps): self.weather_forecast[ts_str] = {} for metric, metric_values in forecast_data.items(): if ( metric != "timestamps" and i < len(metric_values) and metric_values[i] is not None ): self.weather_forecast[ts_str][metric] = float(metric_values[i]) # Save to database if forecasts: self.db_manager.save_records_batch("forecast", forecasts) print(f" βœ“ Saved {len(forecasts)} forecast values to database") self.forecast_loaded = True def _get_weather_conditions(self, timestamp: datetime) -> Dict: """ Get weather conditions for a specific timestamp. 
def _update_pv_production(self):
    """Recompute the output of every PV system for the current simulation time.

    Weather for ``self.current_datetime`` is looked up once; metrics missing
    from it fall back to zero irradiance and a temperature of 20.0.
    """
    pv_units = self.port.pv_systems
    if not pv_units:
        return

    weather = self._get_weather_conditions(self.current_datetime)
    irradiance = {key: weather.get(key, 0.0) for key in ("ghi", "dni", "dhi")}
    temperature = weather.get("temperature", 20.0)

    for unit in pv_units:
        unit.calculate_production(
            temperature=temperature,
            timestamp=self.current_datetime,
            **irradiance,
        )


def _update_bess(self):
    """Advance all BESS units by one step using the active control mode.

    With ``self.use_optimizer`` set, the stored schedules drive the units;
    otherwise the default strategy is used. No-op when the port has no BESS.
    """
    if not self.port.bess_systems:
        return

    handler = (
        self._update_bess_with_schedule
        if self.use_optimizer
        else self._update_bess_default
    )
    handler()


def _update_bess_with_schedule(self):
    """
    Drive each BESS from the ``power_setpoint`` schedule stored in the database.

    A setpoint above 0.1 discharges the unit, one below -0.1 charges it (by
    the absolute value). When the setpoint lies within that idle band, or no
    schedule row exists for the current timestamp, the unit opportunistically
    charges from PV production not consumed by the chargers; each unit that
    charges this way reduces the excess left for the following units.
    Units with nothing to do are set idle.
    """
    now_str = self.current_datetime.strftime("%Y-%m-%d %H:%M:%S")
    setpoint_metric = self.db_manager.get_metric_id("power_setpoint")
    step = self.settings.timestep

    # PV output left over after the chargers that are currently charging
    solar_kw = sum(pv.current_production for pv in self.port.pv_systems)
    charging_kw = sum(
        c.power for c in self.port.chargers if c.state == ChargerState.CHARGING
    )
    spare_pv = max(0.0, solar_kw - charging_kw)

    for unit in self.port.bess_systems:
        source_id = self.db_manager.get_or_create_source(unit.name, "bess")
        rows = self.db_manager.get_records(
            "scheduling",
            source_id=source_id,
            metric_id=setpoint_metric,
            start_time=now_str,
            end_time=now_str,
        )
        # A missing schedule is handled exactly like an "idle" setpoint.
        setpoint = float(rows[0]["value"]) if rows else 0.0

        if setpoint > 0.1:
            # Positive setpoint = discharge as scheduled
            unit.discharge(setpoint, step)
            continue
        if setpoint < -0.1:
            # Negative setpoint = charge as scheduled
            unit.charge(abs(setpoint), step)
            continue

        # Idle band: soak up excess PV if there is any worth taking.
        absorbed = 0.0
        if spare_pv > 0.1:
            absorbed = min(spare_pv, unit.get_max_charge_power_available(step))

        if absorbed > 0.1:
            unit.charge(absorbed, step)
            spare_pv -= absorbed
        else:
            unit.idle()
def _update_bess_default(self):
    """
    Update BESS using the default (non-optimizer) control strategy.

    Sums the current PV production and the power drawn by chargers that are
    charging, then delegates:

    - port has PV systems: ``_update_bess_with_pv`` (store surplus solar)
    - port has no PV systems: ``_update_bess_without_pv``
    """
    port = self.port

    # Current power flows
    pv_production = sum(pv.current_production for pv in port.pv_systems)
    charger_load = sum(
        c.power for c in port.chargers if c.state == ChargerState.CHARGING
    )

    if len(port.pv_systems) > 0:
        self._update_bess_with_pv(pv_production, charger_load)
        return

    self._update_bess_without_pv(charger_load)


def _update_bess_with_pv(self, pv_production: float, charger_load: float):
    """
    Update BESS when paired with PV/renewables (default mode, no optimizer).

    The PV surplus (production minus charger load) is handed to the BESS
    units in order: each unit charges with as much of the remaining surplus
    as it can accept, and whatever it takes is no longer available to the
    next unit. A unit is set idle when there is no surplus or when the power
    it could take is 0.1 or less. This mode never discharges.
    """
    remaining_surplus = pv_production - charger_load

    for unit in self.port.bess_systems:
        power = 0.0
        if remaining_surplus > 0:
            # Limited by both the surplus and what the unit can accept
            power = min(
                remaining_surplus,
                unit.get_max_charge_power_available(self.settings.timestep),
            )

        if power > 0.1:
            unit.charge(power, self.settings.timestep)
            remaining_surplus -= power
        else:
            unit.idle()
def _update_bess_without_pv(self, charger_load: float):
    """
    Update BESS when NOT paired with PV/renewables (default mode, no optimizer).

    - Charge at the maximum available rate when electricity is cheap AND
      boats are not charging (charger load of 0.1 or less)
    - Idle otherwise; this mode never discharges

    "Cheap" is decided from the tariff when one is available: any price below
    the threshold counts, including negative prices. Only a missing tariff
    (price of 0 or None) falls back to the 22:00 - 06:00 off-peak heuristic.

    Args:
        charger_load: Power currently drawn by the chargers.
    """
    cheap_price_threshold = 0.10  # €/kWh; tariffs below this count as cheap
    off_peak_start_hour = 22
    off_peak_end_hour = 6

    # Get current tariff price
    current_price = self.port.get_tariff_price(self.current_datetime)

    # A price of 0 (or None) means "no tariff loaded". Negative prices are
    # real tariffs and must not be mistaken for a missing one.
    has_tariff = current_price is not None and current_price != 0
    if has_tariff:
        is_cheap_period = current_price < cheap_price_threshold
    else:
        # No tariff loaded - use time-based heuristic (window wraps midnight)
        current_hour = self.current_datetime.hour
        is_cheap_period = (
            current_hour >= off_peak_start_hour or current_hour < off_peak_end_hour
        )

    # Check if any boats are currently charging
    boats_charging = charger_load > 0.1
    may_charge = is_cheap_period and not boats_charging

    for bess in self.port.bess_systems:
        # The unit is only asked for its limit when charging is allowed.
        max_charge = (
            bess.get_max_charge_power_available(self.settings.timestep)
            if may_charge
            else 0.0
        )
        if max_charge > 0.1:
            bess.charge(max_charge, self.settings.timestep)
        else:
            bess.idle()
def _save_measurements(self):
    """Save current state to database.

    Builds one ``(timestamp, source_id, metric_id, value)`` record per metric
    and writes them to the ``measurements`` table in a single batch. Values
    are stored as strings. Records are emitted for:

    - the port: consumption, PV production, BESS discharge/charge, grid
      import/export, available power and contracted power
    - each boat: SOC (percent), sailing flag, motor power
    - each charger: power and charging flag
    - each PV system: production
    - each BESS: SOC (percent), power and stored energy

    BESS charge and discharge are summed per unit, so a unit that charges
    while another discharges does not cancel it out of the port-level totals.
    """
    db = self.db_manager
    port = self.port

    # Timestamp string shared by every record of this snapshot
    timestamp_str = self.current_datetime.strftime("%Y-%m-%d %H:%M:%S")

    # Calculate PV production (renewables)
    total_pv_production = sum(pv.current_production for pv in port.pv_systems)

    # BESS power sign: positive = charging (consuming power),
    # negative = discharging (providing power). Gross flows are taken per
    # unit rather than from the net sum.
    bess_powers = [bess.current_power for bess in port.bess_systems]
    bess_charge = sum(p for p in bess_powers if p > 0)
    bess_discharge = sum(-p for p in bess_powers if p < 0)

    # BESS usable capacity (what it COULD provide if needed); not counted
    # when the optimizer is in control.
    if self.use_optimizer:
        bess_usable_capacity = 0.0
    else:
        bess_usable_capacity = sum(
            bess.get_max_discharge_power_available(self.settings.timestep)
            for bess in port.bess_systems
        )

    # Calculate port metrics
    total_power_used = sum(
        c.power for c in port.chargers if c.state == ChargerState.CHARGING
    )

    # Available power = contracted_power + renewables + usable_bess - used_power
    available_power = (
        port.contracted_power
        + total_pv_production
        + bess_usable_capacity
        - total_power_used
    )

    # Grid balance = (consumption + BESS charging) - (PV production + BESS discharging)
    # Positive = importing from grid, Negative = exporting to grid
    grid_balance = (
        total_power_used + bess_charge - total_pv_production - bess_discharge
    )
    power_active_import = max(0.0, grid_balance)  # Power drawn from grid
    power_active_export = max(0.0, -grid_balance)  # Power exported to grid

    # Get source and metric IDs
    port_src = db.get_or_create_source(port.name, "port")
    metric_names = (
        "power_active_consumption",
        "power_active_production",
        "power_active_import",
        "power_active_export",
        "bess_discharge",
        "bess_charge",
        "available_power",
        "contracted_power",
        "soc",
        "state",
        "power_active",
        "energy_stored",
    )
    metric_ids = {name: db.get_metric_id(name) for name in metric_names}

    measurements = []

    def record(source_id, metric, value):
        # One row per (source, metric); values are stored as strings.
        measurements.append(
            (timestamp_str, source_id, metric_ids[metric], str(value))
        )

    # Port measurements
    record(port_src, "power_active_consumption", total_power_used)
    record(port_src, "power_active_production", total_pv_production)
    record(port_src, "bess_discharge", bess_discharge)
    record(port_src, "bess_charge", bess_charge)
    record(port_src, "power_active_import", power_active_import)
    record(port_src, "power_active_export", power_active_export)
    record(port_src, "available_power", available_power)
    record(port_src, "contracted_power", port.contracted_power)

    # Boat measurements
    for boat in port.boats:
        boat_src = db.get_or_create_source(boat.name, "boat")
        record(boat_src, "soc", boat.soc * 100)
        record(boat_src, "state", float(boat.state.value == "sailing"))

        # Motor power is only non-zero while sailing an active trip;
        # it is computed as k * speed^3 at the current trip point.
        motor_power = 0.0
        if boat.state == BoatState.SAILING and boat.name in self.active_trips:
            trip, _, elapsed = self.active_trips[boat.name]
            point = trip.get_point_at_elapsed_time(elapsed)
            if point:
                motor_power = boat.k * (point.speed**3)
        record(boat_src, "power_active", motor_power)

    # Charger measurements
    for charger in port.chargers:
        charger_src = db.get_or_create_source(charger.name, "charger")
        record(charger_src, "power_active", charger.power)
        record(charger_src, "state", float(charger.state.value == "charging"))

    # PV system measurements
    for pv in port.pv_systems:
        pv_src = db.get_or_create_source(pv.name, "pv")
        record(pv_src, "power_active_production", pv.current_production)

    # BESS measurements
    for bess in port.bess_systems:
        bess_src = db.get_or_create_source(bess.name, "bess")
        record(bess_src, "soc", bess.current_soc * 100)
        record(bess_src, "power_active", bess.current_power)
        record(bess_src, "energy_stored", bess.get_energy_stored())

    # Save to database
    db.save_records_batch("measurements", measurements)