"""
Ebsilon Model Parser
This module defines the EbsilonModelParser class, which is used to parse Ebsilon models,
simulate them, extract data about components and connections, and write the data to a JSON file.
"""
import json
import logging
import os
from typing import Any
from exerpy.functions import convert_to_SI, fluid_property_data
from . import __ebsilon_available__, is_ebsilon_available
from .ebsilon_functions import calc_eph_from_min
from .utils import EpCalculationResultStatus2Stub, EpFluidTypeStub, EpGasTableStub, EpSteamTableStub, require_ebsilon
# Import Ebsilon classes if available
if __ebsilon_available__:
from EbsOpen import EpCalculationResultStatus2, EpFluidType, EpGasTable, EpSteamTable
from win32com.client import Dispatch
else:
EpFluidType = EpFluidTypeStub
EpSteamTable = EpSteamTableStub
EpGasTable = EpGasTableStub
EpCalculationResultStatus2 = EpCalculationResultStatus2Stub
from .ebsilon_config import (
composition_params,
connector_mapping,
ebs_objects,
fluid_type_index,
grouped_components,
non_thermodynamic_unit_operators,
two_phase_fluids_mapping,
unit_id_to_string,
)
# Configure logging to display only error-level (and more severe) messages
logging.basicConfig(level=logging.ERROR)
[docs]
class EbsilonModelParser:
"""
A class to parse Ebsilon models, simulate them, extract data, and write to JSON.
"""
def __init__(self, model_path: str, split_physical_exergy: bool = True):
    """
    Prepares an empty parser bound to one Ebsilon model file.

    The constructor verifies that Ebsilon support is present and creates the
    containers that the parsing methods fill in later. The COM handles
    (``app``, ``model``, ``oc``) start out as ``None``.

    Parameters:
        model_path (str): Path to the Ebsilon model file.
        split_physical_exergy (bool): Flag to split physical exergy into thermal and mechanical components.

    Raises:
        RuntimeError: If Ebsilon is not available but is required for parsing.
    """
    ebsilon_missing = not is_ebsilon_available()
    if ebsilon_missing:
        unavailable_note = (
            "EbsilonModelParser initialized without Ebsilon support. "
            "EBS environment variable is not set or EbsOpen could not be imported; "
            "Ebsilon functionality will not be available."
        )
        logging.warning(unavailable_note)
        # This parser cannot work without Ebsilon, so fail right away
        raise RuntimeError(
            "EbsilonModelParser requires Ebsilon to be available. "
            "Please set the EBS environment variable to your Ebsilon installation path."
        )

    # Settings supplied by the caller
    self.model_path = model_path
    self.split_physical_exergy = split_physical_exergy

    # COM handles: application instance, opened model, ObjectCaster for type casting
    self.app = None
    self.model = None
    self.oc = None

    # Parsed results: components grouped by type, and connections by name
    self.components_data: dict[str, dict[str, dict[str, Any]]] = {}
    self.connections_data: dict[str, dict[str, Any]] = {}

    # Ambient temperature and pressure, unknown until the model is parsed
    self.Tamb: float | None = None
    self.pamb: float | None = None

    # Bookkeeping consumed after all pipes have been parsed
    self._storages_to_postprocess: list[dict[str, Any]] = []
    self._power_bus_mul_data: dict[str, dict[int, float]] = {}  # {comp_name: {connector_idx: MUL_value}}
[docs]
@require_ebsilon
def initialize_model(self):
    """
    Initializes the Ebsilon application and opens the specified model.

    On success ``self.app``, ``self.model`` and ``self.oc`` are set. Every
    failure is logged and re-raised with the original exception attached as
    its cause.

    Raises:
        FileNotFoundError: If the model file cannot be opened.
        RuntimeError: If the COM server cannot be started or ObjectCaster cannot be obtained.
    """
    # 1) start the COM server
    try:
        self.app = Dispatch("EbsOpen.Application")
    except Exception as e:
        logging.error(f"Failed to start Ebsilon COM server: {e}")
        raise RuntimeError(f"Could not start Ebsilon COM server: {e}") from e

    # 2) try to open the .ebs model
    # Any failure of Open() is reported as FileNotFoundError; run_ebsilon relies on that type.
    try:
        self.model = self.app.Open(self.model_path)
    except Exception as e:
        logging.error(f"Failed to open model file: {e}")
        raise FileNotFoundError(f"File not found at: {self.model_path}") from e

    # 3) grab the ObjectCaster
    try:
        self.oc = self.app.ObjectCaster
    except Exception as e:
        logging.error(f"Failed to obtain ObjectCaster: {e}")
        raise RuntimeError(f"Could not get ObjectCaster: {e}") from e

    logging.info(f"Model opened successfully: {self.model_path}")
