diff --git a/flixopt/components.py b/flixopt/components.py index 4b91fe6ff..2b55df888 100644 --- a/flixopt/components.py +++ b/flixopt/components.py @@ -819,6 +819,10 @@ def _do_modeling(self): """Create transmission efficiency equations and optional absolute loss constraints for both flow directions""" super()._do_modeling() + # In DCE mode, skip constraint creation - constraints will be added later + if self._model._dce_mode: + return + # first direction self.create_transmission_equation('dir1', self.element.in1, self.element.out1) @@ -870,6 +874,10 @@ def _do_modeling(self): """Create linear conversion equations or piecewise conversion constraints between input and output flows""" super()._do_modeling() + # In DCE mode, skip constraint creation - constraints will be added later + if self._model._dce_mode: + return + # Create conversion factor constraints if specified if self.element.conversion_factors: all_input_flows = set(self.element.inputs) @@ -929,6 +937,9 @@ def __init__(self, model: FlowSystemModel, element: Storage): def _do_modeling(self): """Create charge state variables, energy balance equations, and optional investment submodels.""" super()._do_modeling() + # In DCE mode, skip variable/constraint creation - will be added later + if self._model._dce_mode: + return self._create_storage_variables() self._add_netto_discharge_constraint() self._add_energy_balance_constraint() @@ -1307,6 +1318,9 @@ def _do_modeling(self): inter-cluster linking. Overrides specific methods to customize behavior. 
""" super()._do_modeling() + # In DCE mode, skip constraint creation - constraints will be added later + if self._model._dce_mode: + return self._add_intercluster_linking() def _add_cluster_cyclic_constraint(self): diff --git a/flixopt/elements.py b/flixopt/elements.py index 791596b28..ae146e8c7 100644 --- a/flixopt/elements.py +++ b/flixopt/elements.py @@ -24,6 +24,7 @@ VariableCategory, register_class_for_io, ) +from .vectorized import ConstraintResult, ConstraintSpec, VariableHandle, VariableSpec if TYPE_CHECKING: import linopy @@ -35,6 +36,7 @@ Numeric_TPS, Scalar, ) + from .vectorized import EffectShareSpec logger = logging.getLogger('flixopt') @@ -663,11 +665,430 @@ class FlowModel(ElementModel): def __init__(self, model: FlowSystemModel, element: Flow): super().__init__(model, element) + self._dce_handles: dict[str, VariableHandle] = {} + + # ========================================================================= + # DCE Pattern: Declaration-Collection-Execution + # ========================================================================= + + def declare_variables(self) -> list[VariableSpec]: + """Declare variables needed by this Flow for batch creation. + + Returns VariableSpecs that will be collected by VariableRegistry + and batch-created with other Flows' variables. 
+ """ + specs = [] + + # Main flow rate variable (always needed) + specs.append( + VariableSpec( + category='flow_rate', + element_id=self.label_full, + lower=self.absolute_flow_rate_bounds[0], + upper=self.absolute_flow_rate_bounds[1], + dims=self._model.temporal_dims, + var_category=VariableCategory.FLOW_RATE, + ) + ) + + # Status variable (if using status_parameters) + if self.with_status: + specs.append( + VariableSpec( + category='status', + element_id=self.label_full, + binary=True, + dims=self._model.temporal_dims, + var_category=VariableCategory.STATUS, + ) + ) + + # Total flow hours variable (per period) + # Bounds from flow_hours_min/max + specs.append( + VariableSpec( + category='total_flow_hours', + element_id=self.label_full, + lower=self.element.flow_hours_min if self.element.flow_hours_min is not None else 0, + upper=self.element.flow_hours_max if self.element.flow_hours_max is not None else np.inf, + dims=('period', 'scenario'), + var_category=VariableCategory.TOTAL, + ) + ) + + # Flow hours over periods (if constrained) + if self.element.flow_hours_min_over_periods is not None or self.element.flow_hours_max_over_periods is not None: + specs.append( + VariableSpec( + category='flow_hours_over_periods', + element_id=self.label_full, + lower=self.element.flow_hours_min_over_periods + if self.element.flow_hours_min_over_periods is not None + else 0, + upper=self.element.flow_hours_max_over_periods + if self.element.flow_hours_max_over_periods is not None + else np.inf, + dims=('scenario',), + var_category=VariableCategory.TOTAL_OVER_PERIODS, + ) + ) + + # Investment variables (if using InvestParameters for size) + if self.with_investment: + invest_params = self.element.size # InvestParameters + size_min = invest_params.minimum_or_fixed_size + size_max = invest_params.maximum_or_fixed_size + + # Handle linked_periods masking + if invest_params.linked_periods is not None: + size_min = size_min * invest_params.linked_periods + size_max = size_max * 
invest_params.linked_periods + + specs.append( + VariableSpec( + category='size', + element_id=self.label_full, + lower=size_min if invest_params.mandatory else 0, + upper=size_max, + dims=('period', 'scenario'), + var_category=VariableCategory.FLOW_SIZE, + ) + ) + + # Binary investment decision (only if not mandatory) + if not invest_params.mandatory: + specs.append( + VariableSpec( + category='invested', + element_id=self.label_full, + binary=True, + dims=('period', 'scenario'), + var_category=VariableCategory.INVESTED, + ) + ) + + return specs + + def declare_constraints(self) -> list[ConstraintSpec]: + """Declare constraints needed by this Flow for batch creation. + + Returns ConstraintSpecs with build functions that will be called + after variables are created. + """ + specs = [] + + # Flow rate bounds (depends on status/investment configuration) + if self.with_status and not self.with_investment: + # Status-controlled bounds (no investment) + specs.append( + ConstraintSpec( + category='flow_rate_ub', + element_id=self.label_full, + build_fn=self._build_status_upper_bound, + ) + ) + specs.append( + ConstraintSpec( + category='flow_rate_lb', + element_id=self.label_full, + build_fn=self._build_status_lower_bound, + ) + ) + elif self.with_investment and not self.with_status: + # Investment-scaled bounds (no status) + specs.append( + ConstraintSpec( + category='flow_rate_scaled_ub', + element_id=self.label_full, + build_fn=self._build_investment_upper_bound, + ) + ) + specs.append( + ConstraintSpec( + category='flow_rate_scaled_lb', + element_id=self.label_full, + build_fn=self._build_investment_lower_bound, + ) + ) + elif self.with_investment and self.with_status: + # Both investment and status - most complex case + specs.append( + ConstraintSpec( + category='flow_rate_scaled_status_ub', + element_id=self.label_full, + build_fn=self._build_investment_status_upper_bound, + ) + ) + specs.append( + ConstraintSpec( + category='flow_rate_scaled_status_lb', + 
element_id=self.label_full, + build_fn=self._build_investment_status_lower_bound, + ) + ) + + # Investment-specific constraints + if self.with_investment: + invest_params = self.element.size + # Size/invested linkage (only if not mandatory) + if not invest_params.mandatory: + specs.append( + ConstraintSpec( + category='size_invested_ub', + element_id=self.label_full, + build_fn=self._build_size_invested_upper_bound, + ) + ) + specs.append( + ConstraintSpec( + category='size_invested_lb', + element_id=self.label_full, + build_fn=self._build_size_invested_lower_bound, + ) + ) + + # Total flow hours tracking constraint + specs.append( + ConstraintSpec( + category='total_flow_hours_eq', + element_id=self.label_full, + build_fn=self._build_total_flow_hours_tracking, + ) + ) + + # Flow hours over periods tracking (if needed) + if self.element.flow_hours_min_over_periods is not None or self.element.flow_hours_max_over_periods is not None: + specs.append( + ConstraintSpec( + category='flow_hours_over_periods_eq', + element_id=self.label_full, + build_fn=self._build_flow_hours_over_periods_tracking, + ) + ) + + # Load factor constraints + if self.element.load_factor_max is not None: + specs.append( + ConstraintSpec( + category='load_factor_max', + element_id=self.label_full, + build_fn=self._build_load_factor_max, + ) + ) + + if self.element.load_factor_min is not None: + specs.append( + ConstraintSpec( + category='load_factor_min', + element_id=self.label_full, + build_fn=self._build_load_factor_min, + ) + ) + + return specs + + def on_variables_created(self, handles: dict[str, VariableHandle]) -> None: + """Called after batch variable creation with handles to our variables. + + Note: Effect shares are NOT created here in DCE mode - they are + collected via declare_effect_shares() and batch-created later. 
+ """ + self._dce_handles = handles + + # Register the DCE variables in our local registry so properties like self.flow_rate work + for category, handle in handles.items(): + self.register_variable(handle.variable, category) + + # Effect shares are created via EffectShareRegistry in DCE mode, not here + + def finalize_dce(self) -> None: + """Finalize DCE by creating submodels that weren't batch-created. + + Called after all DCE variables and constraints are created. + Creates StatusModel submodel if needed, using the already-created status variable. + """ + if not self.with_status: + return + + # Status variable was already created via DCE, get it from handles + status_var = self._dce_handles['status'].variable + + # Create StatusModel with the existing status variable + self.add_submodels( + StatusModel( + model=self._model, + label_of_element=self.label_of_element, + parameters=self.element.status_parameters, + status=status_var, + previous_status=self.previous_status, + label_of_model=self.label_of_element, + ), + short_name='status', + ) + + def declare_effect_shares(self) -> list[EffectShareSpec]: + """Declare effect shares needed by this Flow for batch creation. + + Returns EffectShareSpecs that will be batch-processed by EffectShareRegistry. 
+ """ + from .vectorized import EffectShareSpec + + specs = [] + + if self.element.effects_per_flow_hour: + for effect_name, factor in self.element.effects_per_flow_hour.items(): + specs.append( + EffectShareSpec( + element_id=self.label_full, + effect_name=effect_name, + factor=factor, + target='temporal', + ) + ) + + return specs + + # ========================================================================= + # DCE Constraint Build Functions + # ========================================================================= + + def _build_status_upper_bound(self, model: FlowSystemModel, handles: dict[str, VariableHandle]) -> ConstraintResult: + """Build: flow_rate <= status * size * relative_max""" + flow_rate = handles['flow_rate'].variable + status = handles['status'].variable + _, ub_relative = self.relative_flow_rate_bounds + upper = status * ub_relative * self.element.size + return ConstraintResult(lhs=flow_rate, rhs=upper, sense='<=') + + def _build_status_lower_bound(self, model: FlowSystemModel, handles: dict[str, VariableHandle]) -> ConstraintResult: + """Build: flow_rate >= status * max(epsilon, size * relative_min)""" + flow_rate = handles['flow_rate'].variable + status = handles['status'].variable + lb_relative, _ = self.relative_flow_rate_bounds + lower_bound = lb_relative * self.element.size + epsilon = np.maximum(CONFIG.Modeling.epsilon, lower_bound) + lower = status * epsilon + return ConstraintResult(lhs=flow_rate, rhs=lower, sense='>=') + + def _build_total_flow_hours_tracking( + self, model: FlowSystemModel, handles: dict[str, VariableHandle] + ) -> ConstraintResult: + """Build: total_flow_hours = sum(flow_rate * dt)""" + flow_rate = handles['flow_rate'].variable + total_flow_hours = handles['total_flow_hours'].variable + rhs = self._model.sum_temporal(flow_rate) + return ConstraintResult(lhs=total_flow_hours, rhs=rhs, sense='==') + + def _build_flow_hours_over_periods_tracking( + self, model: FlowSystemModel, handles: dict[str, VariableHandle] + ) 
-> ConstraintResult: + """Build: flow_hours_over_periods = sum(total_flow_hours * period_weight)""" + total_flow_hours = handles['total_flow_hours'].variable + flow_hours_over_periods = handles['flow_hours_over_periods'].variable + weighted = (total_flow_hours * self._model.flow_system.period_weights).sum('period') + return ConstraintResult(lhs=flow_hours_over_periods, rhs=weighted, sense='==') + + def _build_load_factor_max(self, model: FlowSystemModel, handles: dict[str, VariableHandle]) -> ConstraintResult: + """Build: total_flow_hours <= size * load_factor_max * total_hours""" + total_flow_hours = handles['total_flow_hours'].variable + # Get size (from investment handle if available, else from element) + if self.with_investment and 'size' in handles: + size = handles['size'].variable + else: + size = self.element.size + total_hours = self._model.temporal_weight.sum(self._model.temporal_dims) + rhs = size * self.element.load_factor_max * total_hours + return ConstraintResult(lhs=total_flow_hours, rhs=rhs, sense='<=') + + def _build_load_factor_min(self, model: FlowSystemModel, handles: dict[str, VariableHandle]) -> ConstraintResult: + """Build: total_flow_hours >= size * load_factor_min * total_hours""" + total_flow_hours = handles['total_flow_hours'].variable + if self.with_investment and 'size' in handles: + size = handles['size'].variable + else: + size = self.element.size + total_hours = self._model.temporal_weight.sum(self._model.temporal_dims) + rhs = size * self.element.load_factor_min * total_hours + return ConstraintResult(lhs=total_flow_hours, rhs=rhs, sense='>=') + + def _build_investment_upper_bound( + self, model: FlowSystemModel, handles: dict[str, VariableHandle] + ) -> ConstraintResult: + """Build: flow_rate <= size * relative_max""" + flow_rate = handles['flow_rate'].variable + size = handles['size'].variable + _, ub_relative = self.relative_flow_rate_bounds + return ConstraintResult(lhs=flow_rate, rhs=size * ub_relative, sense='<=') + + def 
_build_investment_lower_bound( + self, model: FlowSystemModel, handles: dict[str, VariableHandle] + ) -> ConstraintResult: + """Build: flow_rate >= size * relative_min""" + flow_rate = handles['flow_rate'].variable + size = handles['size'].variable + lb_relative, _ = self.relative_flow_rate_bounds + return ConstraintResult(lhs=flow_rate, rhs=size * lb_relative, sense='>=') + + def _build_investment_status_upper_bound( + self, model: FlowSystemModel, handles: dict[str, VariableHandle] + ) -> ConstraintResult: + """Build: flow_rate <= size * relative_max (investment + status case)""" + flow_rate = handles['flow_rate'].variable + size = handles['size'].variable + _, ub_relative = self.relative_flow_rate_bounds + return ConstraintResult(lhs=flow_rate, rhs=size * ub_relative, sense='<=') + + def _build_investment_status_lower_bound( + self, model: FlowSystemModel, handles: dict[str, VariableHandle] + ) -> ConstraintResult: + """Build: flow_rate >= (status - 1) * M + size * relative_min""" + flow_rate = handles['flow_rate'].variable + size = handles['size'].variable + status = handles['status'].variable + lb_relative, _ = self.relative_flow_rate_bounds + invest_params = self.element.size + big_m = invest_params.maximum_or_fixed_size * lb_relative + rhs = (status - 1) * big_m + size * lb_relative + return ConstraintResult(lhs=flow_rate, rhs=rhs, sense='>=') + + def _build_size_invested_upper_bound( + self, model: FlowSystemModel, handles: dict[str, VariableHandle] + ) -> ConstraintResult: + """Build: size <= invested * maximum_size""" + size = handles['size'].variable + invested = handles['invested'].variable + invest_params = self.element.size + return ConstraintResult(lhs=size, rhs=invested * invest_params.maximum_or_fixed_size, sense='<=') + + def _build_size_invested_lower_bound( + self, model: FlowSystemModel, handles: dict[str, VariableHandle] + ) -> ConstraintResult: + """Build: size >= invested * minimum_size""" + size = handles['size'].variable + invested = 
handles['invested'].variable + invest_params = self.element.size + return ConstraintResult(lhs=size, rhs=invested * invest_params.minimum_or_fixed_size, sense='>=') + + # ========================================================================= + # Original Implementation (kept for backward compatibility) + # ========================================================================= def _do_modeling(self): - """Create variables, constraints, and nested submodels""" + """Create variables, constraints, and nested submodels. + + When FlowSystemModel._dce_mode is True, this method skips variable/constraint + creation since those will be handled by the DCE registries. Only effects + are still created here since they don't use DCE yet. + """ super()._do_modeling() + # In DCE mode, skip all variable/constraint creation - handled by registries + # Effects are created after variables exist via on_variables_created() + if self._model._dce_mode: + return + + # === Traditional (non-DCE) variable/constraint creation === + # Main flow rate variable self.add_variables( lower=self.absolute_flow_rate_bounds[0], @@ -958,6 +1379,11 @@ def __init__(self, model: FlowSystemModel, element: Bus): def _do_modeling(self): """Create variables, constraints, and nested submodels""" super()._do_modeling() + + # In DCE mode, skip constraint creation - constraints will be added later + if self._model._dce_mode: + return + # inputs == outputs for flow in self.element.inputs + self.element.outputs: self.register_variable(flow.submodel.flow_rate, flow.label_full) @@ -1045,6 +1471,10 @@ def _do_modeling(self): for flow in all_flows: self.add_submodels(flow.create_model(self._model), short_name=flow.label) + # In DCE mode, skip constraint creation - constraints will be added later + if self._model._dce_mode: + return + # Create component status variable and StatusModel if needed if self.element.status_parameters: status = self.add_variables( diff --git a/flixopt/structure.py b/flixopt/structure.py 
index d165667bb..9515fbe4b 100644 --- a/flixopt/structure.py +++ b/flixopt/structure.py @@ -200,6 +200,7 @@ def __init__(self, flow_system: FlowSystem): self.effects: EffectCollectionModel | None = None self.submodels: Submodels = Submodels({}) self.variable_categories: dict[str, VariableCategory] = {} + self._dce_mode: bool = False # When True, elements skip _do_modeling() def add_variables( self, @@ -240,6 +241,152 @@ def _populate_element_variable_names(self): element._variable_names = list(element.submodel.variables) element._constraint_names = list(element.submodel.constraints) + def do_modeling_dce(self, timing: bool = False): + """Build the model using the DCE (Declaration-Collection-Execution) pattern. + + This is an alternative to `do_modeling()` that uses vectorized batch creation + of variables and constraints for better performance with large systems. + + The DCE pattern has three phases: + 1. DECLARATION: Elements declare what variables/constraints they need + 2. COLLECTION: Registries group declarations by category + 3. EXECUTION: Batch-create variables/constraints per category + + Args: + timing: If True, print detailed timing breakdown. + + Note: + This method is experimental. Use `do_modeling()` for production. + Not all element types support DCE yet - those that don't will + fall back to individual creation. 
+ """ + import time + + from .vectorized import ConstraintRegistry, EffectShareRegistry, VariableRegistry + + timings = {} + + def record(name): + timings[name] = time.perf_counter() + + record('start') + + # Enable DCE mode - elements will skip _do_modeling() variable creation + self._dce_mode = True + + # Initialize registries + variable_registry = VariableRegistry(self) + self._variable_registry = variable_registry # Store for later access + + record('registry_init') + + # Create effect models first (they don't use DCE yet) + self.effects = self.flow_system.effects.create_model(self) + + record('effects') + + # Phase 1: DECLARATION + # Create element models and collect their declarations + logger.debug('DCE Phase 1: Declaration') + element_models = [] + + for component in self.flow_system.components.values(): + component.create_model(self) + # Component creates flow models as submodels + for flow in component.inputs + component.outputs: + if hasattr(flow.submodel, 'declare_variables'): + for spec in flow.submodel.declare_variables(): + variable_registry.register(spec) + element_models.append(flow.submodel) + + record('components') + + for bus in self.flow_system.buses.values(): + bus.create_model(self) + # Bus doesn't use DCE yet - uses traditional approach + + record('buses') + + # Phase 2: COLLECTION (implicit in registries) + logger.debug(f'DCE Phase 2: Collection - {variable_registry}') + + # Phase 3: EXECUTION (Variables) + logger.debug('DCE Phase 3: Execution (Variables)') + variable_registry.create_all() + + record('var_creation') + + # Distribute handles to elements + for element_model in element_models: + handles = variable_registry.get_handles_for_element(element_model.label_full) + element_model.on_variables_created(handles) + + record('handle_distribution') + + # Phase 3: EXECUTION (Effect Shares) + logger.debug('DCE Phase 3: Execution (Effect Shares)') + effect_share_registry = EffectShareRegistry(self, variable_registry) + 
self._effect_share_registry = effect_share_registry + + for element_model in element_models: + if hasattr(element_model, 'declare_effect_shares'): + for spec in element_model.declare_effect_shares(): + effect_share_registry.register(spec) + + effect_share_registry.create_all() + + record('effect_shares') + + # Phase 3: EXECUTION (Constraints) + logger.debug('DCE Phase 3: Execution (Constraints)') + constraint_registry = ConstraintRegistry(self, variable_registry) + self._constraint_registry = constraint_registry + + for element_model in element_models: + if hasattr(element_model, 'declare_constraints'): + for spec in element_model.declare_constraints(): + constraint_registry.register(spec) + + constraint_registry.create_all() + + record('constraint_creation') + + # Finalize DCE - create submodels that weren't batch-created (e.g., StatusModel) + for element_model in element_models: + if hasattr(element_model, 'finalize_dce'): + element_model.finalize_dce() + + record('finalize_dce') + + # Post-processing + self._add_scenario_equality_constraints() + self._populate_element_variable_names() + + record('end') + + if timing: + print('\n DCE Timing Breakdown:') + prev = timings['start'] + for name in [ + 'registry_init', + 'effects', + 'components', + 'buses', + 'var_creation', + 'handle_distribution', + 'effect_shares', + 'constraint_creation', + 'finalize_dce', + 'end', + ]: + elapsed = (timings[name] - prev) * 1000 + print(f' {name:25s}: {elapsed:8.2f}ms') + prev = timings[name] + total = (timings['end'] - timings['start']) * 1000 + print(f' {"TOTAL":25s}: {total:8.2f}ms') + + logger.info(f'DCE modeling complete: {len(self.variables)} variables, {len(self.constraints)} constraints') + def _add_scenario_equality_for_parameter_type( self, parameter_type: Literal['flow_rate', 'size'], @@ -2000,3 +2147,40 @@ def results_structure(self): 'variables': list(self.variables), 'constraints': list(self.constraints), } + + # 
========================================================================= + # DCE Pattern: Declaration-Collection-Execution + # Override these methods in subclasses to use the DCE pattern + # ========================================================================= + + def declare_variables(self) -> list: + """Declare variables needed by this element for batch creation. + + Override in subclasses to return a list of VariableSpec objects. + These specs will be collected by VariableRegistry and batch-created. + + Returns: + List of VariableSpec objects (empty by default). + """ + return [] + + def declare_constraints(self) -> list: + """Declare constraints needed by this element for batch creation. + + Override in subclasses to return a list of ConstraintSpec objects. + The build_fn in each spec will be called after variables exist. + + Returns: + List of ConstraintSpec objects (empty by default). + """ + return [] + + def on_variables_created(self, handles: dict) -> None: + """Called after batch variable creation with handles to element's variables. + + Override in subclasses to store handles for use in constraint building. + + Args: + handles: Dict mapping category name to VariableHandle. + """ + pass diff --git a/flixopt/vectorized.py b/flixopt/vectorized.py new file mode 100644 index 000000000..c7406677a --- /dev/null +++ b/flixopt/vectorized.py @@ -0,0 +1,1198 @@ +""" +Vectorized modeling infrastructure for flixopt. + +This module implements the Declaration-Collection-Execution (DCE) pattern +for efficient batch creation of variables and constraints across many elements. 
+ +Key concepts: +- VariableSpec: Immutable specification of a variable an element needs +- ConstraintSpec: Specification of a constraint with deferred evaluation +- VariableRegistry: Collects specs and batch-creates variables +- ConstraintRegistry: Collects specs and batch-creates constraints +- VariableHandle: Provides element access to their slice of batched variables + +Usage: + Elements declare what they need via `declare_variables()` and `declare_constraints()`. + The FlowSystemModel collects all declarations, then batch-creates them. + Elements receive handles to access their variables. +""" + +from __future__ import annotations + +import logging +from collections import defaultdict +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Literal + +import numpy as np +import pandas as pd +import xarray as xr + +if TYPE_CHECKING: + from collections.abc import Callable + + import linopy + + from .structure import FlowSystemModel, VariableCategory + +logger = logging.getLogger('flixopt') + + +# ============================================================================= +# Specifications (Declaration Phase) +# ============================================================================= + + +@dataclass(frozen=True) +class VariableSpec: + """Immutable specification of a variable an element needs. + + This is a declaration - no linopy calls are made when creating a VariableSpec. + The spec is later collected by a VariableRegistry and batch-created with + other specs of the same category. + + Attributes: + category: Variable category for grouping (e.g., 'flow_rate', 'status'). + All specs with the same category are batch-created together. + element_id: Unique identifier of the element (e.g., 'Boiler(Q_th)'). + Used as a coordinate in the batched variable. + lower: Lower bound (scalar, array, or DataArray). Default -inf. + upper: Upper bound (scalar, array, or DataArray). Default +inf. + integer: If True, variable is integer-valued. 
+ binary: If True, variable is binary (0 or 1). + dims: Dimensions this variable spans beyond 'element'. + Common values: ('time',), ('time', 'scenario'), ('period', 'scenario'), (). + mask: Optional mask for sparse creation (True = create, False = skip). + var_category: VariableCategory enum for segment expansion handling. + + Example: + >>> spec = VariableSpec( + ... category='flow_rate', + ... element_id='Boiler(Q_th)', + ... lower=0, + ... upper=100, + ... dims=('time', 'scenario'), + ... ) + """ + + category: str + element_id: str + lower: float | xr.DataArray = -np.inf + upper: float | xr.DataArray = np.inf + integer: bool = False + binary: bool = False + dims: tuple[str, ...] = ('time',) + mask: xr.DataArray | None = None + var_category: VariableCategory | None = None + + +@dataclass +class ConstraintSpec: + """Specification of a constraint with deferred evaluation. + + The constraint expression is not built until variables exist. A build + function is provided that will be called during the execution phase. + + Attributes: + category: Constraint category for grouping (e.g., 'flow_rate_bounds'). + element_id: Unique identifier of the element. + build_fn: Callable that builds the constraint. Called as: + build_fn(model, handles) -> ConstraintResult + where handles is a dict mapping category -> VariableHandle. + sense: Constraint sense ('==', '<=', '>='). + mask: Optional mask for sparse creation. + + Example: + >>> def build_flow_bounds(model, handles): + ... flow_rate = handles['flow_rate'].variable + ... return ConstraintResult( + ... lhs=flow_rate, + ... rhs=100, + ... sense='<=', + ... ) + >>> spec = ConstraintSpec( + ... category='flow_rate_upper', + ... element_id='Boiler(Q_th)', + ... build_fn=build_flow_bounds, + ... 
) + """ + + category: str + element_id: str + build_fn: Callable[[FlowSystemModel, dict[str, VariableHandle]], ConstraintResult] + mask: xr.DataArray | None = None + + +@dataclass +class ConstraintResult: + """Result of a constraint build function. + + Attributes: + lhs: Left-hand side expression (linopy Variable or LinearExpression). + rhs: Right-hand side (expression, scalar, or DataArray). + sense: Constraint sense ('==', '<=', '>='). + """ + + lhs: linopy.Variable | linopy.expressions.LinearExpression | xr.DataArray + rhs: linopy.Variable | linopy.expressions.LinearExpression | xr.DataArray | float + sense: Literal['==', '<=', '>='] = '==' + + +@dataclass +class EffectShareSpec: + """Specification of an effect share for batch creation. + + Effect shares link flow rates to effects (costs, emissions, etc.). + Instead of creating them one at a time, we collect specs and batch-create. + + Attributes: + element_id: The flow's unique identifier (e.g., 'Boiler(gas_in)'). + effect_name: The effect to add to (e.g., 'costs', 'CO2'). + factor: Multiplier for flow_rate * timestep_duration. + target: 'temporal' for time-varying or 'periodic' for period totals. + """ + + element_id: str + effect_name: str + factor: float | xr.DataArray + target: Literal['temporal', 'periodic'] = 'temporal' + + +# ============================================================================= +# Variable Handle (Element Access) +# ============================================================================= + + +@dataclass +class VariableHandle: + """Handle providing element access to a batched variable. + + When variables are batch-created across elements, each element needs + a way to access its slice. The handle stores a reference to the + element's portion of the batched variable. + + Attributes: + variable: The element's slice of the batched variable. + This is typically `batched_var.sel(element=element_id)`. + category: The variable category this handle is for. 
+ element_id: The element this handle belongs to. + full_variable: Optional reference to the full batched variable. + + Example: + >>> handle = registry.get_handle('flow_rate', 'Boiler(Q_th)') + >>> flow_rate = handle.variable # Access the variable + >>> total = flow_rate.sum('time') # Use in expressions + """ + + variable: linopy.Variable + category: str + element_id: str + full_variable: linopy.Variable | None = None + + def __repr__(self) -> str: + dims = list(self.variable.dims) if hasattr(self.variable, 'dims') else [] + return f"VariableHandle(category='{self.category}', element='{self.element_id}', dims={dims})" + + +# ============================================================================= +# Variable Registry (Collection & Execution) +# ============================================================================= + + +class VariableRegistry: + """Collects variable specifications and batch-creates them. + + The registry implements the Collection and Execution phases of DCE: + 1. Elements register their VariableSpecs via `register()` + 2. `create_all()` groups specs by category and batch-creates them + 3. Elements retrieve handles via `get_handle()` + + Variables are created with an 'element' dimension containing all element IDs + for that category. Each element then gets a handle to its slice. + + Attributes: + model: The FlowSystemModel to create variables in. 
class VariableRegistry:
    """Collects variable specifications and batch-creates them.

    Implements the Collection and Execution phases of the DCE pattern:

    1. Elements queue their ``VariableSpec``s via :meth:`register`.
    2. :meth:`create_all` groups the specs by category and creates ONE linopy
       variable per category, carrying an 'element' dimension that spans all
       registered element IDs.
    3. Elements retrieve their slice of the batched variable via
       :meth:`get_handle`.

    Example:
        >>> registry = VariableRegistry(model)
        >>> registry.register(VariableSpec(category='flow_rate', element_id='Boiler', ...))
        >>> registry.register(VariableSpec(category='flow_rate', element_id='CHP', ...))
        >>> registry.create_all()  # Creates one variable with element=['Boiler', 'CHP']
        >>> handle = registry.get_handle('flow_rate', 'Boiler')
    """

    def __init__(self, model: FlowSystemModel):
        self.model = model
        self._specs_by_category: dict[str, list[VariableSpec]] = defaultdict(list)
        # category -> element_id -> handle
        self._handles: dict[str, dict[str, VariableHandle]] = {}
        # category -> full batched variable
        self._full_variables: dict[str, linopy.Variable] = {}
        self._created = False

    def register(self, spec: VariableSpec) -> None:
        """Queue a variable specification for batch creation.

        Args:
            spec: The variable specification to register.

        Raises:
            RuntimeError: If variables have already been created.
            ValueError: If element_id is already registered for this category.
        """
        if self._created:
            raise RuntimeError('Cannot register specs after variables have been created')

        # Reject a second registration of the same element within a category.
        if any(existing.element_id == spec.element_id for existing in self._specs_by_category[spec.category]):
            raise ValueError(f"Element '{spec.element_id}' already registered for category '{spec.category}'")

        self._specs_by_category[spec.category].append(spec)

    def create_all(self) -> None:
        """Batch-create all registered variables.

        Creates one linopy variable per category (with an 'element' dimension)
        and a per-element handle for each spec.

        Raises:
            RuntimeError: If already called.
        """
        if self._created:
            raise RuntimeError('Variables have already been created')

        for category, specs in self._specs_by_category.items():
            if specs:
                self._create_batch(category, specs)

        self._created = True
        logger.debug(
            f'VariableRegistry created {len(self._full_variables)} batched variables '
            f'for {sum(len(h) for h in self._handles.values())} elements'
        )

    def _validate_homogeneity(self, category: str, specs: list[VariableSpec]) -> None:
        """Ensure every spec in a category shares dims and binary/integer flags."""
        template = specs[0]
        for spec in specs[1:]:
            if spec.dims != template.dims:
                raise ValueError(
                    f"Inconsistent dims in category '{category}': "
                    f"'{spec.element_id}' has {spec.dims}, "
                    f"'{template.element_id}' has {template.dims}"
                )
            if spec.binary != template.binary:
                raise ValueError(f"Inconsistent binary flag in category '{category}'")
            if spec.integer != template.integer:
                raise ValueError(f"Inconsistent integer flag in category '{category}'")

    def _create_batch(self, category: str, specs: list[VariableSpec]) -> None:
        """Create all variables of a category in one linopy call.

        Args:
            category: The variable category name.
            specs: List of specs for this category.
        """
        if not specs:
            return

        element_ids = [s.element_id for s in specs]
        template = specs[0]

        # All specs in a batch must agree on shape and variable type.
        self._validate_homogeneity(category, specs)

        # Coordinates: 'element' plus the model dimensions of the template.
        coords = self._build_coords(element_ids, template.dims)

        # Binary variables cannot carry explicit bounds in linopy.
        if template.binary:
            lower = upper = None
        else:
            lower = self._stack_bounds([s.lower for s in specs], element_ids, template.dims)
            upper = self._stack_bounds([s.upper for s in specs], element_ids, template.dims)

        kwargs = {
            'coords': coords,
            'name': category,
            'binary': template.binary,
            'integer': template.integer,
            'mask': self._combine_masks(specs, element_ids, template.dims),
        }
        if lower is not None:
            kwargs['lower'] = lower
        if upper is not None:
            kwargs['upper'] = upper

        # Single linopy call covering every element in the category.
        batched = self.model.add_variables(**kwargs)

        if template.var_category is not None:
            self.model.variable_categories[batched.name] = template.var_category

        self._full_variables[category] = batched

        # Hand each element a view on its slice of the batched variable.
        self._handles[category] = {
            spec.element_id: VariableHandle(
                variable=batched.sel(element=spec.element_id),
                category=category,
                element_id=spec.element_id,
                full_variable=batched,
            )
            for spec in specs
        }

    def _build_coords(self, element_ids: list[str], dims: tuple[str, ...]) -> xr.Coordinates:
        """Build coordinates with an 'element' dimension plus the model dims.

        Args:
            element_ids: List of element identifiers.
            dims: Tuple of dimension names from the model.

        Returns:
            xarray Coordinates with 'element' + requested dims.
        """
        coord_dict = {'element': pd.Index(element_ids, name='element')}

        model_coords = self.model.get_coords(dims=dims)
        if model_coords is not None:
            # Dims absent from the model's coords are silently skipped.
            coord_dict.update({dim: model_coords[dim] for dim in dims if dim in model_coords})

        return xr.Coordinates(coord_dict)

    def _stack_bounds(
        self,
        bounds: list[float | xr.DataArray],
        element_ids: list[str],
        dims: tuple[str, ...],
    ) -> xr.DataArray | float:
        """Stack per-element bounds into an array with an element dimension.

        Args:
            bounds: List of bounds (one per element).
            element_ids: List of element identifiers.
            dims: Dimension tuple for the variable.

        Returns:
            Stacked DataArray with element dimension, or a scalar if all bounds
            are identical scalars (linopy broadcasts the scalar).
        """
        scalars: list[float] = []
        needs_concat = False

        for bound in bounds:
            if isinstance(bound, xr.DataArray):
                if bound.ndim > 0:
                    # At least one multi-dimensional bound: use the concat path.
                    needs_concat = True
                    break
                scalars.append(float(bound.values))  # 0-d DataArray -> scalar
            else:
                scalars.append(float(bound))

        if not needs_concat:
            # Fast path: all scalars. Common case is all 0 or all inf.
            if len(set(scalars)) == 1:
                return scalars[0]
            return xr.DataArray(
                np.array(scalars),
                coords={'element': element_ids},
                dims=['element'],
            )

        # Slow path: concat full arrays along a new element dimension.
        pieces = []
        for bound, eid in zip(bounds, element_ids, strict=False):
            if isinstance(bound, xr.DataArray):
                pieces.append(bound.expand_dims(element=[eid]))
            else:
                pieces.append(xr.DataArray(bound, coords={'element': [eid]}, dims=['element']))

        stacked = xr.concat(pieces, dim='element')

        # Keep 'element' first for consistency.
        if 'element' in stacked.dims and stacked.dims[0] != 'element':
            stacked = stacked.transpose('element', ...)

        return stacked

    def _combine_masks(
        self,
        specs: list[VariableSpec],
        element_ids: list[str],
        dims: tuple[str, ...],
    ) -> xr.DataArray | None:
        """Combine per-element masks into a single mask array.

        Args:
            specs: List of variable specs.
            element_ids: List of element identifiers.
            dims: Dimension tuple.

        Returns:
            Combined mask DataArray, or None if no spec carries a mask.
        """
        if all(spec.mask is None for spec in specs):
            return None

        pieces = [
            xr.DataArray(True, coords={'element': [eid]}, dims=['element'])  # no mask = all True
            if spec.mask is None
            else spec.mask.expand_dims(element=[eid])
            for spec, eid in zip(specs, element_ids, strict=False)
        ]
        return xr.concat(pieces, dim='element')

    def get_handle(self, category: str, element_id: str) -> VariableHandle:
        """Get the handle for an element's variable.

        Args:
            category: Variable category.
            element_id: Element identifier.

        Returns:
            VariableHandle for the element.

        Raises:
            KeyError: If category or element_id not found.
        """
        if category not in self._handles:
            available = list(self._handles.keys())
            raise KeyError(f"Category '{category}' not found. Available: {available}")

        if element_id not in self._handles[category]:
            available = list(self._handles[category].keys())
            raise KeyError(f"Element '{element_id}' not found in category '{category}'. Available: {available}")

        return self._handles[category][element_id]

    def get_handles_for_element(self, element_id: str) -> dict[str, VariableHandle]:
        """Get all handles for a specific element.

        Args:
            element_id: Element identifier.

        Returns:
            Dict mapping category -> VariableHandle for this element.
        """
        return {
            category: by_element[element_id]
            for category, by_element in self._handles.items()
            if element_id in by_element
        }

    def get_full_variable(self, category: str) -> linopy.Variable:
        """Get the full batched variable for a category.

        Args:
            category: Variable category.

        Returns:
            The full linopy Variable with element dimension.

        Raises:
            KeyError: If category not found.
        """
        if category not in self._full_variables:
            available = list(self._full_variables.keys())
            raise KeyError(f"Category '{category}' not found. Available: {available}")
        return self._full_variables[category]

    def get_element_ids(self, category: str) -> list[str]:
        """Get the list of element IDs for a category.

        Args:
            category: Variable category.

        Returns:
            List of element IDs in the order they appear in the batched variable.

        Raises:
            KeyError: If category not found.
        """
        if category not in self._handles:
            available = list(self._handles.keys())
            raise KeyError(f"Category '{category}' not found. Available: {available}")
        return list(self._handles[category].keys())

    @property
    def categories(self) -> list[str]:
        """List of all registered categories."""
        return list(self._specs_by_category.keys())

    @property
    def element_count(self) -> int:
        """Total number of element registrations across all categories."""
        return sum(len(specs) for specs in self._specs_by_category.values())

    def __repr__(self) -> str:
        status = 'created' if self._created else 'pending'
        return (
            f'VariableRegistry(categories={len(self._specs_by_category)}, '
            f'elements={self.element_count}, status={status})'
        )
