Source code for models.trip

"""Trip model for boat routes."""

import csv
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional


[docs] @dataclass class TripPoint: """ One waypoint in a trip route. Attributes: timestamp: Time at this point. point_type: Type label (e.g. Static, Dock, Terrestrial, Interpolated). speed: Speed in knots. heading: Heading in degrees. latitude: Latitude. longitude: Longitude. """ timestamp: datetime point_type: str speed: float heading: float latitude: float longitude: float
[docs] class Trip: """ Boat trip with waypoints loaded from CSV. Attributes: route_name: Route name (from CSV filename stem). points: List of TripPoint waypoints. duration: Total trip duration in seconds. """ def __init__(self, csv_path: str): """Load trip from a CSV file (columns: timestamp, type, speed, heading, latitude, longitude).""" self.route_name = Path(csv_path).stem self.points: List[TripPoint] = [] self._load_from_csv(csv_path) if self.points: first_time = self.points[0].timestamp last_time = self.points[-1].timestamp self.duration = (last_time - first_time).total_seconds() else: self.duration = 0 def _load_from_csv(self, csv_path: str): """Parse CSV into TripPoint list; skips empty rows and rows missing timestamp/speed/latitude.""" with open(csv_path, "r", encoding="utf-8") as f: reader = csv.DictReader(f) for _row_num, row in enumerate(reader, start=2): if not any(row.values()): continue if not row.get("timestamp") or not row.get("timestamp").strip(): continue try: timestamp_str = row["timestamp"].strip() try: timestamp = datetime.strptime( timestamp_str, "%Y-%m-%d %H:%M:%S.%f" ) except ValueError: if "." in timestamp_str: base, frac = timestamp_str.split(".") frac = frac[:6].ljust(6, "0") timestamp_str = f"{base}.{frac}" timestamp = datetime.strptime( timestamp_str, "%Y-%m-%d %H:%M:%S.%f" ) else: timestamp = datetime.strptime( timestamp_str, "%Y-%m-%d %H:%M:%S" ) if not row.get("speed") or not row.get("speed").strip(): continue if not row.get("latitude") or not row.get("latitude").strip(): continue point = TripPoint( timestamp=timestamp, point_type=row.get("type", ""), speed=float(row["speed"]), heading=float(row["heading"]) if row.get("heading") else 0.0, latitude=float(row["latitude"]), longitude=float(row["longitude"]), ) self.points.append(point) except (ValueError, KeyError) as e: raise ValueError(f"Error parsing CSV: {csv_path}") from e
[docs] def get_point_at_elapsed_time(self, elapsed_seconds: float) -> Optional[TripPoint]: """ Return the waypoint closest to the given elapsed time from trip start. Args: elapsed_seconds: Seconds since trip start. Returns: Closest TripPoint, or None if elapsed_seconds exceeds trip duration. """ if not self.points or elapsed_seconds > self.duration: return None target_time = self.points[0].timestamp + timedelta(seconds=elapsed_seconds) closest_point = self.points[0] min_diff = abs((target_time - closest_point.timestamp).total_seconds()) for point in self.points[1:]: diff = abs((target_time - point.timestamp).total_seconds()) if diff < min_diff: min_diff = diff closest_point = point else: break return closest_point
[docs] def estimate_energy_required(self, boat_k_factor: float) -> float: """ Estimate total energy (kWh) for the full trip using cube-law power per segment. Args: boat_k_factor: Boat k-factor (motor_power / range_speed^3). Returns: Total energy in kWh. """ if not self.points: return 0.0 total_energy = 0.0 for i in range(len(self.points) - 1): point = self.points[i] next_point = self.points[i + 1] segment_duration = (next_point.timestamp - point.timestamp).total_seconds() power_kw = boat_k_factor * (point.speed**3) energy_kwh = (power_kw * segment_duration) / 3600 total_energy += energy_kwh return total_energy
[docs] def get_energy_between( self, start_elapsed_seconds: float, end_elapsed_seconds: float, boat_k_factor: float, ) -> float: """ Energy (kWh) consumed in [start_elapsed, end_elapsed] using same segment integration as estimate_energy_required. Args: start_elapsed_seconds: Window start (s from trip start). end_elapsed_seconds: Window end (s from trip start). boat_k_factor: Boat k-factor (motor_power / range_speed^3). Returns: Energy in kWh for the window. """ if not self.points or start_elapsed_seconds >= end_elapsed_seconds: return 0.0 t0 = self.points[0].timestamp total_energy = 0.0 for i in range(len(self.points) - 1): point = self.points[i] next_point = self.points[i + 1] seg_start = (point.timestamp - t0).total_seconds() seg_end = (next_point.timestamp - t0).total_seconds() overlap_start = max(seg_start, start_elapsed_seconds) overlap_end = min(seg_end, end_elapsed_seconds) duration = max(0.0, overlap_end - overlap_start) if duration <= 0: continue power_kw = boat_k_factor * (point.speed**3) total_energy += (power_kw * duration) / 3600 return total_energy
def __repr__(self) -> str: return ( f"Trip(route={self.route_name}, points={len(self.points)}, " f"duration={self.duration/3600:.2f}h)" )