[docs]
@require_ebsilon
def simulate_model(self):
    """
    Simulates the Ebsilon model and logs the entries of its calculation-error list.

    The entries are logged as warnings (count first, then one line per entry).
    A run that produces no entries is reported at info level only, so a clean
    simulation no longer emits a spurious warning.

    Raises:
        Exception: If model simulation fails; the error is logged and re-raised unchanged.
    """
    try:
        # Obtain the error collection before running, as the simulation fills it
        calc_errors = self.model.CalculationErrors

        # Run the simulation
        self.model.SimulateNew()

        error_count = calc_errors.Count
        if error_count > 0:
            logging.warning(f"Simulation has {error_count} warning(s).")
            # The collection is addressed with 1-based indices
            for i in range(1, error_count + 1):
                error = calc_errors.Item(i)
                logging.warning(f"Warning {i}: {error.Description}")
        else:
            logging.info("Simulation finished without warnings.")
    except Exception as e:
        logging.error(f"Failed during simulation: {e}")
        raise
[docs]
@require_ebsilon
def parse_model(self):
    """
    Walks over every object of the opened model and extracts components and connections.

    The model is traversed twice: first all components are parsed (this is
    also where the ambient conditions are picked up), then all pipes.
    Afterwards the Power Summarizer connections are reclassified and the
    fictive storage connections are created.

    Raises:
        ValueError: If ambient conditions are not set.
        Exception: If model parsing fails.
    """
    component_kind = 10  # epObjectKindComp
    pipe_kind = 16  # epObjectKindPipe

    try:
        object_count = self.model.Objects.Count
        logging.info(f"Parsing {object_count} objects from the model")
        item_numbers = range(1, object_count + 1)  # the object collection is 1-based

        # First pass: components
        for number in item_numbers:
            candidate = self.model.Objects.Item(number)
            if not candidate.IsKindOf(component_kind):
                continue
            self.parse_component(candidate)

        # Connection parsing reads Tamb/pamb, so both must be known by now
        ambient_known = self.Tamb is not None and self.pamb is not None
        if not ambient_known:
            error_msg = (
                "Ambient temperature (Tamb) and/or ambient pressure (pamb) have not been set.\n"
                "Please ensure that your Ebsilon model includes component(s) of type 46 (Measuring Point) "
                "with a setting for the Ambient Temperature and the Ambient Pressure in MEASM."
            )
            logging.error(error_msg)
            raise ValueError(error_msg)

        # Second pass: pipes
        for number in item_numbers:
            candidate = self.model.Objects.Item(number)
            if not candidate.IsKindOf(pipe_kind):
                continue
            self.parse_connection(candidate)

        # Post-processing that needs the complete set of connections:
        # first the Power Summarizer links, then the fictive storage connections
        self._reclassify_power_bus_connections()
        self._create_storage_connections()
    except Exception as e:
        logging.error(f"Error while parsing the model: {e}")
        raise