class ConstraintRegistry:
    """Collects constraint specifications and batch-creates them.

    Constraints are evaluated after variables exist. The build function
    in each spec is called to generate the constraint expression.

    Attributes:
        model: The FlowSystemModel to create constraints in.
        variable_registry: The VariableRegistry to get handles from.

    Example:
        >>> registry = ConstraintRegistry(model, var_registry)
        >>> registry.register(
        ...     ConstraintSpec(
        ...         category='flow_bounds',
        ...         element_id='Boiler',
        ...         build_fn=lambda m, h: ConstraintResult(h['flow_rate'].variable, 100, '<='),
        ...     )
        ... )
        >>> registry.create_all()
    """

    def __init__(self, model: FlowSystemModel, variable_registry: VariableRegistry):
        self.model = model
        self.variable_registry = variable_registry
        # category -> list of specs; constraints are grouped per category for batching
        self._specs_by_category: dict[str, list[ConstraintSpec]] = defaultdict(list)
        self._created = False

    def register(self, spec: ConstraintSpec) -> None:
        """Register a constraint specification for batch creation.

        Args:
            spec: The constraint specification to register.

        Raises:
            RuntimeError: If constraints have already been created.
        """
        if self._created:
            raise RuntimeError('Cannot register specs after constraints have been created')
        self._specs_by_category[spec.category].append(spec)

    def create_all(self) -> None:
        """Batch-create all registered constraints.

        Calls each spec's build function with the model and variable handles,
        then groups results by category for batch creation.

        Raises:
            RuntimeError: If already called.
        """
        if self._created:
            raise RuntimeError('Constraints have already been created')

        for category, specs in self._specs_by_category.items():
            if specs:
                self._create_batch(category, specs)

        self._created = True
        logger.debug(f'ConstraintRegistry created {len(self._specs_by_category)} constraint categories')

    def _create_batch(self, category: str, specs: list[ConstraintSpec]) -> None:
        """Create all constraints of a category.

        Attempts to use true vectorized batching for known constraint patterns.
        Falls back to individual creation for complex constraints.

        Args:
            category: The constraint category name.
            specs: List of specs for this category.
        """
        # Try vectorized batching for known patterns
        if self._try_vectorized_batch(category, specs):
            return

        # Fall back to individual creation
        self._create_individual(category, specs)

    def _try_vectorized_batch(self, category: str, specs: list[ConstraintSpec]) -> bool:
        """Try to create constraints using true vectorized batching.

        Returns True if successful, False to fall back to individual creation.
        """
        # Known batchable constraint patterns; each _batch_* method is itself
        # best-effort and returns False on failure so the caller can fall back.
        if category == 'total_flow_hours_eq':
            return self._batch_total_flow_hours_eq(specs)
        elif category == 'flow_hours_over_periods_eq':
            return self._batch_flow_hours_over_periods_eq(specs)
        elif category == 'flow_rate_ub':
            return self._batch_flow_rate_ub(specs)
        elif category == 'flow_rate_lb':
            return self._batch_flow_rate_lb(specs)
        elif category == 'flow_rate_scaled_ub':
            return self._batch_flow_rate_scaled_ub(specs)
        elif category == 'flow_rate_scaled_lb':
            return self._batch_flow_rate_scaled_lb(specs)
        elif category == 'size_invested_ub':
            return self._batch_size_invested_ub(specs)
        elif category == 'size_invested_lb':
            return self._batch_size_invested_lb(specs)

        return False

    def _get_flow_elements(self) -> dict[str, Any]:
        """Build a mapping from element_id (label_full) to Flow element.

        The map is built lazily on first use and cached on the instance.
        """
        if not hasattr(self, '_flow_element_map'):
            self._flow_element_map = {}
            for comp in self.model.flow_system.components.values():
                for flow in comp.inputs + comp.outputs:
                    self._flow_element_map[flow.label_full] = flow
        return self._flow_element_map

    def _batch_total_flow_hours_eq(self, specs: list[ConstraintSpec]) -> bool:
        """Batch create: total_flow_hours = sum_temporal(flow_rate)"""
        try:
            # Get full batched variables
            flow_rate = self.variable_registry.get_full_variable('flow_rate')
            total_flow_hours = self.variable_registry.get_full_variable('total_flow_hours')

            # Vectorized sum across time dimension
            rhs = self.model.sum_temporal(flow_rate)

            # Single constraint call for all elements
            self.model.add_constraints(total_flow_hours == rhs, name='total_flow_hours_eq')

            logger.debug(f'Batched {len(specs)} total_flow_hours_eq constraints')
            return True
        except Exception as e:
            # NOTE(review): broad except is deliberate — batching is best-effort
            # and failure falls back to per-element creation.
            logger.warning(f'Failed to batch total_flow_hours_eq, falling back: {e}')
            return False

    def _batch_flow_hours_over_periods_eq(self, specs: list[ConstraintSpec]) -> bool:
        """Batch create: flow_hours_over_periods = sum(total_flow_hours * period_weight)"""
        try:
            # Get full batched variables
            total_flow_hours = self.variable_registry.get_full_variable('total_flow_hours')
            flow_hours_over_periods = self.variable_registry.get_full_variable('flow_hours_over_periods')

            # Vectorized weighted sum
            period_weights = self.model.flow_system.period_weights
            if period_weights is None:
                period_weights = 1.0
            weighted = (total_flow_hours * period_weights).sum('period')

            # Single constraint call for all elements
            self.model.add_constraints(flow_hours_over_periods == weighted, name='flow_hours_over_periods_eq')

            logger.debug(f'Batched {len(specs)} flow_hours_over_periods_eq constraints')
            return True
        except Exception as e:
            logger.warning(f'Failed to batch flow_hours_over_periods_eq, falling back: {e}')
            return False

    def _batch_flow_rate_ub(self, specs: list[ConstraintSpec]) -> bool:
        """Batch create: flow_rate <= status * size * relative_max"""
        try:
            # Get element_ids from specs (subset of all flows - only those with status)
            spec_element_ids = [spec.element_id for spec in specs]

            # Get full batched variables and select only relevant elements
            flow_rate_full = self.variable_registry.get_full_variable('flow_rate')
            status_full = self.variable_registry.get_full_variable('status')

            flow_rate = flow_rate_full.sel(element=spec_element_ids)
            status = status_full.sel(element=spec_element_ids)

            # Build upper bounds array from flow elements
            flow_elements = self._get_flow_elements()
            upper_bounds = xr.concat(
                [flow_elements[eid].size * flow_elements[eid].relative_maximum for eid in spec_element_ids],
                dim='element',
            ).assign_coords(element=spec_element_ids)

            # Create vectorized constraint: flow_rate <= status * upper_bounds
            rhs = status * upper_bounds
            self.model.add_constraints(flow_rate <= rhs, name='flow_rate_ub')

            logger.debug(f'Batched {len(specs)} flow_rate_ub constraints')
            return True
        except Exception as e:
            logger.warning(f'Failed to batch flow_rate_ub, falling back: {e}')
            return False

    def _batch_flow_rate_lb(self, specs: list[ConstraintSpec]) -> bool:
        """Batch create: flow_rate >= status * epsilon"""
        try:
            from .config import CONFIG

            # Get element_ids from specs (subset of all flows - only those with status)
            spec_element_ids = [spec.element_id for spec in specs]

            # Get full batched variables and select only relevant elements
            flow_rate_full = self.variable_registry.get_full_variable('flow_rate')
            status_full = self.variable_registry.get_full_variable('status')

            flow_rate = flow_rate_full.sel(element=spec_element_ids)
            status = status_full.sel(element=spec_element_ids)

            # Build lower bounds array from flow elements
            # epsilon = max(CONFIG.Modeling.epsilon, size * relative_minimum)
            flow_elements = self._get_flow_elements()
            lower_bounds = xr.concat(
                [
                    np.maximum(
                        CONFIG.Modeling.epsilon,
                        flow_elements[eid].size * flow_elements[eid].relative_minimum,
                    )
                    for eid in spec_element_ids
                ],
                dim='element',
            ).assign_coords(element=spec_element_ids)

            # Create vectorized constraint: flow_rate >= status * lower_bounds
            rhs = status * lower_bounds
            self.model.add_constraints(flow_rate >= rhs, name='flow_rate_lb')

            logger.debug(f'Batched {len(specs)} flow_rate_lb constraints')
            return True
        except Exception as e:
            logger.warning(f'Failed to batch flow_rate_lb, falling back: {e}')
            return False

    def _batch_flow_rate_scaled_ub(self, specs: list[ConstraintSpec]) -> bool:
        """Batch create: flow_rate <= size * relative_max (investment-scaled bounds)"""
        try:
            spec_element_ids = [spec.element_id for spec in specs]

            flow_rate_full = self.variable_registry.get_full_variable('flow_rate')
            size_full = self.variable_registry.get_full_variable('size')

            flow_rate = flow_rate_full.sel(element=spec_element_ids)
            size = size_full.sel(element=spec_element_ids)

            # Build relative_max array from flow elements
            flow_elements = self._get_flow_elements()
            rel_max = xr.concat(
                [flow_elements[eid].relative_maximum for eid in spec_element_ids],
                dim='element',
            ).assign_coords(element=spec_element_ids)

            rhs = size * rel_max
            self.model.add_constraints(flow_rate <= rhs, name='flow_rate_scaled_ub')

            logger.debug(f'Batched {len(specs)} flow_rate_scaled_ub constraints')
            return True
        except Exception as e:
            logger.warning(f'Failed to batch flow_rate_scaled_ub, falling back: {e}')
            return False

    def _batch_flow_rate_scaled_lb(self, specs: list[ConstraintSpec]) -> bool:
        """Batch create: flow_rate >= size * relative_min (investment-scaled bounds)"""
        try:
            spec_element_ids = [spec.element_id for spec in specs]

            flow_rate_full = self.variable_registry.get_full_variable('flow_rate')
            size_full = self.variable_registry.get_full_variable('size')

            flow_rate = flow_rate_full.sel(element=spec_element_ids)
            size = size_full.sel(element=spec_element_ids)

            # Build relative_min array from flow elements
            flow_elements = self._get_flow_elements()
            rel_min = xr.concat(
                [flow_elements[eid].relative_minimum for eid in spec_element_ids],
                dim='element',
            ).assign_coords(element=spec_element_ids)

            rhs = size * rel_min
            self.model.add_constraints(flow_rate >= rhs, name='flow_rate_scaled_lb')

            logger.debug(f'Batched {len(specs)} flow_rate_scaled_lb constraints')
            return True
        except Exception as e:
            logger.warning(f'Failed to batch flow_rate_scaled_lb, falling back: {e}')
            return False

    def _batch_size_invested_ub(self, specs: list[ConstraintSpec]) -> bool:
        """Batch create: size <= invested * maximum_size"""
        try:
            spec_element_ids = [spec.element_id for spec in specs]

            size_full = self.variable_registry.get_full_variable('size')
            invested_full = self.variable_registry.get_full_variable('invested')

            size = size_full.sel(element=spec_element_ids)
            invested = invested_full.sel(element=spec_element_ids)

            # Build max_size array from flow elements
            flow_elements = self._get_flow_elements()
            max_sizes = xr.concat(
                [flow_elements[eid].size.maximum_or_fixed_size for eid in spec_element_ids],
                dim='element',
            ).assign_coords(element=spec_element_ids)

            rhs = invested * max_sizes
            self.model.add_constraints(size <= rhs, name='size_invested_ub')

            logger.debug(f'Batched {len(specs)} size_invested_ub constraints')
            return True
        except Exception as e:
            logger.warning(f'Failed to batch size_invested_ub, falling back: {e}')
            return False

    def _batch_size_invested_lb(self, specs: list[ConstraintSpec]) -> bool:
        """Batch create: size >= invested * minimum_size"""
        try:
            spec_element_ids = [spec.element_id for spec in specs]

            size_full = self.variable_registry.get_full_variable('size')
            invested_full = self.variable_registry.get_full_variable('invested')

            size = size_full.sel(element=spec_element_ids)
            invested = invested_full.sel(element=spec_element_ids)

            # Build min_size array from flow elements
            flow_elements = self._get_flow_elements()
            min_sizes = xr.concat(
                [flow_elements[eid].size.minimum_or_fixed_size for eid in spec_element_ids],
                dim='element',
            ).assign_coords(element=spec_element_ids)

            rhs = invested * min_sizes
            self.model.add_constraints(size >= rhs, name='size_invested_lb')

            logger.debug(f'Batched {len(specs)} size_invested_lb constraints')
            return True
        except Exception as e:
            logger.warning(f'Failed to batch size_invested_lb, falling back: {e}')
            return False

    def _create_individual(self, category: str, specs: list[ConstraintSpec]) -> None:
        """Create constraints individually (fallback for complex constraints)."""
        for spec in specs:
            # Get handles for this element
            handles = self.variable_registry.get_handles_for_element(spec.element_id)

            # Build the constraint
            try:
                result = spec.build_fn(self.model, handles)
            except Exception as e:
                raise RuntimeError(
                    f"Failed to build constraint '{category}' for element '{spec.element_id}': {e}"
                ) from e

            # Create the constraint
            constraint_name = f'{spec.element_id}|{category}'
            if result.sense == '==':
                self.model.add_constraints(result.lhs == result.rhs, name=constraint_name)
            elif result.sense == '<=':
                self.model.add_constraints(result.lhs <= result.rhs, name=constraint_name)
            elif result.sense == '>=':
                self.model.add_constraints(result.lhs >= result.rhs, name=constraint_name)
            else:
                raise ValueError(f'Invalid constraint sense: {result.sense}')

    @property
    def categories(self) -> list[str]:
        """List of all registered categories."""
        return list(self._specs_by_category.keys())

    def __repr__(self) -> str:
        status = 'created' if self._created else 'pending'
        total_specs = sum(len(specs) for specs in self._specs_by_category.values())
        return f'ConstraintRegistry(categories={len(self._specs_by_category)}, specs={total_specs}, status={status})'
