Source code for watertap.core.plugins.solvers

#################################################################################
# WaterTAP Copyright (c) 2020-2024, The Regents of the University of California,
# through Lawrence Berkeley National Laboratory, Oak Ridge National Laboratory,
# National Renewable Energy Laboratory, and National Energy Technology
# Laboratory (subject to receipt of any required approvals from the U.S. Dept.
# of Energy). All rights reserved.
#
# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and license
# information, respectively. These files are also available online at the URL
# "https://github.com/watertap-org/watertap/"
#################################################################################

import logging

import pyomo.environ as pyo
from pyomo.core.base.block import _BlockData
from pyomo.core.kernel.block import IBlock
from pyomo.solvers.plugins.solvers.IPOPT import IPOPT

import idaes.core.util.scaling as iscale
from idaes.core.util.scaling import (
    get_scaling_factor,
    set_scaling_factor,
    unset_scaling_factor,
)
from idaes.logger import getLogger

_log = getLogger("watertap.core")

_pyomo_nl_writer_log = logging.getLogger("pyomo.repn.plugins.nl_writer")


def _pyomo_nl_writer_logger_filter(record):
    msg = record.getMessage()
    if "scaling_factor" in msg and "model contains export suffix" in msg:
        return False
    return True


[docs]@pyo.SolverFactory.register( "ipopt-watertap", doc="The Ipopt NLP solver, with user-based variable and automatic Jacobian constraint scaling", ) class IpoptWaterTAP(IPOPT):
[docs] def __init__(self, **kwds): kwds["name"] = "ipopt-watertap" self._cleanup_needed = False super().__init__(**kwds)
def _presolve(self, *args, **kwds): if len(args) > 1 or len(args) == 0: raise TypeError( f"IpoptWaterTAP.solve takes 1 positional argument but {len(args)} were given" ) if not isinstance(args[0], (_BlockData, IBlock)): raise TypeError( "IpoptWaterTAP.solve takes 1 positional argument: a Pyomo ConcreteModel or Block" ) # until proven otherwise self._cleanup_needed = False self._tee = kwds.get("tee", False) # Set the default watertap options if "tol" not in self.options: self.options["tol"] = 1e-08 if "constr_viol_tol" not in self.options: self.options["constr_viol_tol"] = 1e-08 if "acceptable_constr_viol_tol" not in self.options: self.options["acceptable_constr_viol_tol"] = 1e-08 if "bound_relax_factor" not in self.options: self.options["bound_relax_factor"] = 0.0 if "honor_original_bounds" not in self.options: self.options["honor_original_bounds"] = "no" if not self._is_user_scaling(): super()._presolve(*args, **kwds) self._cleanup() return if self._tee: print( "ipopt-watertap: Ipopt with user variable scaling and IDAES jacobian constraint scaling" ) # These options are typically available with gradient-scaling, and they # have corresponding options in the IDAES constraint_autoscale_large_jac # function. Here we use their Ipopt names and default values, see # https://coin-or.github.io/Ipopt/OPTIONS.html#OPT_NLP_Scaling max_grad = self._get_option("nlp_scaling_max_gradient", 100) min_scale = self._get_option("nlp_scaling_min_value", 1e-08) # These options are custom for the IDAES constraint_autoscale_large_jac # function. We expose them as solver options as this has become part # of the solve process. ignore_variable_scaling = self._get_option("ignore_variable_scaling", False) ignore_constraint_scaling = self._get_option("ignore_constraint_scaling", False) self._model = args[0] self._cache_scaling_factors() self._cleanup_needed = True _pyomo_nl_writer_log.addFilter(_pyomo_nl_writer_logger_filter) # NOTE: This function sets the scaling factors on the # constraints. Hence we cache the constraint scaling # factors and reset them to their original values # so that repeated calls to solve change the scaling # each time based on the initial values, just like in Ipopt. try: _, _, nlp = iscale.constraint_autoscale_large_jac( self._model, ignore_constraint_scaling=ignore_constraint_scaling, ignore_variable_scaling=ignore_variable_scaling, max_grad=max_grad, min_scale=min_scale, ) except Exception as err: nlp = None if str(err) == "Error in AMPL evaluation": print( "ipopt-watertap: Issue in AMPL function evaluation; Jacobian constraint scaling not applied." ) halt_on_ampl_error = self.options.get("halt_on_ampl_error", "yes") if halt_on_ampl_error == "no": print( "ipopt-watertap: halt_on_ampl_error=no, so continuing with optimization." ) else: self._cleanup() raise RuntimeError( "Error in AMPL evaluation.\n" "Run ipopt with halt_on_ampl_error=yes and symbolic_solver_labels=True to see the affected function." ) else: print("Error in constraint_autoscale_large_jac") self._cleanup() raise # set different default for `alpha_for_y` if this is an LP # see: https://coin-or.github.io/Ipopt/OPTIONS.html#OPT_alpha_for_y if nlp is not None: if nlp.nnz_hessian_lag() == 0: if "alpha_for_y" not in self.options: self.options["alpha_for_y"] = "bound-mult" try: # this creates the NL file, among other things return super()._presolve(*args, **kwds) except: self._cleanup() raise def _cleanup(self): if self._cleanup_needed: self._reset_scaling_factors() # remove our reference to the model del self._model _pyomo_nl_writer_log.removeFilter(_pyomo_nl_writer_logger_filter) def _postsolve(self): self._cleanup() return super()._postsolve() def _cache_scaling_factors(self): self._scaling_cache = [ (c, get_scaling_factor(c)) for c in self._model.component_data_objects( pyo.Constraint, active=True, descend_into=True ) ] def _reset_scaling_factors(self): for c, s in self._scaling_cache: if s is None: unset_scaling_factor(c) else: set_scaling_factor(c, s) del self._scaling_cache def _get_option(self, option_name, default_value): # NOTE: options get reset to their original value at the end of the # OptSolver.solve. The options in _presolve (where this is called) # are already copies of the original, so it is safe to pop them so # they don't get sent to Ipopt. option_value = self.options.pop(option_name, None) if option_value is None: option_value = default_value else: if self._tee: print(f"ipopt-watertap: {option_name}={option_value}") return option_value def _is_user_scaling(self): if "nlp_scaling_method" not in self.options: self.options["nlp_scaling_method"] = "user-scaling" if self.options["nlp_scaling_method"] != "user-scaling": if self._tee: print( "The ipopt-watertap solver is designed to be run with user-scaling. " f"Ipopt with nlp_scaling_method={self.options['nlp_scaling_method']} will be used instead" ) return False return True
## reconfigure IDAES to use the ipopt-watertap solver import idaes _default_solver_config_value = idaes.cfg.get("default_solver") _idaes_default_solver = _default_solver_config_value._default _default_solver_config_value.set_default_value("ipopt-watertap") if not _default_solver_config_value._userSet: _default_solver_config_value.reset()