Source code for watertap.unit_models.nanofiltration_0D

#################################################################################
# WaterTAP Copyright (c) 2020-2026, The Regents of the University of California,
# through Lawrence Berkeley National Laboratory, Oak Ridge National Laboratory,
# National Laboratory of the Rockies, and National Energy Technology
# Laboratory (subject to receipt of any required approvals from the U.S. Dept.
# of Energy). All rights reserved.
#
# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and license
# information, respectively. These files are also available online at the URL
# "https://github.com/watertap-org/watertap/"
#################################################################################
"""
Simple zero-dimensional Nanofiltration unit model.
"""

from pyomo.environ import Block, Constraint, Set, value, Var
from pyomo.common.config import ConfigDict, ConfigValue, In

from idaes.core import (
    declare_process_block_class,
    EnergyBalanceType,
    MomentumBalanceType,
    useDefault,
    UnitModelBlockData,
    MaterialFlowBasis,
)
from idaes.core.initialization import ModularInitializerBase
from idaes.core.scaling import CustomScalerBase, ConstraintScalingScheme
from idaes.core.util.config import is_physical_parameter_block
from idaes.core.util.exceptions import ConfigurationError
import idaes.logger as idaeslog

_log = idaeslog.getLogger(__name__)


[docs]class Nanofiltration0DInitializer(ModularInitializerBase): """ Initializer for 0D Nanofiltration models. """ CONFIG = ModularInitializerBase.CONFIG()
[docs] def initialize_main_model( self, model: Block, ): """ Initialization routine for main Nanofiltration0D models. Args: model: Pyomo Block to be initialized. copy_inlet_state: bool (default=False). Whether to copy inlet state to other states or not. Copying will generally be faster, but inlet states may not contain all properties required elsewhere. Returns: Pyomo solver results object. """ # Get loggers init_log = idaeslog.getInitLogger( model.name, self.get_output_level(), tag="unit" ) solve_log = idaeslog.getSolveLogger( model.name, self.get_output_level(), tag="unit" ) # Create solver solver = self._get_solver() # --------------------------------------------------------------------- # Initialize inlet state prop_init = self.get_submodel_initializer(model.properties_in) if prop_init is not None: prop_init.initialize( model=model.properties_in, output_level=self.get_output_level(), ) else: raise ValueError( "No Initializer found for property package. Please set provide " "sub-model initializers or assign a default Initializer." ) init_log.info_high("Initialization Step 1 (Inlet State) Complete.") # --------------------------------------------------------------------- # Estimate outlet states from inlet and initialize for t, in_state in model.properties_in.items(): state_vars = in_state.define_state_vars() state_vars_ret = model.properties_retentate[t].define_state_vars() state_vars_per = model.properties_permeate[t].define_state_vars() for n, sv in state_vars.items(): sv_ret = state_vars_ret[n] sv_per = state_vars_per[n] if sv.local_name == "pressure" and not sv_ret.fixed: # For pressure, only retentate is linked to inlet if hasattr(model, "deltaP"): sv_ret.set_value(sv + model.deltaP[t]) else: sv_ret.set_value(sv) elif "flow" in sv.local_name: self._init_flow(model, t, in_state, sv, sv_ret, sv_per) elif any( sv.local_name.startswith(i) for i in ["mass_frac", "mole_frac"] ): self._init_frac(model, t, in_state, sv, sv_ret, sv_per) elif sv.local_name.startswith("conc"): self._init_conc(model, t, in_state, sv, sv_ret, sv_per) else: # For everything else, assume outlet similar to inlet for k, svd in sv.items(): if not sv_ret[k].fixed: sv_ret[k].set_value(svd) if not sv_per[k].fixed: sv_per[k].set_value(svd) prop_init.initialize( model=model.properties_retentate, output_level=self.get_output_level(), ) prop_init.initialize( model=model.properties_permeate, output_level=self.get_output_level(), ) init_log.info_high("Initialization Step 2 (Outlet States) Complete.") # --------------------------------------------------------------------- # Solve full model with idaeslog.solver_log(solve_log, idaeslog.DEBUG) as slc: res = solver.solve(model, tee=slc.tee) init_log.info("Initialization Completed, {}".format(idaeslog.condition(res))) return res
def _init_conc(self, model, t, in_state, sv, sv_ret, sv_per): # Component indexed flow - need to apply split fractions for k, svd in sv.items(): if isinstance(k, str): # Single indexing set, component is the index j = k else: # Component is always the last index j = k[-1] # Determine split based on type comp = in_state.params.get_component(j) if comp.is_solvent(): # Assume solvent concentrations unchanged sv_ret[k].set_value(svd) sv_per[k].set_value(svd) else: if j == model.config.electroneutrality_ion: # Guess electroneutrality ion will be based on solvent recovery split = model.recovery_solvent[t] else: # For initialization, assume density is roughly constant and rejection can be used as split fraction split = 1 - model.rejection_comp[t, j] if not sv_ret[k].fixed: sv_ret[k].set_value(svd * (1 - split)) if not sv_per[k].fixed: sv_per[k].set_value(svd * split) def _init_flow(self, model, t, in_state, sv, sv_ret, sv_per): # Determine if it is a component flow or not if sv.local_name.endswith("_comp"): # Component indexed flow - need to apply split fractions for k, svd in sv.items(): if isinstance(k, str): # Single indexing set, component is the index j = k else: # Component is always the last index j = k[-1] # Determine split based on type comp = in_state.params.get_component(j) if comp.is_solvent() or j == model.config.electroneutrality_ion: # Assume electroneutrality ion will have similar separation to solvents split = model.recovery_solvent[t] else: # For initialization, assume density is roughly constant and rejection can be used as split fraction split = 1 - model.rejection_comp[t, j] if not sv_ret[k].fixed: sv_ret[k].set_value(svd * (1 - split)) if not sv_per[k].fixed: sv_per[k].set_value(svd * split) else: # Assume a total flow basis, and use solvent recovery split = model.recovery_solvent[t] for k, svd in sv.items(): if not sv_ret[k].fixed: sv_ret[k].set_value(svd * (1 - split)) if not sv_per[k].fixed: sv_per[k].set_value(svd * split) def _init_frac(self, model, t, in_state, sv, sv_ret, sv_per): # First need to iterate over all indices to collect normalized flows # Assume a basis of 1 mass or mole unit # Also, only single phase property packages supported, so we only need to # worry about the component index nom_ret_comp_flow = {} nom_per_comp_flow = {} for k, svd in sv.items(): if isinstance(k, str): # Single indexing set, component is the index j = k else: # Component is always the last index j = k[-1] # Determine split based on type comp = in_state.params.get_component(j) if comp.is_solvent() or j == model.config.electroneutrality_ion: # Assume electroneutrality ion will have similar separation to solvents split = model.recovery_solvent[t] else: # For initialization, assume density is roughly constant and rejection can be used as split fraction split = 1 - model.rejection_comp[t, j] nom_ret_comp_flow[j] = value(svd * (1 - split)) nom_per_comp_flow[j] = value(svd * split) nom_ret_tot_flow = sum(f for f in nom_ret_comp_flow.values()) nom_per_tot_flow = sum(f for f in nom_per_comp_flow.values()) # # Next set value based on fraction of nominal total flow for k, svd in sv.items(): if isinstance(k, str): # Single indexing set, component is the index j = k else: # Component is always the last index j = k[-1] if not sv_ret[k].fixed: sv_ret[k].set_value(nom_ret_comp_flow[j] / nom_ret_tot_flow) if not sv_per[k].fixed: sv_per[k].set_value(nom_per_comp_flow[j] / nom_per_tot_flow)
[docs]class Nanofiltration0DScaler(CustomScalerBase): """ Scaler class for Nanofiltration0D models. """ DEFAULT_SCALING_FACTORS = { "deltaP": 1e-4, "rejection_comp": 1e2, "recovery_solvent": 10, }
[docs] def variable_scaling_routine( self, model, overwrite: bool = False, submodel_scalers: dict = None ): """ Apply variable scaling to model. Args: model: model to be scaled overwrite: whether to overwrite existing scaling factors submodel_scalers: ComponentMap of Scaler objects to be applied to sub-models Returns: None """ # Scale inlet state self.call_submodel_scaler_method( submodel=model.properties_in, method="variable_scaling_routine", submodel_scalers=submodel_scalers, overwrite=overwrite, ) # Propagate scaling from inlet state self.propagate_state_scaling( target_state=model.properties_retentate, source_state=model.properties_in, overwrite=overwrite, ) self.propagate_state_scaling( target_state=model.properties_permeate, source_state=model.properties_in, overwrite=overwrite, ) # Scale remaining states self.call_submodel_scaler_method( submodel=model.properties_retentate, method="variable_scaling_routine", submodel_scalers=submodel_scalers, overwrite=overwrite, ) self.call_submodel_scaler_method( submodel=model.properties_permeate, method="variable_scaling_routine", submodel_scalers=submodel_scalers, overwrite=overwrite, ) # Scale remaining variables # Variables with default scaling factors for n in self.DEFAULT_SCALING_FACTORS.keys(): c = model.find_component(n) for v in c.values(): self.scale_variable_by_default(v, overwrite=overwrite)
[docs] def constraint_scaling_routine( self, model, overwrite: bool = False, submodel_scalers: dict = None ): """ Apply constraint scaling to model. Args: model: model to be scaled overwrite: whether to overwrite existing scaling factors submodel_scalers: ComponentMap of Scaler objects to be applied to sub-models Returns: None """ # Call scaling methods for sub-models self.call_submodel_scaler_method( submodel=model.properties_in, method="constraint_scaling_routine", submodel_scalers=submodel_scalers, overwrite=overwrite, ) self.call_submodel_scaler_method( submodel=model.properties_retentate, method="constraint_scaling_routine", submodel_scalers=submodel_scalers, overwrite=overwrite, ) self.call_submodel_scaler_method( submodel=model.properties_permeate, method="constraint_scaling_routine", submodel_scalers=submodel_scalers, overwrite=overwrite, ) # Scale remaining constraints for c in model.component_data_objects(Constraint, descend_into=False): self.scale_constraint_by_nominal_value( c, scheme=ConstraintScalingScheme.inverseMaximum, overwrite=overwrite, )
[docs]@declare_process_block_class("Nanofiltration0D") class Nanofiltration0DData(UnitModelBlockData): """ Nanofiltration model. """ default_initializer = Nanofiltration0DInitializer default_scaler = Nanofiltration0DScaler CONFIG = ConfigDict() CONFIG.declare( "dynamic", ConfigValue( domain=In([False]), default=False, description="Dynamic model flag - must be False", doc="""Indicates whether this model will be dynamic or not, **default** = False. Product blocks are always steady-state.""", ), ) CONFIG.declare( "has_holdup", ConfigValue( default=False, domain=In([False]), description="Holdup construction flag - must be False", doc="""Product blocks do not contain holdup, thus this must be False.""", ), ) CONFIG.declare( "property_package", ConfigValue( default=useDefault, domain=is_physical_parameter_block, description="Property package to use for mixer", doc="""Property parameter object used to define property calculations, **default** - useDefault. **Valid values:** { **useDefault** - use default package from parent model or flowsheet, **PropertyParameterObject** - a PropertyParameterBlock object.}""", ), ) CONFIG.declare( "property_package_args", ConfigDict( implicit=True, description="Arguments to use for constructing property packages", doc="""A ConfigDict with arguments to be passed to a property block(s) and used when constructing these, **default** - None. **Valid values:** { see property package for documentation.}""", ), ) CONFIG.declare( "momentum_balance_type", ConfigValue( default=MomentumBalanceType.pressureTotal, domain=In(MomentumBalanceType), description="Retentate momentum balance construction flag", doc="""Indicates what type of momentum balance should be constructed for the retentate side. Only MomentumBalanceType.none and MomentumBalanceType.pressureTotal (default) are supported. **Valid values:** { **MomentumBalanceType.none** - exclude momentum balances, **MomentumBalanceType.pressureTotal** - single pressure balance for material}""", ), ) CONFIG.declare( "has_pressure_change", ConfigValue( default=False, domain=In([True, False]), description="Retentate pressure change term construction flag", doc="""Indicates whether terms for retentate pressure change should be constructed, **default** - False. **Valid values:** { **True** - include pressure change terms, **False** - exclude pressure change terms.}""", ), ) CONFIG.declare( "energy_balance_type", ConfigValue( default=EnergyBalanceType.isothermal, domain=In(EnergyBalanceType), description="Indicated what type of energy balance should be constructed.", doc="""Indicates what type of momentum balance should be constructed for the retentate side. Only EnergyBalanceType.none and EnergyBalanceType.isothermal (default) are supported.""", ), ) # TODO: Support multiple electroneutrality ions # This needs some rule for prioritizing each ion. CONFIG.declare( "electroneutrality_ion", ConfigValue( default="Cl_-", domain=str, description="Balancing ion to use to maintain electroneutrality", doc="""Name of ion to use to maintain electroneutrality in permeate stream. If an ion is provided, the split fraction for this ion will be solved implicitly to ensure electroneutrality is maintained in the permeate stream. If None, user must provide spl;it fractions for all monovalent ions and electroneutrality is not guaranteed.""", ), ) CONFIG.declare( "passing_species_list", ConfigValue( default=None, domain=list, description="List of solutes with low rejection", doc="""List of solute species which pass through the nanofiltration membrane with low rejection. If not provided (None), solutes will be classified based on their charge, with monovalent and uncharged species considered passing.""", ), ) CONFIG.declare( "default_passing_rejection", ConfigValue( default=0.1, domain=float, description="Default value for rejection of passing species", doc="""Default value to be assigned as the rejection fraction for species identified as passing through the membrane with low rejection.""", ), ) CONFIG.declare( "default_excluded_rejection", ConfigValue( default=1 - 1e-10, domain=float, description="Default value for rejection of excluded species", doc="""Default value to be assigned as the rejection fraction for species identified as be excluded by membrane (high rejection).""", ), )
[docs] def build(self): super().build() prop_params = self.config.property_package # Check that property package only supports single phase # TODO: Extend the model to handle phase equilibrium if len(prop_params.phase_list) > 1: raise ConfigurationError( "Nanofiltration0D model only supports single phase " "property packages." ) # Check that the electroneutrality ion is a valid passing species # I.e. it must be in the passing species list or a monovalent ion if self.config.electroneutrality_ion is not None: try: bal_ion = prop_params.get_component(self.config.electroneutrality_ion) except AttributeError: raise ConfigurationError( f"electroneutrality_ion ({self.config.electroneutrality_ion}) " "is not a valid component in property package." ) if self.config.passing_species_list is not None: if ( self.config.electroneutrality_ion not in self.config.passing_species_list ): raise ConfigurationError( f"electroneutrality_ion ({self.config.electroneutrality_ion}) " "must be a member of the passing species list." ) else: # Check that balancing ion is monovalent if ( not hasattr(bal_ion.config, "charge") or abs(bal_ion.config.charge) > 1 ): raise ConfigurationError( f"electroneutrality_ion ({self.config.electroneutrality_ion}) " "must be a monovalent ion." ) # Check energy balance type is supported if self.config.energy_balance_type not in ( EnergyBalanceType.none, EnergyBalanceType.isothermal, ): raise ConfigurationError( f"Nanofiltration0D model only supports isothermal operation or no energy balance " f"(assigned {self.config.energy_balance_type})" ) # Check that pressure balance arguments are consistent if self.config.momentum_balance_type not in ( MomentumBalanceType.none, MomentumBalanceType.pressureTotal, ): raise ConfigurationError( f"Nanofiltration0D model only supports total pressure balance or no pressure balance " f"(assigned {self.config.momentum_balance_type})" ) elif ( self.config.has_pressure_change and self.config.momentum_balance_type == MomentumBalanceType.none ): raise ConfigurationError( "Inconsistent configuration arguments. has_pressure_change=True " "requires that momentum_balance_type not equal MomentumBalanceType.none." ) tmp_dict_in = dict(**self.config.property_package_args) tmp_dict_in["has_phase_equilibrium"] = False tmp_dict_in["defined_state"] = True self.properties_in = self.config.property_package.build_state_block( self.flowsheet().time, doc="Material properties at inlet", **tmp_dict_in ) tmp_dict_out = dict(**self.config.property_package_args) tmp_dict_out["has_phase_equilibrium"] = False tmp_dict_out["defined_state"] = False self.properties_retentate = self.config.property_package.build_state_block( self.flowsheet().time, doc="Material properties at inlet", **tmp_dict_out ) self.properties_permeate = self.config.property_package.build_state_block( self.flowsheet().time, doc="Material properties at inlet", **tmp_dict_out ) self.add_port("inlet", self.properties_in, doc="Inlet Port") self.add_port("retentate", self.properties_retentate, doc="Retentate Port") self.add_port("permeate", self.properties_permeate, doc="Permeate Port") # NF separation variables self.recovery_solvent = Var(self.flowsheet().time, initialize=0.8) self._solute_set = Set( initialize=[ j for j in prop_params.component_list if not prop_params.get_component(j).is_solvent() and j != self.config.electroneutrality_ion ] ) def _init_rejection(b, _, j): if b.config.passing_species_list is not None: if j in b.config.passing_species_list: return b.config.default_passing_rejection return b.config.default_excluded_rejection else: if j in prop_params.ion_set: # Check charge comp = prop_params.get_component(j) if abs(comp.config.charge) == 1: # Monovalent - include in set return b.config.default_passing_rejection else: return b.config.default_excluded_rejection else: # Non-ionic solute - assume passing species return b.config.default_passing_rejection units = self.config.property_package.get_metadata().derived_units self.area = Var( initialize=1, bounds=(0.0, 1e6), units=units.LENGTH**2, doc="Membrane area", ) self.rejection_comp = Var( self.flowsheet().time, self._solute_set, initialize=_init_rejection, bounds=(1e-10, 1), doc="Solute rejection fractions", ) if self.config.has_pressure_change: self.deltaP = Var( self.flowsheet().time, initialize=0, units=units.PRESSURE, doc="Retentate side pressure change", ) # Material balance @self.Constraint(self.flowsheet().time, prop_params.component_list) def material_balances(b, t, j): pset = b.properties_in[t].phase_list pcset = b.properties_in[t].phase_component_set return sum( b.properties_in[t].get_material_flow_terms(p, j) for p in pset if (p, j) in pcset ) == sum( b.properties_retentate[t].get_material_flow_terms(p, j) for p in pset if (p, j) in pcset ) + sum( b.properties_permeate[t].get_material_flow_terms(p, j) for p in pset if (p, j) in pcset ) @self.Constraint(self.flowsheet().time, self.properties_in.phase_component_set) def separation_constraint(b, t, p, j): comp = prop_params.get_component(j) if comp.is_solvent(): # Permeate flows equal to recovery * inlet flow return b.recovery_solvent[t] * b.properties_in[ t ].get_material_flow_terms(p, j) == b.properties_permeate[ t ].get_material_flow_terms( p, j ) elif j == self.config.electroneutrality_ion: # Rejection of electroneutrality ion will be solved using electroneutrality constraint return Constraint.Skip # else: Rejection = 1 - C_permeate/C_feed return ( (1 - b.rejection_comp[t, j]) * b.properties_in[t].conc_mol_phase_comp[p, j] ) == b.properties_permeate[t].conc_mol_phase_comp[p, j] if self.config.electroneutrality_ion is not None: # Add electroneutrality constraint for permeate stream @self.Constraint(self.flowsheet().time) def permeate_electronegativity(b, t): perm_state = b.properties_permeate[t] return 0 == sum( perm_state.params.get_component(j).config.charge * perm_state.flow_mol_phase_comp[p, j] for p, j in perm_state.phase_component_set if j in perm_state.params.ion_set ) # Retentate pressure balance if self.config.momentum_balance_type == MomentumBalanceType.pressureTotal: @self.Constraint(self.flowsheet().time, doc="Retentate pressure balance") def retentate_pressure_balance(b, t): expr = b.properties_retentate[t].pressure if b.config.has_pressure_change: expr += -b.deltaP[t] return b.properties_in[t].pressure == expr # Temperature equalities if self.config.energy_balance_type == EnergyBalanceType.isothermal: @self.Constraint( self.flowsheet().time, doc="Retentate temperature equality" ) def retentate_temperature_equality(b, t): return ( b.properties_in[t].temperature == b.properties_retentate[t].temperature ) @self.Constraint(self.flowsheet().time, doc="Permeate temperature equality") def permeate_temperature_equality(b, t): return ( b.properties_in[t].temperature == b.properties_permeate[t].temperature ) @self.Expression( self.flowsheet().config.time, self.config.property_package.phase_list, self.config.property_package.solvent_set, doc="Solvent mass transfer", ) def flux_vol_solvent(b, t, p, j): if b.properties_in[t].get_material_flow_basis() == MaterialFlowBasis.mass: return -b.properties_permeate[t].flow_mass_phase_comp[p, j] / ( b.properties_in[t].dens_mass_phase[p] * b.area ) elif ( b.properties_in[t].get_material_flow_basis() == MaterialFlowBasis.molar ): # TODO- come up with better way to handle different locations of mw_comp in property models (generic vs simple ion prop model) if hasattr(b.properties_in[t].params, "mw_comp"): mw_comp = b.properties_in[t].params.mw_comp[j] elif hasattr(b.properties_in[t], "mw_comp"): mw_comp = b.properties_in[t].mw_comp[j] else: raise ConfigurationError("mw_comp was not found.") return ( -b.properties_permeate[t].flow_mass_phase_comp[p, j] * mw_comp ) / (b.properties_in[t].dens_mass_phase[p] * b.area) @self.Expression( self.flowsheet().config.time, self.config.property_package.phase_list, self.config.property_package.solvent_set | self._solute_set, doc="Mass-based component recovery", ) def recovery_mass_phase_comp(b, t, p, j): return ( b.properties_permeate[t].flow_mass_phase_comp[p, j] / b.properties_in[t].flow_mass_phase_comp[p, j] ) @self.Expression( self.flowsheet().config.time, self.config.property_package.phase_list, doc="Volumetric-based recovery", ) def recovery_vol_phase(b, t, p): return ( b.properties_permeate[t].flow_vol_phase[p] / b.properties_in[t].flow_vol_phase[p] )