@dataclass
class SystemConstraintSpec:
    """Specification for constraints that span multiple elements.

    These are constraints like bus balance that aggregate across elements.

    Attributes:
        category: Constraint category (e.g., 'bus_balance').
        build_fn: Callable that builds the constraint. Called as:
            build_fn(model, variable_registry) -> list[ConstraintResult] or ConstraintResult
    """

    category: str
    build_fn: Callable[[FlowSystemModel, VariableRegistry], ConstraintResult | list[ConstraintResult]]


class SystemConstraintRegistry:
    """Registry for system-wide constraints that span multiple elements.

    These constraints are created after element constraints and have access
    to the full variable registry.

    Example:
        >>> registry = SystemConstraintRegistry(model, var_registry)
        >>> registry.register(
        ...     SystemConstraintSpec(
        ...         category='bus_balance',
        ...         build_fn=build_bus_balance,
        ...     )
        ... )
        >>> registry.create_all()
    """

    def __init__(self, model: FlowSystemModel, variable_registry: VariableRegistry):
        self.model = model
        self.variable_registry = variable_registry
        self._specs: list[SystemConstraintSpec] = []
        self._created = False

    def register(self, spec: SystemConstraintSpec) -> None:
        """Register a system constraint specification.

        Raises:
            RuntimeError: If constraints have already been created.
        """
        if self._created:
            raise RuntimeError('Cannot register specs after constraints have been created')
        self._specs.append(spec)

    def create_all(self) -> None:
        """Create all registered system constraints.

        Raises:
            RuntimeError: If already called, or if a build function raises.
            ValueError: If a build result carries an invalid sense.
        """
        if self._created:
            raise RuntimeError('System constraints have already been created')

        for spec in self._specs:
            try:
                results = spec.build_fn(self.model, self.variable_registry)
            except Exception as e:
                raise RuntimeError(f"Failed to build system constraint '{spec.category}': {e}") from e

            # A build function may return a single result or a list of results.
            if isinstance(results, ConstraintResult):
                results = [results]

            for i, result in enumerate(results):
                name = f'{spec.category}' if len(results) == 1 else f'{spec.category}_{i}'
                if result.sense == '==':
                    self.model.add_constraints(result.lhs == result.rhs, name=name)
                elif result.sense == '<=':
                    self.model.add_constraints(result.lhs <= result.rhs, name=name)
                elif result.sense == '>=':
                    self.model.add_constraints(result.lhs >= result.rhs, name=name)
                else:
                    # Bug fix: an unknown sense was previously dropped silently.
                    # Raise instead, consistent with ConstraintRegistry._create_individual.
                    raise ValueError(f'Invalid constraint sense: {result.sense}')

        self._created = True

    def __repr__(self) -> str:
        status = 'created' if self._created else 'pending'
        return f'SystemConstraintRegistry(specs={len(self._specs)}, status={status})'