[docs]
@require_ebsilon
def parse_connection(self, obj: Any):
    """
    Parses a single connection (pipe) and stores its data in ``self.connections_data``.

    Pipes whose fluid kind is Scheduled or Actual (5, 6) are skipped. Every
    other pipe gets an entry holding its end components and connectors and,
    depending on the fluid kind:

    - material streams: m, T, p, h, s, e_PH, x and VM converted to SI, the
      optional split into e_T / e_M, and the mass composition;
    - Logic pipes (13) attached to a heat component (5, 15, 16, 35): kind "heat";
    - Logic pipes attached to a Power Summarizer (31) and Electric/Shaft
      pipes (9, 10): kind "power". If a Logic pipe touches both a heat
      component and a Power Summarizer, "power" wins.

    Pipes matching none of these keep kind "other".

    Parameters:
        obj: The Ebsilon pipe object to parse.
    """
    from .ebsilon_functions import calc_eM, calc_eT

    # Cast the pipe to the correct type
    pipe_cast = self.oc.CastToPipe(obj)

    # Fluid kinds (pipe Kind - 1000), grouped by how they are treated
    non_material_fluids = {5, 6, 9, 10, 13}  # Scheduled, Actual, Electric, Shaft, Logic
    non_energetic_fluids = {5, 6}  # Scheduled, Actual
    power_fluids = {9, 10}  # Electric, Shaft
    logic_fluid = 13  # Logic "fluid" used for heat and power flows
    # Component types (component Kind - 10000)
    heat_components = {5, 15, 16, 35}  # Components that handle heat flows as input or output
    power_components = {31}  # Power Summarizer

    fluid_kind = pipe_cast.Kind - 1000
    if fluid_kind in non_energetic_fluids:
        logging.info(f"Skipping non-energetic connection: {pipe_cast.Name}")
        return

    def has_value(attr: str) -> bool:
        """Tell whether the pipe exposes `attr` with a non-None value."""
        return hasattr(pipe_cast, attr) and getattr(pipe_cast, attr).Value is not None

    def read_si(quantity: str, attr: str) -> Any:
        """Return the pipe attribute converted to SI, or None when it is missing."""
        if not has_value(attr):
            return None
        field = getattr(pipe_cast, attr)
        return convert_to_SI(quantity, field.Value, unit_id_to_string.get(field.Dimension, "Unknown"))

    def touches(component_types: set[int]) -> bool:
        """Tell whether either end of the pipe is a component of one of the given types."""
        return any(
            comp is not None and comp.Kind is not None and comp.Kind - 10000 in component_types
            for comp in (comp0, comp1)
        )

    def power_fields() -> dict[str, Any]:
        """Build the fields shared by all power connections (energy_flow and E carry the same value)."""
        power = read_si("power", "Q")
        return {
            "kind": "power",
            "energy_flow": power,
            "energy_flow_unit": fluid_property_data["power"]["SI_unit"],
            "E": power,
            "E_unit": fluid_property_data["power"]["SI_unit"],
        }

    # Components and connectors (links) at both ends of the pipe.
    # NOTE(review): link presence is decided via HasComp, as in the original code.
    comp0 = pipe_cast.Comp(0) if pipe_cast.HasComp(0) else None
    comp1 = pipe_cast.Comp(1) if pipe_cast.HasComp(1) else None
    link0 = pipe_cast.Link(0) if pipe_cast.HasComp(0) else None
    link1 = pipe_cast.Link(1) if pipe_cast.HasComp(1) else None

    fluid_type_id = pipe_cast.FluidType
    fluid_name = fluid_type_index.get(fluid_type_id, "Unknown")

    # GENERAL INFORMATION common to all connections
    connection_data = {
        "name": pipe_cast.Name,
        "kind": "other",  # replaced below by "material", "heat" or "power" where applicable
        "source_component": comp0.Name if comp0 else None,
        "source_component_type": (comp0.Kind - 10000) if comp0 else None,
        "source_connector": link0.Index if link0 else None,
        "target_component": comp1.Name if comp1 else None,
        "target_component_type": (comp1.Kind - 10000) if comp1 else None,
        "target_connector": link1.Index if link1 else None,
        "fluid_type": fluid_name,
        "fluid_type_id": fluid_type_id,
    }

    # MATERIAL CONNECTIONS
    if fluid_kind not in non_material_fluids:
        T_value = read_si("T", "T")
        p_value = read_si("p", "P")
        e_PH_value = read_si("e", "E")

        # If e_PH is not available from Ebsilon, calculate using min-based formula
        if (
            e_PH_value is None
            and T_value is not None
            and p_value is not None
            and has_value("H")
            and has_value("S")
        ):
            try:
                e_PH_value = calc_eph_from_min(pipe_cast, self.Tamb)
                if e_PH_value is not None:
                    logging.info(
                        f"Physical exergy calculated using min-based formula for {pipe_cast.Name}: "
                        f"{e_PH_value:.2f} J/kg"
                    )
            except ValueError as ve:
                logging.error(f"Failed to calculate e_PH from min for {pipe_cast.Name}: {ve}")
                e_PH_value = None

        connection_data.update(
            {
                "kind": "material",
                "m": read_si("m", "M"),
                "m_unit": fluid_property_data["m"]["SI_unit"],
                "T": T_value,
                "T_unit": fluid_property_data["T"]["SI_unit"],
                "p": p_value,
                "p_unit": fluid_property_data["p"]["SI_unit"],
                "h": read_si("h", "H"),
                "h_unit": fluid_property_data["h"]["SI_unit"],
                "s": read_si("s", "S"),
                "s_unit": fluid_property_data["s"]["SI_unit"],
                "e_PH": e_PH_value,
                "e_PH_unit": fluid_property_data["e"]["SI_unit"],
                "x": read_si("x", "X"),
                "x_unit": fluid_property_data["x"]["SI_unit"],
                "VM": read_si("VM", "VM"),
                "VM_unit": fluid_property_data["VM"]["SI_unit"],
            }
        )

        # Add the mechanical and thermal specific exergies unless the flag is set to False
        if self.split_physical_exergy:
            e_T_value = calc_eT(self.app, pipe_cast, connection_data["p"], self.Tamb, self.pamb)
            e_M_value = calc_eM(self.app, pipe_cast, connection_data["p"], self.Tamb, self.pamb)
            connection_data.update(
                {
                    "e_T": e_T_value,
                    "e_T_unit": fluid_property_data["e"]["SI_unit"],
                    "e_M": e_M_value,
                    "e_M_unit": fluid_property_data["e"]["SI_unit"],
                }
            )

        # Mass composition, depending on the fluid type
        if fluid_name in ("Steam", "Water"):
            composition = {"H2O": 1}
        elif fluid_name in ("2PhaseLiquid", "2PhaseGaseous"):
            # The FMED value identifies the substance
            fmed_value = pipe_cast.FMED.Value if hasattr(pipe_cast, "FMED") else None
            if fmed_value in two_phase_fluids_mapping:
                composition = two_phase_fluids_mapping[fmed_value]
            else:
                composition = {}  # Default if no mapping found
                logging.warning(
                    f"FMED value {fmed_value} not found in two_phase_fluids_mapping. Please add it."
                )
        elif fluid_name == "ThermoLiquid":
            # For oil, a default composition is assumed
            composition = {"ThermoLiquid": 1}
        else:
            composition = {}
            for param in composition_params:
                if not hasattr(pipe_cast, param):
                    continue
                fraction = getattr(pipe_cast, param).Value
                if fraction in (0, None):
                    continue
                # Drop exactly one leading "X"; lstrip("X") would also eat further
                # "X" characters that belong to the substance name.
                composition[param.removeprefix("X")] = fraction
        connection_data["mass_composition"] = composition

    # HEAT AND POWER CONNECTIONS from Logic "fluids"
    if fluid_kind == logic_fluid:
        if touches(heat_components):
            connection_data.update(
                {
                    "kind": "heat",
                    "energy_flow": read_si("heat", "Q"),
                    "energy_flow_unit": fluid_property_data["heat"]["SI_unit"],
                    "E": None,
                    "E_unit": fluid_property_data["power"]["SI_unit"],
                }
            )
        # Checked second on purpose: a Power Summarizer end overrides "heat"
        if touches(power_components):
            connection_data.update(power_fields())

    # POWER CONNECTIONS from power "fluids"
    if fluid_kind in power_fluids:
        connection_data.update(power_fields())

    # Convert the connector numbers to selected standard values for each component;
    # connectors without a mapping keep their raw index
    for end in ("source", "target"):
        comp_type = connection_data[f"{end}_component_type"]
        connector = connection_data[f"{end}_connector"]
        if comp_type in connector_mapping and connector in connector_mapping[comp_type]:
            connection_data[f"{end}_connector"] = connector_mapping[comp_type][connector]

    # Store the connection data
    self.connections_data[obj.Name] = connection_data
