Source code for watertap.unit_models.nanofiltration_ZO

#################################################################################
# WaterTAP Copyright (c) 2020-2024, The Regents of the University of California,
# through Lawrence Berkeley National Laboratory, Oak Ridge National Laboratory,
# National Renewable Energy Laboratory, and National Energy Technology
# Laboratory (subject to receipt of any required approvals from the U.S. Dept.
# of Energy). All rights reserved.
#
# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and license
# information, respectively. These files are also available online at the URL
# "https://github.com/watertap-org/watertap/"
#################################################################################

# Import Pyomo libraries
from pyomo.environ import (
    Var,
    check_optimal_termination,
    Param,
    Suffix,
    NonNegativeReals,
    Reference,
    units as pyunits,
)
from pyomo.common.config import Bool, ConfigBlock, ConfigValue, In

# Import IDAES cores
from idaes.core import (
    declare_process_block_class,
    MaterialBalanceType,
    EnergyBalanceType,
    MomentumBalanceType,
    UnitModelBlockData,
    useDefault,
    MaterialFlowBasis,
)
from watertap.core.solvers import get_solver
from idaes.core.util.tables import create_stream_table_dataframe
from idaes.core.util.config import is_physical_parameter_block
from idaes.core.util.exceptions import ConfigurationError, InitializationError
import idaes.core.util.scaling as iscale
import idaes.logger as idaeslog

from watertap.core import ControlVolume0DBlock, InitializationMixin
from watertap.costing.unit_models.nanofiltration import cost_nanofiltration


_log = idaeslog.getLogger(__name__)