class EffectShareRegistry:
    """Collects effect share specifications and batch-creates them.

    Effect shares link flow rates to effects (costs, emissions, etc.).
    Traditional approach creates them one at a time; this batches them.

    The key insight: all flow_rate variables are already batched with an
    element dimension. We can create ONE effect share variable for all
    flows contributing to an effect, then ONE constraint.

    Example:
        >>> registry = EffectShareRegistry(model, var_registry)
        >>> registry.register(EffectShareSpec('Boiler(gas_in)', 'costs', 30.0))
        >>> registry.register(EffectShareSpec('HeatPump(elec_in)', 'costs', 100.0))
        >>> registry.create_all()  # One batched call instead of two!
    """

    def __init__(self, model: FlowSystemModel, variable_registry: VariableRegistry):
        self.model = model
        self.variable_registry = variable_registry
        # Group by (effect_name, target) for batching
        self._specs_by_effect: dict[tuple[str, str], list[EffectShareSpec]] = defaultdict(list)
        self._created = False

    def register(self, spec: EffectShareSpec) -> None:
        """Register an effect share specification.

        Raises:
            RuntimeError: If shares have already been created.
        """
        if self._created:
            raise RuntimeError('Cannot register specs after shares have been created')
        key = (spec.effect_name, spec.target)
        self._specs_by_effect[key].append(spec)

    def create_all(self) -> None:
        """Batch-create all registered effect shares.

        For each (effect, target) combination:
        1. Build a factors array aligned with element dimension
        2. Compute batched expression: flow_rate * timestep_duration * factors
        3. Add ONE share to the effect with the sum across elements

        Raises:
            RuntimeError: If already called.
        """
        if self._created:
            raise RuntimeError('Effect shares have already been created')

        for (effect_name, target), specs in self._specs_by_effect.items():
            self._create_batch(effect_name, target, specs)

        self._created = True
        logger.debug(f'EffectShareRegistry created shares for {len(self._specs_by_effect)} effect/target combinations')

    def _create_batch(self, effect_name: str, target: str, specs: list[EffectShareSpec]) -> None:
        """Create batched effect shares for one effect/target combination.

        Instead of one complex constraint with a sum of 200+ terms, we create a
        BATCHED share variable with element dimension, then ONE simple
        vectorized constraint where each entry is just:
            share_var[e,t] = flow_rate[e,t] * timestep_duration * factor[e]

        Batching is best-effort: any failure is logged and the shares are
        created individually instead.
        """
        logger.debug(f'_create_batch called for {effect_name}/{target} with {len(specs)} specs')
        try:
            # Get the full batched flow_rate variable
            flow_rate = self.variable_registry.get_full_variable('flow_rate')
            element_ids = self.variable_registry.get_element_ids('flow_rate')

            # Build factors: map element_id -> factor (0 for elements without effects)
            spec_map = {spec.element_id: spec.factor for spec in specs}
            factors_list = [spec_map.get(eid, 0) for eid in element_ids]

            # Stack factors into DataArray with element dimension
            # xarray handles broadcasting of scalars and DataArrays automatically
            factors_da = xr.concat(
                [xr.DataArray(f) if not isinstance(f, xr.DataArray) else f for f in factors_list],
                dim='element',
            ).assign_coords(element=element_ids)

            # Compute batched expression: flow_rate * timestep_duration * factors
            # Broadcasting handles (element, time, ...) * (element,) or (element, time, ...)
            expression = flow_rate * self.model.timestep_duration * factors_da

            # Get the effect model
            effect = self.model.effects.effects[effect_name]

            if target == 'temporal':
                # Create ONE batched share variable with element dimension,
                # combining the element coord with the temporal coords
                temporal_coords = self.model.get_coords(self.model.temporal_dims)
                share_var = self.model.add_variables(
                    coords=xr.Coordinates(
                        {'element': element_ids, **{dim: temporal_coords[dim] for dim in temporal_coords}}
                    ),
                    name=f'flow_effects->{effect_name}(temporal)',
                )

                # ONE vectorized constraint (simple per-element equality)
                self.model.add_constraints(
                    share_var == expression,
                    name=f'flow_effects->{effect_name}(temporal)',
                )

                # Add sum of shares to the effect's total_per_timestep equation:
                # sum across elements gives the contribution at each timestep
                effect.submodel.temporal._eq_total_per_timestep.lhs -= share_var.sum('element')

            elif target == 'periodic':
                # Similar for periodic, but sum over time first
                all_coords = self.model.get_coords()
                periodic_coords = {dim: all_coords[dim] for dim in ['period', 'scenario'] if dim in all_coords}
                # Bug fix: the element dimension must ALWAYS be present (it was
                # previously only added when period/scenario coords existed, so
                # share_var.sum('element') failed for models without them).
                periodic_coords['element'] = element_ids

                share_var = self.model.add_variables(
                    coords=xr.Coordinates(periodic_coords),
                    name=f'flow_effects->{effect_name}(periodic)',
                )

                # Sum expression over time
                periodic_expression = expression.sum(self.model.temporal_dims)

                self.model.add_constraints(
                    share_var == periodic_expression,
                    name=f'flow_effects->{effect_name}(periodic)',
                )

                effect.submodel.periodic._eq_total.lhs -= share_var.sum('element')

            logger.debug(f'Batched {len(specs)} effect shares for {effect_name}/{target}')

        except Exception as e:
            logger.warning(f'Failed to batch effect shares for {effect_name}/{target}: {e}')
            # Fall back to individual creation (one call handles all specs)
            self._create_individual(effect_name, target, specs)

    def _create_individual(self, effect_name: str, target: str, specs: list[EffectShareSpec]) -> None:
        """Fall back to individual effect share creation."""
        logger.debug(f'_create_individual called for {effect_name}/{target} with {len(specs)} specs')
        for spec in specs:
            handles = self.variable_registry.get_handles_for_element(spec.element_id)
            if 'flow_rate' not in handles:
                continue

            flow_rate = handles['flow_rate'].variable
            expression = flow_rate * self.model.timestep_duration * spec.factor

            effect = self.model.effects.effects[effect_name]
            if target == 'temporal':
                effect.submodel.temporal.add_share(
                    spec.element_id,
                    expression,
                    dims=('time', 'period', 'scenario'),
                )
            elif target == 'periodic':
                periodic_expression = expression.sum(self.model.temporal_dims)
                effect.submodel.periodic.add_share(
                    spec.element_id,
                    periodic_expression,
                    dims=('period', 'scenario'),
                )

    @property
    def effect_count(self) -> int:
        """Number of distinct effect/target combinations."""
        return len(self._specs_by_effect)

    @property
    def total_specs(self) -> int:
        """Total number of registered specs."""
        return sum(len(specs) for specs in self._specs_by_effect.values())

    def __repr__(self) -> str:
        status = 'created' if self._created else 'pending'
        return f'EffectShareRegistry(effects={self.effect_count}, specs={self.total_specs}, status={status})'