[docs]
@require_ebsilon
def parse_component(self, obj: Any):
    """
    Parses data from a component, including its type and various properties.

    Depending on the component type this method
    - stores a data record in ``self.components_data`` (all types that are not
      listed in ``non_thermodynamic_unit_operators``),
    - sets ``self.Tamb`` / ``self.pamb`` from type 46 components whose FTYP is
      26 / 13 (only reached for types that are excluded from the records),
    - records the MUL values of type 31 components in ``self._power_bus_mul_data``,
    - queues type 118 (storage) components in ``self._storages_to_postprocess``.

    Parameters:
        obj: The Ebsilon component object to parse.
    """
    # The generic cast is only needed to learn the component's type index
    generic_cast = self.oc.CastToComp(obj)
    type_index = generic_cast.Kind - 10000

    # Use the type-specific CastToCompX method when the ObjectCaster offers one
    cast_method_name = f"CastToComp{type_index}"
    if hasattr(self.oc, cast_method_name):
        comp_cast = getattr(self.oc, cast_method_name)(obj)
        logging.info(f"Using method {cast_method_name} to cast the component.")
    else:
        logging.warning(f"No specific cast method for type_index {type_index}, using generic CastToComp.")
        comp_cast = generic_cast  # reuse the cast from above instead of casting again

    def has_value(attr: str) -> bool:
        """Tell whether the component exposes `attr` with a non-None value."""
        return hasattr(comp_cast, attr) and getattr(comp_cast, attr).Value is not None

    def read_raw(attr: str) -> Any:
        """Return the attribute value as delivered by Ebsilon, or None when it is missing."""
        return getattr(comp_cast, attr).Value if has_value(attr) else None

    def read_si(quantity: str, attr: str) -> Any:
        """Return the attribute converted to SI, or None when it is missing."""
        if not has_value(attr):
            return None
        field = getattr(comp_cast, attr)
        return convert_to_SI(quantity, field.Value, unit_id_to_string.get(field.Dimension, "Unknown"))

    # Get the human-readable type name of the component
    type_name = ebs_objects.get(type_index, f"Unknown Type {type_index}")

    # Exclude non-thermodynamic unit operators
    if type_index not in non_thermodynamic_unit_operators:
        component_data = {
            "name": comp_cast.Name,
            "type": type_name,
            "type_index": type_index,
            "eta_s": read_raw("ETAIN"),
            "eta_mech": read_raw("ETAMN"),
            "eta_el": read_raw("ETAEN"),
            "eta_cc": read_raw("ETAB"),
            "lamb": read_raw("ALAMN"),
            "Q": read_si("heat", "QT"),
            "Q_unit": fluid_property_data["heat"]["SI_unit"],
            "P": read_si("power", "QSHAFT"),
            "P_unit": fluid_property_data["power"]["SI_unit"],
            # NOTE(review): KA and A are stored without unit conversion although they
            # are labelled with the SI unit - confirm Ebsilon delivers them in SI.
            "kA": read_raw("KA"),
            "kA_unit": fluid_property_data["kA"]["SI_unit"],
            "A": read_raw("A"),
            "A_unit": fluid_property_data["A"]["SI_unit"],
            "mass_flow_1": read_si("m", "M1N"),
            "mass_flow_1_unit": fluid_property_data["m"]["SI_unit"],
            "mass_flow_3": read_si("m", "M3N"),
            "mass_flow_3_unit": fluid_property_data["m"]["SI_unit"],
            "energy_flow_1": read_si("heat", "Q1N"),
            "energy_flow_1_unit": fluid_property_data["heat"]["SI_unit"],
        }

        # Group by the predefined component groups; fall back to the type name
        group = next(
            (group_name for group_name, type_list in grouped_components.items() if type_index in type_list),
            None,
        )
        if not group:
            group = type_name

        # Store the component data using the component's name as the key
        self.components_data.setdefault(group, {})[comp_cast.Name] = component_data

    # For components of type 46, set ambient temperature and pressure
    elif type_index == 46:
        comp46 = self.oc.CastToComp46(obj)
        if comp46.FTYP.Value == 26:
            self.Tamb = convert_to_SI(
                "T", comp46.MEASM.Value, unit_id_to_string.get(comp46.MEASM.Dimension, "Unknown")
            )
            logging.info(f"Set ambient temperature (Tamb) to {self.Tamb} K from component {comp_cast.Name}")
        elif comp46.FTYP.Value == 13:
            self.pamb = convert_to_SI(
                "p", comp46.MEASM.Value, unit_id_to_string.get(comp46.MEASM.Dimension, "Unknown")
            )
            logging.info(f"Set ambient pressure (pamb) to {self.pamb} Pa from component {comp_cast.Name}")

    # Power Summarizer: remember the MUL value of each of its ten inputs,
    # keyed by the mapped connector so it matches the parsed connections
    if type_index == 31:
        comp31 = self.oc.CastToComp31(obj)
        mul_signs = {}
        for i in range(1, 11):
            mul_attr = f"MUL{i}"
            if hasattr(comp31, mul_attr):
                mul_signs[connector_mapping[31][i]] = getattr(comp31, mul_attr).Value
        self._power_bus_mul_data[comp31.Name] = mul_signs

        # Also store in component_data so it persists to JSON
        bus_entry = self.components_data.get("PowerBus", {}).get(comp31.Name)
        if bus_entry is not None:
            bus_entry["mul_signs"] = mul_signs

    # Storage: keep the raw values; the fictive connection is built after all pipes are parsed
    if type_index == 118:
        storage = self.oc.CastToComp118(obj)
        self._storages_to_postprocess.append(
            {
                "name": storage.Name,
                "kind": storage.Kind,
                "m_flow_load": storage.MLD.Value,
                "m_flow_load_unit": unit_id_to_string.get(storage.MLD.Dimension, "Unknown"),
                "m_flow_unload": storage.MUNLD.Value,
                "m_flow_unload_unit": unit_id_to_string.get(storage.MUNLD.Dimension, "Unknown"),
                "T_storage": storage.TNEW.Value,
                "T_storage_unit": unit_id_to_string.get(storage.TNEW.Dimension, "Unknown"),
                "p_storage": storage.PNEW.Value,
                "p_storage_unit": unit_id_to_string.get(storage.PNEW.Dimension, "Unknown"),
                "h_storage": storage.HNEW.Value,
                "h_storage_unit": unit_id_to_string.get(storage.HNEW.Dimension, "Unknown"),
            }
        )