[docs]@declare_process_block_class("NanofiltrationZO") class NanofiltrationData(InitializationMixin, UnitModelBlockData): """ Zero order nanofiltration model based on specified water flux and ion rejection. Default data from Table 9 in Labban et al. (2017) https://doi.org/10.1016/j.memsci.2016.08.062 """ CONFIG = ConfigBlock() CONFIG.declare( "dynamic", ConfigValue( domain=In([False]), default=False, description="Dynamic model flag - must be False", doc="""Indicates whether this model will be dynamic or not, **default** = False. NF units do not support dynamic behavior.""", ), ) CONFIG.declare( "has_holdup", ConfigValue( default=False, domain=In([False]), description="Holdup construction flag - must be False", doc="""Indicates whether holdup terms should be constructed or not. **default** - False. NF units do not have defined volume, thus this must be False.""", ), ) CONFIG.declare( "material_balance_type", ConfigValue( default=MaterialBalanceType.useDefault, domain=In(MaterialBalanceType), description="Material balance construction flag", doc="""Indicates what type of mass balance should be constructed, **default** - MaterialBalanceType.useDefault. **Valid values:** { **MaterialBalanceType.useDefault - refer to property package for default balance type **MaterialBalanceType.none** - exclude material balances, **MaterialBalanceType.componentPhase** - use phase component balances, **MaterialBalanceType.componentTotal** - use total component balances, **MaterialBalanceType.elementTotal** - use total element balances, **MaterialBalanceType.total** - use total material balance.}""", ), ) CONFIG.declare( "is_isothermal", ConfigValue( default=True, domain=Bool, description="""Assume isothermal conditions for control volume(s); energy_balance_type must be EnergyBalanceType.none, **default** - True.""", ), ) CONFIG.declare( "energy_balance_type", ConfigValue( default=EnergyBalanceType.none, domain=In(EnergyBalanceType), description="Energy balance construction flag", doc="""Indicates what type of energy balance should be constructed, **default** - EnergyBalanceType.none. **Valid values:** { **EnergyBalanceType.useDefault - refer to property package for default balance type **EnergyBalanceType.none** - exclude energy balances, **EnergyBalanceType.enthalpyTotal** - single enthalpy balance for material, **EnergyBalanceType.enthalpyPhase** - enthalpy balances for each phase, **EnergyBalanceType.energyTotal** - single energy balance for material, **EnergyBalanceType.energyPhase** - energy balances for each phase.}""", ), ) CONFIG.declare( "momentum_balance_type", ConfigValue( default=MomentumBalanceType.pressureTotal, domain=In(MomentumBalanceType), description="Momentum balance construction flag", doc="""Indicates what type of momentum balance should be constructed, **default** - MomentumBalanceType.pressureTotal. **Valid values:** { **MomentumBalanceType.none** - exclude momentum balances, **MomentumBalanceType.pressureTotal** - single pressure balance for material, **MomentumBalanceType.pressurePhase** - pressure balances for each phase, **MomentumBalanceType.momentumTotal** - single momentum balance for material, **MomentumBalanceType.momentumPhase** - momentum balances for each phase.}""", ), ) CONFIG.declare( "has_pressure_change", ConfigValue( default=False, domain=In([True, False]), description="Pressure change term construction flag", doc="""Indicates whether terms for pressure change should be constructed, **default** - False. **Valid values:** { **True** - include pressure change terms, **False** - exclude pressure change terms.}""", ), ) CONFIG.declare( "property_package", ConfigValue( default=useDefault, domain=is_physical_parameter_block, description="Property package to use for control volume", doc="""Property parameter object used to define property calculations, **default** - useDefault. **Valid values:** { **useDefault** - use default package from parent model or flowsheet, **PhysicalParameterObject** - a PhysicalParameterBlock object.}""", ), ) CONFIG.declare( "property_package_args", ConfigBlock( implicit=True, description="Arguments to use for constructing property packages", doc="""A ConfigBlock with arguments to be passed to a property block(s) and used when constructing these, **default** - None. **Valid values:** { see property package for documentation.}""", ), ) def _process_config(self): if len(self.config.property_package.solvent_set) > 1: raise ConfigurationError( "NF model only supports one solvent component," "the provided property package has specified {} solvent components".format( len(self.config.property_package.solvent_set) ) ) if len(self.config.property_package.solvent_set) == 0: raise ConfigurationError( "The NF model was expecting a solvent and did not receive it." ) if ( len(self.config.property_package.solute_set) == 0 and len(self.config.property_package.ion_set) == 0 ): raise ConfigurationError( "The NF model was expecting at least one solute or ion and did not receive any." ) def _validate_config(self): if ( self.config.is_isothermal and self.config.energy_balance_type != EnergyBalanceType.none ): raise ConfigurationError( "If the isothermal assumption is used then the energy balance type must be none" )
[docs] def build(self): # Call UnitModel.build to setup dynamics super().build() self.scaling_factor = Suffix(direction=Suffix.EXPORT) units_meta = self.config.property_package.get_metadata().get_derived_units # Check configs for errors self._validate_config() self._process_config() if hasattr(self.config.property_package, "ion_set"): solute_set = self.config.property_package.ion_set elif hasattr(self.config.property_package, "solute_set"): solute_set = self.config.property_package.solute_set solvent_solute_set = self.config.property_package.solvent_set | solute_set # Add unit parameters self.flux_vol_solvent = Var( self.flowsheet().config.time, self.config.property_package.solvent_set, initialize=1.67e-6, bounds=(0.0, 1e-4), units=units_meta("length") * units_meta("time") ** -1, doc="Solvent volumetric flux", ) self.rejection_phase_comp = Var( self.flowsheet().config.time, self.config.property_package.phase_list, solute_set, initialize=0.9, bounds=(-1 + 1e-6, 1 - 1e-6), units=pyunits.dimensionless, doc="Observed solute rejection", ) self.dens_solvent = Param( initialize=1000, units=units_meta("mass") * units_meta("length") ** -3, doc="Pure water density", ) # Add unit variables self.area = Var( initialize=1, bounds=(0.0, 1e6), domain=NonNegativeReals, units=units_meta("length") ** 2, doc="Membrane area", ) def recovery_mass_phase_comp_initialize(b, t, p, j): if j in b.config.property_package.solvent_set: return 0.8 elif j in solute_set: return 0.1 def recovery_mass_phase_comp_bounds(b, t, p, j): ub = 1 - 1e-6 if j in b.config.property_package.solvent_set: lb = 1e-2 elif j in solute_set: lb = 1e-5 else: lb = 1e-5 return lb, ub self.recovery_mass_phase_comp = Var( self.flowsheet().config.time, self.config.property_package.phase_list, solvent_solute_set, initialize=recovery_mass_phase_comp_initialize, bounds=recovery_mass_phase_comp_bounds, units=pyunits.dimensionless, doc="Mass-based component recovery", ) self.recovery_vol_phase = Var( self.flowsheet().config.time, self.config.property_package.phase_list, initialize=0.1, bounds=(1e-2, 1 - 1e-6), units=pyunits.dimensionless, doc="Volumetric-based recovery", ) # Build control volume for feed side self.feed_side = ControlVolume0DBlock( dynamic=False, has_holdup=False, property_package=self.config.property_package, property_package_args=self.config.property_package_args, ) self.feed_side.add_state_blocks(has_phase_equilibrium=False) self.feed_side.add_material_balances( balance_type=self.config.material_balance_type, has_mass_transfer=True ) self.feed_side.add_energy_balances( balance_type=self.config.energy_balance_type, has_enthalpy_transfer=False, ) if self.config.is_isothermal: self.feed_side.add_isothermal_assumption() self.feed_side.add_momentum_balances( balance_type=self.config.momentum_balance_type, has_pressure_change=self.config.has_pressure_change, ) # Add permeate block tmp_dict = dict(**self.config.property_package_args) tmp_dict["has_phase_equilibrium"] = False tmp_dict["parameters"] = self.config.property_package tmp_dict["defined_state"] = False # permeate block is not an inlet self.properties_permeate = self.config.property_package.state_block_class( self.flowsheet().config.time, doc="Material properties of permeate", **tmp_dict, ) # Add Ports self.add_inlet_port(name="inlet", block=self.feed_side) self.add_outlet_port(name="retentate", block=self.feed_side) self.add_port(name="permeate", block=self.properties_permeate) # References for control volume # pressure change if ( self.config.has_pressure_change is True and self.config.momentum_balance_type != "none" ): self.deltaP = Reference(self.feed_side.deltaP) # mass transfer self.mass_transfer_phase_comp = Var( self.flowsheet().config.time, self.config.property_package.phase_list, solvent_solute_set, initialize=1, bounds=(0.0, 1e6), domain=NonNegativeReals, units=units_meta("mass") * units_meta("time") ** -1, doc="Mass transfer to permeate", ) @self.Constraint( self.flowsheet().config.time, self.config.property_package.phase_list, solvent_solute_set, doc="Mass transfer term", ) def eq_mass_transfer_term(b, t, p, j): # TODO- come up with better way to handle different locations of mw_comp in property models (generic vs simple ion prop model) if ( b.feed_side.properties_in[0].get_material_flow_basis() == MaterialFlowBasis.mass ): return ( b.mass_transfer_phase_comp[t, p, j] == -b.feed_side.mass_transfer_term[t, p, j] ) elif ( b.feed_side.properties_in[0].get_material_flow_basis() == MaterialFlowBasis.molar ): if hasattr(b.feed_side.properties_in[0].params, "mw_comp"): mw_comp = b.feed_side.properties_in[0].params.mw_comp[j] elif hasattr(b.feed_side.properties_in[0], "mw_comp"): mw_comp = b.feed_side.properties_in[0].mw_comp[j] else: raise ConfigurationError("mw_comp was not found.") return ( b.mass_transfer_phase_comp[t, p, j] == -b.feed_side.mass_transfer_term[t, p, j] * mw_comp ) # NF performance equations @self.Constraint( self.flowsheet().config.time, self.config.property_package.phase_list, self.config.property_package.solvent_set, doc="Solvent mass transfer", ) def eq_solvent_transfer(b, t, p, j): if ( b.feed_side.properties_in[0].get_material_flow_basis() == MaterialFlowBasis.mass ): return ( b.flux_vol_solvent[t, j] * b.dens_solvent * b.area == -b.feed_side.mass_transfer_term[t, p, j] ) elif ( b.feed_side.properties_in[0].get_material_flow_basis() == MaterialFlowBasis.molar ): # TODO- come up with better way to handle different locations of mw_comp in property models (generic vs simple ion prop model) if hasattr(b.feed_side.properties_in[0].params, "mw_comp"): mw_comp = b.feed_side.properties_in[0].params.mw_comp[j] elif hasattr(b.feed_side.properties_in[0], "mw_comp"): mw_comp = b.feed_side.properties_in[0].mw_comp[j] else: raise ConfigurationError("mw_comp was not found.") return ( b.flux_vol_solvent[t, j] * b.dens_solvent * b.area == -b.feed_side.mass_transfer_term[t, p, j] * mw_comp ) @self.Constraint( self.flowsheet().config.time, self.config.property_package.phase_list, solvent_solute_set, doc="Permeate production", ) def eq_permeate_production(b, t, p, j): return ( b.properties_permeate[t].get_material_flow_terms(p, j) == -b.feed_side.mass_transfer_term[t, p, j] ) @self.Constraint( self.flowsheet().config.time, self.config.property_package.phase_list, solute_set, doc="Solute rejection", ) def eq_rejection_phase_comp(b, t, p, j): return b.properties_permeate[t].conc_mol_phase_comp[ "Liq", j ] == b.feed_side.properties_in[t].conc_mol_phase_comp["Liq", j] * ( 1 - b.rejection_phase_comp[t, p, j] ) @self.Constraint(self.flowsheet().config.time) def eq_recovery_vol_phase(b, t): return ( b.recovery_vol_phase[t, "Liq"] == b.properties_permeate[t].flow_vol / b.feed_side.properties_in[t].flow_vol ) @self.Constraint(self.flowsheet().config.time, solvent_solute_set) def eq_recovery_mass_phase_comp(b, t, j): return ( b.recovery_mass_phase_comp[t, "Liq", j] == b.properties_permeate[t].flow_mass_phase_comp["Liq", j] / b.feed_side.properties_in[t].flow_mass_phase_comp["Liq", j] ) @self.Constraint( self.flowsheet().config.time, doc="Isothermal assumption for permeate" ) def eq_permeate_isothermal(b, t): return ( b.feed_side.properties_in[t].temperature == b.properties_permeate[t].temperature )
[docs] def initialize_build( self, state_args=None, outlvl=idaeslog.NOTSET, solver=None, optarg=None, ): """ General wrapper for pressure changer initialization routines Keyword Arguments: state_args : a dict of arguments to be passed to the property package(s) to provide an initial state for initialization (see documentation of the specific property package) (default = {}). outlvl : sets output level of initialization routine optarg : solver options dictionary object (default=None) solver : str indicating which solver to use during initialization (default = None) Returns: None """ init_log = idaeslog.getInitLogger(self.name, outlvl, tag="unit") solve_log = idaeslog.getSolveLogger(self.name, outlvl, tag="unit") opt = get_solver(solver, optarg) # --------------------------------------------------------------------- # Initialize holdup block flags = self.feed_side.initialize( outlvl=outlvl, optarg=optarg, solver=solver, state_args=state_args, ) init_log.info_high("Initialization Step 1 Complete.") # --------------------------------------------------------------------- # Initialize permeate # Set state_args from inlet state if state_args is None: state_args = {} state_dict = self.feed_side.properties_in[ self.flowsheet().config.time.first() ].define_port_members() for k in state_dict.keys(): if state_dict[k].is_indexed(): state_args[k] = {} for m in state_dict[k].keys(): state_args[k][m] = state_dict[k][m].value else: state_args[k] = state_dict[k].value self.properties_permeate.initialize( outlvl=outlvl, optarg=optarg, solver=solver, state_args=state_args, ) init_log.info_high("Initialization Step 2 Complete.") # --------------------------------------------------------------------- # Solve unit with idaeslog.solver_log(solve_log, idaeslog.DEBUG) as slc: res = opt.solve(self, tee=slc.tee) init_log.info_high("Initialization Step 3 {}.".format(idaeslog.condition(res))) # --------------------------------------------------------------------- # Release Inlet state self.feed_side.release_state(flags, outlvl + 1) init_log.info("Initialization Complete: {}".format(idaeslog.condition(res))) if not check_optimal_termination(res): raise InitializationError(f"Unit model {self.name} failed to initialize")
def _get_performance_contents(self, time_point=0): for k in ("ion_set", "solute_set"): if hasattr(self.config.property_package, k): solute_set = getattr(self.config.property_package, k) break var_dict = {} expr_dict = {} var_dict["Volumetric Recovery Rate"] = self.recovery_vol_phase[ time_point, "Liq" ] var_dict["Solvent Mass Recovery Rate"] = self.recovery_mass_phase_comp[ time_point, "Liq", "H2O" ] var_dict["Membrane Area"] = self.area if hasattr(self, "deltaP"): var_dict["Pressure Change"] = self.deltaP[time_point] if self.feed_side.properties_in[time_point].is_property_constructed("flow_vol"): if self.feed_side.properties_in[time_point].flow_vol.is_variable_type(): obj_dict = var_dict elif self.feed_side.properties_in[ time_point ].flow_vol.is_named_expression_type(): obj_dict = expr_dict else: raise Exception( f"{self.feed_side.properties_in[time_point].flow_vol} isn't a variable nor expression" ) obj_dict["Volumetric Flowrate @Inlet"] = self.feed_side.properties_in[ time_point ].flow_vol if self.feed_side.properties_out[time_point].is_property_constructed( "flow_vol" ): if self.feed_side.properties_out[time_point].flow_vol.is_variable_type(): obj_dict = var_dict elif self.feed_side.properties_out[ time_point ].flow_vol.is_named_expression_type(): obj_dict = expr_dict else: raise Exception( f"{self.feed_side.properties_in[time_point].flow_vol} isn't a variable nor expression" ) obj_dict["Volumetric Flowrate @Outlet"] = self.feed_side.properties_out[ time_point ].flow_vol var_dict["Solvent Volumetric Flux"] = self.flux_vol_solvent[time_point, "H2O"] for j in solute_set: var_dict[f"{j} Rejection"] = self.rejection_phase_comp[time_point, "Liq", j] if ( self.feed_side.properties_in[time_point] .conc_mol_phase_comp["Liq", j] .is_expression_type() ): obj_dict = expr_dict elif ( self.feed_side.properties_in[time_point] .conc_mol_phase_comp["Liq", j] .is_variable_type() ): obj_dict = var_dict obj_dict[f"{j} Molar Concentration @Inlet"] = self.feed_side.properties_in[ time_point ].conc_mol_phase_comp["Liq", j] obj_dict[f"{j} Molar Concentration @Outlet"] = ( self.feed_side.properties_out[time_point].conc_mol_phase_comp["Liq", j] ) obj_dict[f"{j} Molar Concentration @Permeate"] = self.properties_permeate[ time_point ].conc_mol_phase_comp["Liq", j] return {"vars": var_dict, "exprs": expr_dict} def _get_stream_table_contents(self, time_point=0): return create_stream_table_dataframe( { "Feed Inlet": self.inlet, "Feed Outlet": self.retentate, "Permeate Outlet": self.permeate, }, time_point=time_point, ) def calculate_scaling_factors(self): super().calculate_scaling_factors() for k in ("ion_set", "solute_set"): if hasattr(self.config.property_package, k): solute_set = getattr(self.config.property_package, k) break # TODO: require users to set scaling factor for area or calculate it based on mass transfer and flux iscale.set_scaling_factor(self.area, 1e-1) # setting scaling factors for variables # these variables should have user input, if not there will be a warning if iscale.get_scaling_factor(self.area) is None: sf = iscale.get_scaling_factor(self.area, default=1, warning=True) iscale.set_scaling_factor(self.area, sf) # these variables do not typically require user input, # will not override if the user does provide the scaling factor # TODO: this default scaling assumes SI units rather than being based on the property package if iscale.get_scaling_factor(self.dens_solvent) is None: iscale.set_scaling_factor(self.dens_solvent, 1e-3) for t, v in self.flux_vol_solvent.items(): if iscale.get_scaling_factor(v) is None: iscale.set_scaling_factor(v, 1e6) for (t, p, j), v in self.rejection_phase_comp.items(): if iscale.get_scaling_factor(v) is None: iscale.set_scaling_factor(v, 1e1) for (t, p, j), v in self.mass_transfer_phase_comp.items(): if iscale.get_scaling_factor(v) is None: sf = 10 * iscale.get_scaling_factor( self.feed_side.properties_in[t].get_material_flow_terms(p, j) ) iscale.set_scaling_factor(v, sf) if iscale.get_scaling_factor(self.recovery_vol_phase) is None: iscale.set_scaling_factor(self.recovery_vol_phase, 1) for (t, p, j), v in self.recovery_mass_phase_comp.items(): if j in self.config.property_package.solvent_set: sf = 1 elif j in solute_set: sf = 10 if iscale.get_scaling_factor(v) is None: iscale.set_scaling_factor(v, sf) # transforming constraints for ind, c in self.eq_mass_transfer_term.items(): sf = iscale.get_scaling_factor(self.mass_transfer_phase_comp[ind]) iscale.constraint_scaling_transform(c, sf) for ind, c in self.eq_solvent_transfer.items(): sf = iscale.get_scaling_factor(self.mass_transfer_phase_comp[ind]) iscale.constraint_scaling_transform(c, sf) for ind, c in self.eq_permeate_production.items(): sf = iscale.get_scaling_factor(self.mass_transfer_phase_comp[ind]) iscale.constraint_scaling_transform(c, sf) for ind, c in self.eq_rejection_phase_comp.items(): sf = iscale.get_scaling_factor(self.rejection_phase_comp[ind]) iscale.constraint_scaling_transform(c, sf) for t, c in self.eq_permeate_isothermal.items(): sf = iscale.get_scaling_factor(self.feed_side.properties_in[t].temperature) iscale.constraint_scaling_transform(c, sf) for t, c in self.eq_recovery_vol_phase.items(): sf = iscale.get_scaling_factor(self.recovery_vol_phase[t, "Liq"]) iscale.constraint_scaling_transform(c, sf) for (t, j), c in self.eq_recovery_mass_phase_comp.items(): sf = iscale.get_scaling_factor(self.recovery_mass_phase_comp[t, "Liq", j]) iscale.constraint_scaling_transform(c, sf) @property def default_costing_method(self): return cost_nanofiltration