class SimplifiedElementModel:
    """Base class for element models using the DCE pattern.

    Subclasses participate in Declaration-Collection-Execution by overriding:
        declare_variables(): Returns list of VariableSpec
        declare_constraints(): Returns list of ConstraintSpec
    and receive their variable handles via on_variables_created() once the
    orchestrating model has batch-created all variables.
    """

    def __init__(self, element_id: str):
        # Unique id under which this element registers specs and handles.
        self.element_id = element_id
        # Filled in by on_variables_created(); empty until batch creation ran.
        self._handles: dict[str, VariableHandle] = {}

    def declare_variables(self) -> list[VariableSpec]:
        """Override to declare what variables this element needs."""
        return []

    def declare_constraints(self) -> list[ConstraintSpec]:
        """Override to declare what constraints this element needs."""
        return []

    def on_variables_created(self, handles: dict[str, VariableHandle]) -> None:
        """Called after batch creation with handles to our variables."""
        self._handles = handles

    def get_variable(self, category: str) -> linopy.Variable:
        """Get this element's variable by category.

        Raises:
            KeyError: If no handle for *category* was distributed to this element.
        """
        if category in self._handles:
            return self._handles[category].variable
        raise KeyError(f"No handle for category '{category}' in element '{self.element_id}'")
class FlowModel(SimplifiedElementModel):
    """Simplified Flow model demonstrating the DCE pattern.

    Declares a continuous ``flow_rate`` variable and, when ``with_status`` is
    set, an additional binary ``status`` variable together with the two
    constraints coupling the flow rate to the on/off status.
    """

    def __init__(
        self,
        element_id: str,
        min_flow: float = 0.0,
        max_flow: float = 100.0,
        with_status: bool = False,
    ):
        super().__init__(element_id)
        self.min_flow = min_flow
        self.max_flow = max_flow
        self.with_status = with_status

    def declare_variables(self) -> list[VariableSpec]:
        # With a status variable the static lower bound is relaxed to 0.0;
        # the effective minimum is enforced by the status-linked constraints.
        rate_lower = 0.0 if self.with_status else self.min_flow
        specs = [
            VariableSpec(
                category='flow_rate',
                element_id=self.element_id,
                lower=rate_lower,
                upper=self.max_flow,
                dims=('time',),
                var_category=VariableCategory.FLOW_RATE,
            )
        ]
        if self.with_status:
            specs.append(
                VariableSpec(
                    category='status',
                    element_id=self.element_id,
                    lower=0,
                    upper=1,
                    binary=True,
                    dims=('time',),
                    var_category=VariableCategory.STATUS,
                )
            )
        return specs

    def declare_constraints(self) -> list[ConstraintSpec]:
        # Coupling constraints exist only when a status variable was declared.
        if not self.with_status:
            return []
        return [
            ConstraintSpec(
                category='flow_rate_ub',
                element_id=self.element_id,
                build_fn=self._build_upper_bound,
            ),
            ConstraintSpec(
                category='flow_rate_lb',
                element_id=self.element_id,
                build_fn=self._build_lower_bound,
            ),
        ]

    def _build_upper_bound(self, model, handles: dict[str, VariableHandle]) -> ConstraintResult:
        # flow_rate <= status * max_flow: forces the flow to zero when off.
        rate = handles['flow_rate'].variable
        status = handles['status'].variable
        return ConstraintResult(
            lhs=rate,
            rhs=status * self.max_flow,
            sense='<=',
        )

    def _build_lower_bound(self, model, handles: dict[str, VariableHandle]) -> ConstraintResult:
        # flow_rate >= status * max(min_flow, eps): enforces the minimum flow
        # when on; the epsilon floor keeps the bound strictly positive.
        rate = handles['flow_rate'].variable
        status = handles['status'].variable
        floor = max(self.min_flow, 1e-6)  # Numerical stability
        return ConstraintResult(
            lhs=rate,
            rhs=status * floor,
            sense='>=',
        )