def _create_storage_connections(self):
    """
    Create fictive charging/discharging connections for storage components.

    After all real connections are parsed, uses stored storage parameters
    and existing connections_data to generate and insert material connections.

    For every queued storage one connection named ``<storage>_charging`` or
    ``<storage>_discharging`` is added. Its mass flow is the absolute
    difference between the loading and unloading flow, each converted to SI
    with its own unit before they are compared. T, p and h come from the
    storage itself; s, e_PH and the mass composition are copied from the
    first parsed connection that leaves the storage through the connector
    ``connector_mapping[118][2]`` (``None`` when there is none).

    Returns
    -------
    None
    """
    for raw in self._storages_to_postprocess:
        name = raw["name"]

        # Convert both flows to SI first: the two values may be given in different units
        m_load = convert_to_SI("m", raw["m_flow_load"], raw["m_flow_load_unit"])
        m_unload = convert_to_SI("m", raw["m_flow_unload"], raw["m_flow_unload_unit"])
        sign = "charging" if m_load >= m_unload else "discharging"
        charging = sign == "charging"
        storage_type = raw["kind"] - 10000
        prefix = f"{name}_{sign}"

        # One scan is enough: s, e_PH and the composition all come from the same connection
        reference = next(
            (
                c
                for c in self.connections_data.values()
                if c.get("source_component") == name and c.get("source_connector") == connector_mapping[118][2]
            ),
            None,
        )
        if reference is None:
            reference = {}

        new_conn = {
            "name": prefix,
            "kind": "material",
            "source_component": name if charging else None,
            "target_component": None if charging else name,
            "source_component_type": storage_type if charging else None,
            "target_component_type": None if charging else storage_type,
            "source_connector": None,
            "target_connector": None,
            "m": abs(m_load - m_unload),
            "m_unit": fluid_property_data["m"]["SI_unit"],
            "T": convert_to_SI("T", raw["T_storage"], raw["T_storage_unit"]),
            "T_unit": fluid_property_data["T"]["SI_unit"],
            "p": convert_to_SI("p", raw["p_storage"], raw["p_storage_unit"]),
            "p_unit": fluid_property_data["p"]["SI_unit"],
            "h": convert_to_SI("h", raw["h_storage"], raw["h_storage_unit"]),
            "h_unit": fluid_property_data["h"]["SI_unit"],
            "s": reference.get("s"),
            "s_unit": fluid_property_data["s"]["SI_unit"],
            "e_PH": reference.get("e_PH"),
            "e_PH_unit": fluid_property_data["e"]["SI_unit"],
            "mass_composition": reference.get("mass_composition"),
        }
        self.connections_data[prefix] = new_conn
