Source code for watertap.core.membrane_channel0d

#################################################################################
# WaterTAP Copyright (c) 2020-2024, The Regents of the University of California,
# through Lawrence Berkeley National Laboratory, Oak Ridge National Laboratory,
# National Renewable Energy Laboratory, and National Energy Technology
# Laboratory (subject to receipt of any required approvals from the U.S. Dept.
# of Energy). All rights reserved.
#
# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and license
# information, respectively. These files are also available online at the URL
# "https://github.com/watertap-org/watertap/"
#################################################################################

import collections
import weakref

from pyomo.environ import (
    NegativeReals,
    NonNegativeReals,
    Set,
    Var,
)
from pyomo.core.base.indexed_component import slicer_types
from idaes.core import (
    declare_process_block_class,
    FlowDirection,
)
from idaes.core.util import scaling as iscale
from idaes.core.util.exceptions import ConfigurationError
from idaes.core.util.misc import add_object_reference
from idaes.core.base.control_volume0d import ControlVolume0DBlockData
import idaes.logger as idaeslog

from watertap.core.membrane_channel_base import (
    MembraneChannelMixin,
    PressureChangeType,
    CONFIG_Template as Base_CONFIG_Template,
)

CONFIG_Template = Base_CONFIG_Template()


[docs]class Not0Or1Error(KeyError): pass
class _0DPropertyHelper(collections.abc.Mapping): """ Class to make blk.properties_in and blk.properties_out like blk.properties from a 1D model. """ def __init__(self, blk, reverse=False): self._blk_ref = weakref.ref(blk) if reverse: self._get_property_x = self._get_property_x_backward else: self._get_property_x = self._get_property_x_forward def __len__(self): return len(self.blk.properties_in) + len(self.blk.properties_out) def __iter__(self): for t in self._get_property_x(0): yield (t, 0) for t in self._get_property_x(1): yield (t, 1) def __contains__(self, index): idx0 = index[0] idx1 = index[1] try: return idx0 in self._get_property_x(idx1) except Not0Or1Error: return False def __getitem__(self, index): if index is Ellipsis: return (_ for _ in self._props_in_out(index)) try: index_len = len(index) except: raise KeyError(index) if index_len != 2: raise KeyError(index) idx0 = index[0] idx1 = index[1] if type(idx1) in slicer_types: if isinstance(idx1, slice): if ( idx1.start is not None or idx1.stop is not None or idx1.step is not None ): raise IndexError("Indexed components only support simple slices") return (_ for _ in self._props_in_out(idx0)) try: props = self._get_property_x(idx1) return props[idx0] except Not0Or1Error: raise KeyError(index) def keys(self): for t in self._get_property_x(0).keys(): yield (t, 0) for t in self._get_property_x(1).keys(): yield (t, 1) def items(self): for t, v in self._get_property_x(0).items(): yield (t, 0, v) for t, v in self._get_property_x(1).items(): yield (t, 1, v) def values(self): yield from self._get_property_x(0).values() yield from self._get_property_x(1).values() @property def blk(self): return self._blk_ref() def _get_property_x_forward(self, x): if x == 0: return self.blk.properties_in elif x == 1: return self.blk.properties_out else: raise Not0Or1Error def _get_property_x_backward(self, x): if x == 1: return self.blk.properties_in elif x == 0: return self.blk.properties_out else: raise Not0Or1Error def _props_in_out(self, idx0): if type(idx0) in slicer_types: yield from self._get_property_x(0)[idx0] yield from self._get_property_x(1)[idx0] else: yield self._get_property_x(0)[idx0] yield self._get_property_x(1)[idx0]
[docs]@declare_process_block_class("MembraneChannel0DBlock") class MembraneChannel0DBlockData(MembraneChannelMixin, ControlVolume0DBlockData): def _skip_element(self, x): return False # overwrite CV0D `add_geometry`
[docs] def add_geometry( self, length_var=None, width_var=None, flow_direction=FlowDirection.forward ): """ Method to create spatial domain and volume Var in ControlVolume. Args: length_var - (optional) external variable to use for the length of the channel. If a variable is provided, a reference will be made to this in place of the length Var. width_var - (optional) external variable to use for the width of the channel. If a variable is provided, a reference will be made to this in place of the length Var. flow_direction - argument indicating direction of material flow relative to length domain. Valid values: - FlowDirection.forward (default), flow goes from 0 to 1. - FlowDirection.backward, flow goes from 1 to 0 Returns: None """ # volume will be added be calling add_geometry from ControlVolume0D if not hasattr(self, "length") and not hasattr(self, "width"): self._add_var_reference(length_var, "length", "length_var") self._add_var_reference(width_var, "width", "width_var") if hasattr(self, "length") and hasattr(self, "width"): super().add_geometry() units_meta = self.config.property_package.get_metadata().get_derived_units if not hasattr(self, "channel_height"): self.channel_height = Var( initialize=1e-3, bounds=(1e-4, 5e-3), domain=NonNegativeReals, units=units_meta("length"), doc="membrane-channel height", ) # # TODO: negating spacer volume for now as it can be assumed negligible (Park et al., 2020: https://doi.org/10.1016/j.desal.2020.114625). Revisit. # # Note: considering that this volume relationship holds for both flat_sheet and spiral_wound types @self.Constraint( self.flowsheet().config.time, doc="Membrane-channel volume" ) def eq_volume(b, t): return b.volume[t] == b.length * b.width * b.channel_height # Validate and create flow direction attribute, like 1D if flow_direction in (flwd for flwd in FlowDirection): self._flow_direction = flow_direction else: raise ConfigurationError( "{} invalid value for flow_direction " "argument. Must be a FlowDirection Enum.".format(self.name) )
[docs] def add_state_blocks(self, has_phase_equilibrium=None): """ This method constructs the state blocks for the control volume. Args: has_phase_equilibrium: indicates whether equilibrium calculations will be required in state blocks Returns: None """ super().add_state_blocks(has_phase_equilibrium=has_phase_equilibrium) # quack like a 1D model self.length_domain = Set(ordered=True, initialize=(0.0, 1.0)) add_object_reference(self, "difference_elements", self.length_domain) self._set_nfe() if self._flow_direction == FlowDirection.forward: self.properties = _0DPropertyHelper(self, reverse=False) elif self._flow_direction == FlowDirection.backward: self.properties = _0DPropertyHelper(self, reverse=True) else: raise ConfigurationError( "FlowDirection must be set to FlowDirection.forward or FlowDirection.backward." ) self._add_interface_stateblock(has_phase_equilibrium)
def apply_transformation(self): pass def _add_pressure_change(self, pressure_change_type=PressureChangeType.calculated): if pressure_change_type == PressureChangeType.fixed_per_stage: return units_meta = self.config.property_package.get_metadata().get_derived_units if pressure_change_type == PressureChangeType.fixed_per_unit_length: # Pressure change equation when dP/dx = user-specified constant, self.dP_dx = Var( self.flowsheet().config.time, initialize=-5e4, bounds=(-2e5, None), domain=NegativeReals, units=units_meta("pressure") * units_meta("length") ** -1, doc="pressure drop per unit length across channel", ) @self.Constraint( self.flowsheet().config.time, doc="pressure change due to friction" ) def eq_pressure_change(b, t): return b.deltaP[t] == b.dP_dx[t] * b.length elif pressure_change_type == PressureChangeType.calculated: self.dP_dx = Var( self.flowsheet().config.time, self.length_domain, initialize=-5e4, bounds=(-2e5, -1e3), domain=NegativeReals, units=units_meta("pressure") * units_meta("length") ** -1, doc="Pressure drop per unit length of channel at inlet and outlet", ) @self.Constraint( self.flowsheet().config.time, doc="Total Pressure drop across channel" ) def eq_pressure_change(b, t): return b.deltaP[t] == sum( b.dP_dx[t, x] * b.length / b.nfe for x in b.length_domain ) else: raise ConfigurationError( f"Unrecognized pressure_change_type {pressure_change_type}" )
[docs] def initialize( self, state_args=None, outlvl=idaeslog.NOTSET, optarg=None, solver=None, hold_state=True, initialize_guess=None, ): """ Initialization routine for the membrane channel control volume Keyword Arguments: state_args : a dict of arguments to be passed to the property package(s) to provide an initial state for initialization (see documentation of the specific property package) (default = {}). outlvl : sets output log level of initialization routine optarg : solver options dictionary object (default=None, use default solver options) solver : str indicating which solver to use during initialization (default = None) hold_state : flag indicating whether the initialization routine should unfix any state variables fixed during initialization, **default** - True. **Valid values:** **True** - states variables are not unfixed, and a dict of returned containing flags for which states were fixed during initialization, **False** - state variables are unfixed after initialization by calling the release_state method. initialize_guess : a dict of guesses for solvent_recovery, solute_recovery, and cp_modulus. These guesses offset the initial values for the retentate, permeate, and membrane interface state blocks from the inlet feed (default = {'deltaP': -1e4, 'solvent_recovery': 0.5, 'solute_recovery': 0.01, 'cp_modulus': 1.1}) Returns: If hold_states is True, returns a dict containing flags for which states were fixed during initialization. """ # Get inlet state if not provided init_log = idaeslog.getInitLogger(self.name, outlvl, tag="control_volume") solve_log = idaeslog.getSolveLogger(self.name, outlvl, tag="control_volume") # TODO: this function needs to be changed for use on the permeate side state_args = self._get_state_args(initialize_guess, state_args) # intialize self.properties state_args_properties_in = state_args["feed_side"] if self._flow_direction == FlowDirection.forward: state_args_properties_out = state_args["retentate"] else: state_args_properties_out = state_args["permeate"] source_flags = self.properties_in.initialize( state_args=state_args_properties_in, outlvl=outlvl, optarg=optarg, solver=solver, hold_state=True, ) self.properties_out.initialize( state_args=state_args_properties_out, outlvl=outlvl, optarg=optarg, solver=solver, ) state_args_interface = self._get_state_args_interface( initialize_guess, state_args_properties_in, state_args_properties_out ) self.properties_interface.initialize( outlvl=outlvl, optarg=optarg, solver=solver, state_args=state_args_interface, ) init_log.info("Initialization Complete") if hold_state: return source_flags else: self.release_state(source_flags, outlvl)
def calculate_scaling_factors(self): # set volume scale factor first otherwise ControlVolume0D default will be set if hasattr(self, "volume"): for v in self.volume.values(): if iscale.get_scaling_factor(v) is None: iscale.set_scaling_factor(v, 1e3) super().calculate_scaling_factors() if hasattr(self, "area"): if iscale.get_scaling_factor(self.area) is None: iscale.set_scaling_factor(self.area, 100) if hasattr(self, "dP_dx"): for v in self.dP_dx.values(): if iscale.get_scaling_factor(v) is None: iscale.set_scaling_factor(v, 1e-4)