class StorageModel(SimplifiedElementModel):
    """Simplified Storage model demonstrating the DCE pattern.

    Declares three variables (state of charge, charge rate, discharge rate)
    and one energy-balance constraint linking consecutive timesteps.
    """

    def __init__(
        self,
        element_id: str,
        capacity: float = 1000.0,
        max_charge_rate: float = 100.0,
        max_discharge_rate: float = 100.0,
        efficiency: float = 0.95,
    ):
        # efficiency is applied on charging and its inverse on discharging
        # (see _build_energy_balance); presumably a round-trip split — TODO confirm.
        super().__init__(element_id)
        self.capacity = capacity
        self.max_charge_rate = max_charge_rate
        self.max_discharge_rate = max_discharge_rate
        self.efficiency = efficiency

    def declare_variables(self) -> list[VariableSpec]:
        return [
            # State of charge
            VariableSpec(
                category='charge_state',
                element_id=self.element_id,
                lower=0,
                upper=self.capacity,
                dims=('time',),  # In practice, would use extra_timestep
                var_category=VariableCategory.CHARGE_STATE,
            ),
            # Charge rate
            VariableSpec(
                category='charge_rate',
                element_id=self.element_id,
                lower=0,
                upper=self.max_charge_rate,
                dims=('time',),
                var_category=VariableCategory.FLOW_RATE,
            ),
            # Discharge rate
            VariableSpec(
                category='discharge_rate',
                element_id=self.element_id,
                lower=0,
                upper=self.max_discharge_rate,
                dims=('time',),
                var_category=VariableCategory.FLOW_RATE,
            ),
        ]

    def declare_constraints(self) -> list[ConstraintSpec]:
        return [
            ConstraintSpec(
                category='energy_balance',
                element_id=self.element_id,
                build_fn=self._build_energy_balance,
            ),
        ]

    def _build_energy_balance(self, model, handles: dict[str, VariableHandle]) -> ConstraintResult:
        """Energy balance: soc[t] = soc[t-1] + charge*eff - discharge/eff."""
        charge_state = handles['charge_state'].variable
        charge_rate = handles['charge_rate'].variable
        discharge_rate = handles['discharge_rate'].variable

        # For simplicity, assume timestep duration = 1 hour
        # In practice, would get from model.timestep_duration
        dt = 1.0

        # soc[t] - soc[t-1] - charge*eff*dt + discharge*dt/eff = 0
        # Note: This is simplified - real implementation handles initial conditions
        # The isel slices pair each soc[t] (t >= 1) with soc/charge/discharge at t-1.
        lhs = (
            charge_state.isel(time=slice(1, None))
            - charge_state.isel(time=slice(None, -1))
            - charge_rate.isel(time=slice(None, -1)) * self.efficiency * dt
            + discharge_rate.isel(time=slice(None, -1)) * dt / self.efficiency
        )

        return ConstraintResult(lhs=lhs, rhs=0, sense='==')