def _reclassify_power_bus_connections(self):
"""
Link Power Summarizer (type 31) logic connections to their corresponding
power connections (Electric/Shaft) by matching energy_flow values.
In Ebsilon, power flows (Electric/Shaft) are connected to the Power Summarizer
via intermediate Logic connections that carry the same energy_flow value.
All 10 Logic connections target the PowerBus regardless of MUL sign.
The MUL sign determines the linking direction:
- MUL >= 0 (inlet): the power connection's free target is linked to POWER
- MUL < 0 (outlet): the power connection's free source is linked to POWER
This method:
1. Finds Logic connections targeting type 31 components
2. Uses MUL signs to determine if the connection is an inlet or outlet
3. Matches them to power connections (Electric/Shaft) with equal energy_flow
4. Updates the power connection to point to/from the PowerBus directly
5. Removes the now-redundant Logic connection
Logic connections with no matching power connection (e.g. the net output ETOT)
are left unchanged.
"""
logic_conns_to_remove = []
for conn_name, conn_data in list(self.connections_data.items()):
# Only process Logic fluid connections connected to type 31
if conn_data.get("fluid_type_id") != 13:
continue
# Handle logic connections targeting POWER (all 10 inputs in Ebsilon)
if conn_data.get("target_component_type") == 31:
comp_name = conn_data["target_component"]
connector_idx = conn_data["target_connector"]
logic_energy = conn_data.get("energy_flow")
if logic_energy is None:
continue
# Determine direction from MUL sign
mul_value = self._power_bus_mul_data.get(comp_name, {}).get(connector_idx, 1.0)
is_power_inlet = mul_value >= 0 # positive MUL = power flows into POWER
# Find matching power connection (Electric/Shaft) by energy_flow
matched_power_name = None
for power_name, power_data in self.connections_data.items():
if power_name == conn_name:
continue
if power_data.get("fluid_type_id") not in {9, 10}: # Electric, Shaft
continue
power_energy = power_data.get("energy_flow")
if power_energy is None:
continue
if abs(power_energy - logic_energy) < 1e-6 and (
is_power_inlet
and power_data.get("target_component") is None
or not is_power_inlet
and power_data.get("source_component") is None
):
matched_power_name = power_name
break
if matched_power_name is None:
continue
matched_power_data = self.connections_data[matched_power_name]
if is_power_inlet:
# Positive MUL: power flows into POWER → link power target to POWER
matched_power_data["target_component"] = comp_name
matched_power_data["target_component_type"] = 31
matched_power_data["target_connector"] = connector_idx
logging.info(
f"Linked power connection {matched_power_name} to PowerBus "
f"{comp_name} (inlet, connector {connector_idx})"
)
else:
# Negative MUL: power flows out of POWER → link power source to POWER
matched_power_data["source_component"] = comp_name
matched_power_data["source_component_type"] = 31
matched_power_data["source_connector"] = connector_idx
logging.info(
f"Linked power connection {matched_power_name} to PowerBus "
f"{comp_name} (outlet, connector {connector_idx})"
)
logic_conns_to_remove.append(conn_name)
# Handle logic connections sourced from POWER (e.g. already-directed outlets)
elif conn_data.get("source_component_type") == 31:
comp_name = conn_data["source_component"]
connector_idx = conn_data["source_connector"]
logic_energy = conn_data.get("energy_flow")
if logic_energy is None:
continue
# Find matching power connection with free source
matched_power_name = None
for power_name, power_data in self.connections_data.items():
if power_name == conn_name:
continue
if power_data.get("fluid_type_id") not in {9, 10}:
continue
power_energy = power_data.get("energy_flow")
if power_energy is None:
continue
if abs(power_energy - logic_energy) < 1e-6 and power_data.get("source_component") is None:
matched_power_name = power_name
break
if matched_power_name is None:
continue
matched_power_data = self.connections_data[matched_power_name]
matched_power_data["source_component"] = comp_name
matched_power_data["source_component_type"] = 31
matched_power_data["source_connector"] = connector_idx
logging.info(
f"Linked power connection {matched_power_name} to PowerBus "
f"{comp_name} (outlet, connector {connector_idx})"
)
logic_conns_to_remove.append(conn_name)
# Remove the matched Logic connections
for name in logic_conns_to_remove:
del self.connections_data[name]
logging.info(f"Removed redundant Logic connection: {name}")
[docs]
def get_sorted_data(self) -> dict[str, Any]:
    """
    Returns the parsed data with all names in alphabetical order.

    Component groups are ordered by group name and the components inside each
    group by component name; connections are ordered by connection name. The
    ambient conditions are added together with their SI units.

    Returns:
        dict: A dictionary containing sorted 'components', 'connections', and ambient conditions data.
    """
    # Order the groups first, then the members of every group
    ordered_components = {}
    for group_name in sorted(self.components_data):
        members = self.components_data[group_name]
        ordered_components[group_name] = {comp_name: members[comp_name] for comp_name in sorted(members)}

    # Order the connections by name
    ordered_connections = {
        conn_name: self.connections_data[conn_name] for conn_name in sorted(self.connections_data)
    }

    ambient = {
        "Tamb": self.Tamb,
        "Tamb_unit": fluid_property_data["T"]["SI_unit"],
        "pamb": self.pamb,
        "pamb_unit": fluid_property_data["p"]["SI_unit"],
    }

    return {
        "components": ordered_components,
        "connections": ordered_connections,
        "ambient_conditions": ambient,
    }