class SimplifiedFlowSystemModel(linopy.Model):
    """Simplified model demonstrating the DCE pattern orchestration.

    This shows how FlowSystemModel would be modified to support DCE.
    """

    def __init__(self, timesteps: pd.DatetimeIndex):
        super().__init__(force_dim_names=True)
        self.timesteps = timesteps
        # Elements keyed by element_id; filled via add_element().
        self.element_models: dict[str, SimplifiedElementModel] = {}
        self.variable_categories: dict[str, VariableCategory] = {}

        # DCE Registries
        # The constraint registries are created lazily in do_modeling_dce(),
        # after variables exist and handles have been distributed.
        self.variable_registry = VariableRegistry(self)
        self.constraint_registry: ConstraintRegistry | None = None
        self.system_constraint_registry: SystemConstraintRegistry | None = None

    def get_coords(self, dims: tuple[str, ...] | None = None) -> xr.Coordinates | None:
        """Get model coordinates (simplified version).

        Only a 'time' dimension exists here; returns None when *dims*
        filters everything out.
        """
        coords = {'time': self.timesteps}
        if dims is not None:
            coords = {k: v for k, v in coords.items() if k in dims}
        return xr.Coordinates(coords) if coords else None

    def add_element(self, model: SimplifiedElementModel) -> None:
        """Add an element model."""
        self.element_models[model.element_id] = model

    def do_modeling_dce(self) -> None:
        """Build the model using the DCE pattern.

        Phase 1: Declaration - Collect all specs from elements
        Phase 2: Collection - Already done by registries
        Phase 3: Execution - Batch create variables and constraints
        """
        print('\n=== Phase 1: DECLARATION ===')
        start = time.perf_counter()

        # Collect variable declarations
        for element_id, model in self.element_models.items():
            for spec in model.declare_variables():
                self.variable_registry.register(spec)
            print(f' Declared variables for: {element_id}')

        declaration_time = time.perf_counter() - start
        print(f' Declaration time: {declaration_time * 1000:.2f}ms')

        print('\n=== Phase 2: COLLECTION (implicit) ===')
        print(f' {self.variable_registry}')

        print('\n=== Phase 3: EXECUTION (Variables) ===')
        start = time.perf_counter()

        # Batch create all variables
        self.variable_registry.create_all()

        var_creation_time = time.perf_counter() - start
        print(f' Variable creation time: {var_creation_time * 1000:.2f}ms')

        # Distribute handles to elements
        # Elements need their handles before constraints can be declared,
        # since constraint build functions reference the created variables.
        for element_id, model in self.element_models.items():
            handles = self.variable_registry.get_handles_for_element(element_id)
            model.on_variables_created(handles)
            print(f' Distributed {len(handles)} handles to: {element_id}')

        print('\n=== Phase 3: EXECUTION (Constraints) ===')
        start = time.perf_counter()

        # Now collect and create constraints
        self.constraint_registry = ConstraintRegistry(self, self.variable_registry)

        for _element_id, model in self.element_models.items():
            for spec in model.declare_constraints():
                self.constraint_registry.register(spec)

        self.constraint_registry.create_all()

        constraint_time = time.perf_counter() - start
        print(f' Constraint creation time: {constraint_time * 1000:.2f}ms')

        print('\n=== SUMMARY ===')
        print(f' Variables: {len(self.variables)}')
        print(f' Constraints: {len(self.constraints)}')
        print(f' Categories in registry: {self.variable_registry.categories}')