[docs]
def write_to_json(self, output_path: str):
    """
    Writes the parsed and sorted data to a JSON file.

    The file is written as UTF-8 with an indentation of four spaces; an
    existing file at ``output_path`` is overwritten.

    Parameters:
        output_path (str): Path where the JSON file will be saved.

    Raises:
        Exception: If writing to JSON fails; the error is logged and re-raised unchanged.
    """
    data = self.get_sorted_data()
    try:
        # Fix the encoding explicitly so the output does not depend on the platform default
        with open(output_path, "w", encoding="utf-8") as json_file:
            json.dump(data, json_file, indent=4)
        logging.info(f"Data successfully written to {output_path}")
    except Exception as e:
        logging.error(f"Failed to write data to JSON: {e}")
        raise
[docs]
def run_ebsilon(model_path: str, output_dir: str | None = None, split_physical_exergy: bool = True) -> dict[str, Any]:
    """
    Main function to process the Ebsilon model and return parsed data.
    Optionally writes the parsed data to a JSON file.

    The model is opened, simulated and parsed in that order. Failures of any
    step are logged and re-raised with the original exception attached as
    their cause.

    Parameters:
        model_path (str): Path to the Ebsilon model file.
        output_dir (str): Optional path of the JSON file the parsed data should be written to.
            Despite its name the value is passed to ``write_to_json`` as the output file path.
        split_physical_exergy (bool): Flag to split physical exergy into thermal and mechanical components.

    Returns:
        dict: Parsed data in dictionary format.

    Raises:
        FileNotFoundError: If the model file does not exist or cannot be opened.
        RuntimeError: If Ebsilon is not available, or for any other error during model
            initialization, simulation, parsing, or writing.
    """
    # Check if Ebsilon is available
    if not is_ebsilon_available():
        raise RuntimeError(
            "Ebsilon functionality is required for running this function. "
            "Please set the EBS environment variable to your Ebsilon installation path."
        )

    # Check if the model file exists at the specified path
    if not os.path.exists(model_path):
        error_msg = f"Model file not found at: {model_path}"
        logging.error(error_msg)
        raise FileNotFoundError(error_msg)

    # Initialize the Ebsilon model parser with the model file path
    try:
        parser = EbsilonModelParser(model_path, split_physical_exergy=split_physical_exergy)
    except RuntimeError as e:
        # Raised by __init__ if Ebsilon is not available
        logging.error(f"Failed to initialize EbsilonModelParser: {e}")
        raise

    try:
        # Start the COM server and open the model
        parser.initialize_model()
    except FileNotFoundError:
        # A model file that cannot be opened bubbles up as FileNotFoundError
        raise
    except Exception as e:
        # COM/server errors are not "file not found" problems; report what actually failed
        error_msg = f"Failed to initialize the Ebsilon model '{model_path}': {e}"
        logging.error(error_msg)
        raise RuntimeError(error_msg) from e

    try:
        # Simulate the Ebsilon model
        parser.simulate_model()
    except Exception as e:
        error_msg = f"An error occurred during model simulation: {e}"
        logging.error(error_msg)
        raise RuntimeError(error_msg) from e

    try:
        # Parse data from the simulated model
        parser.parse_model()
    except Exception as e:
        error_msg = f"An error occurred during model parsing: {e}"
        logging.error(error_msg)
        raise RuntimeError(error_msg) from e

    # Get the parsed and sorted data
    parsed_data = parser.get_sorted_data()

    if output_dir is not None:
        try:
            # write_to_json logs the success message itself
            parser.write_to_json(output_dir)
        except Exception as e:
            error_msg = f"An error occurred while writing the output file: {e}"
            logging.error(error_msg)
            raise RuntimeError(error_msg) from e

    # Return the parsed data as a dictionary (not as a JSON string)
    return parsed_data