def benchmark_old_pattern(n_elements: int, n_timesteps: int) -> float:
    """Simulate the old pattern: individual variable/constraint creation.

    Builds a model with one add_variables/add_constraints call per element
    and returns the elapsed wall-clock seconds for the build.
    """
    model = linopy.Model(force_dim_names=True)
    timesteps = pd.date_range('2024-01-01', periods=n_timesteps, freq='h')

    start = time.perf_counter()

    # Variables one element at a time — the per-call coords construction is
    # deliberately inside the loop, since that overhead is part of what the
    # old pattern paid on every call.
    for idx in range(n_elements):
        model.add_variables(
            lower=0,
            upper=100,
            coords=xr.Coordinates({'time': timesteps}),
            name=f'flow_rate_{idx}',
        )
        model.add_variables(
            lower=0,
            upper=1,
            coords=xr.Coordinates({'time': timesteps}),
            name=f'status_{idx}',
            binary=True,
        )

    # Constraints likewise: one ub/lb pair per element.
    for idx in range(n_elements):
        rate = model.variables[f'flow_rate_{idx}']
        status = model.variables[f'status_{idx}']
        model.add_constraints(rate <= status * 100, name=f'ub_{idx}')
        model.add_constraints(rate >= status * 1e-6, name=f'lb_{idx}')

    return time.perf_counter() - start
def benchmark_dce_pattern(n_elements: int, n_timesteps: int) -> float:
    """Benchmark the DCE pattern: batch variable/constraint creation.

    Builds the same model as benchmark_old_pattern but with an extra
    'element' dimension, so a single call covers all elements. Returns
    elapsed wall-clock seconds.
    """
    model = linopy.Model(force_dim_names=True)
    timesteps = pd.date_range('2024-01-01', periods=n_timesteps, freq='h')

    start = time.perf_counter()

    # DCE pattern: batch create variables
    element_ids = [f'element_{i}' for i in range(n_elements)]

    # Single call for all flow_rate variables
    model.add_variables(
        lower=0,
        upper=100,
        coords=xr.Coordinates(
            {
                'element': pd.Index(element_ids),
                'time': timesteps,
            }
        ),
        name='flow_rate',
    )

    # Single call for all status variables
    model.add_variables(
        lower=0,
        upper=1,
        coords=xr.Coordinates(
            {
                'element': pd.Index(element_ids),
                'time': timesteps,
            }
        ),
        name='status',
        binary=True,
    )

    # Batch constraints (vectorized across elements)
    flow_rate = model.variables['flow_rate']
    status = model.variables['status']
    model.add_constraints(flow_rate <= status * 100, name='flow_rate_ub')
    model.add_constraints(flow_rate >= status * 1e-6, name='flow_rate_lb')

    elapsed = time.perf_counter() - start
    return elapsed


def run_benchmark():
    """Run benchmark comparing old vs DCE pattern.

    Prints a table of build times for several problem sizes; each
    configuration is timed 3 times and the minimum is reported.
    """
    print('\n' + '=' * 60)
    print('BENCHMARK: Old Pattern vs DCE Pattern')
    print('=' * 60)

    # (n_elements, n_timesteps) problem sizes to compare.
    configs = [
        (10, 24),
        (50, 168),
        (100, 168),
        (200, 168),
        (500, 168),
    ]

    print(f'\n{"Elements":>10} {"Timesteps":>10} {"Old (ms)":>12} {"DCE (ms)":>12} {"Speedup":>10}')
    print('-' * 60)

    for n_elements, n_timesteps in configs:
        # Run each benchmark 3 times and take minimum
        old_times = [benchmark_old_pattern(n_elements, n_timesteps) for _ in range(3)]
        dce_times = [benchmark_dce_pattern(n_elements, n_timesteps) for _ in range(3)]

        old_time = min(old_times) * 1000  # Convert to ms
        dce_time = min(dce_times) * 1000
        speedup = old_time / dce_time if dce_time > 0 else float('inf')

        print(f'{n_elements:>10} {n_timesteps:>10} {old_time:>12.2f} {dce_time:>12.2f} {speedup:>10.1f}x')


def run_demo():
    """Run a demonstration of the DCE pattern.

    Builds a small heat-supply model via do_modeling_dce() and shows how
    elements access their own variable slices and how the registry exposes
    the full batched variables.
    """
    print('\n' + '=' * 60)
    print('DEMO: DCE Pattern with Simplified Elements')
    print('=' * 60)

    # Create timesteps
    timesteps = pd.date_range('2024-01-01', periods=24, freq='h')

    # Create model
    model = SimplifiedFlowSystemModel(timesteps)

    # Add some flows
    model.add_element(FlowModel('Boiler_Q_th', min_flow=10, max_flow=100, with_status=True))
    model.add_element(FlowModel('HeatPump_Q_th', min_flow=5, max_flow=50, with_status=True))
    model.add_element(FlowModel('Solar_Q_th', min_flow=0, max_flow=30, with_status=False))

    # Add a storage
    model.add_element(StorageModel('ThermalStorage', capacity=500))

    # Build the model using DCE
    model.do_modeling_dce()

    # Show that elements can access their variables
    print('\n=== Element Variable Access ===')
    boiler = model.element_models['Boiler_Q_th']
    print(f' Boiler flow_rate shape: {boiler.get_variable("flow_rate").shape}')
    print(f' Boiler status shape: {boiler.get_variable("status").shape}')

    storage = model.element_models['ThermalStorage']
    print(f' Storage charge_state shape: {storage.get_variable("charge_state").shape}')

    # Show batched variables
    print('\n=== Batched Variables in Registry ===')
    flow_rate_full = model.variable_registry.get_full_variable('flow_rate')
    print(f' flow_rate full shape: {flow_rate_full.shape}')
    print(f' flow_rate dims: {flow_rate_full.dims}')

    status_full = model.variable_registry.get_full_variable('status')
    print(f' status full shape: {status_full.shape}')


if __name__ == '__main__':
    run_demo()
    run_benchmark()