###############################################################################
# WaterTAP Copyright (c) 2021, The Regents of the University of California,
# through Lawrence Berkeley National Laboratory, Oak Ridge National
# Laboratory, National Renewable Energy Laboratory, and National Energy
# Technology Laboratory (subject to receipt of any required approvals from
# the U.S. Dept. of Energy). All rights reserved.
#
# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and license
# information, respectively. These files are also available online at the URL
# "https://github.com/watertap-org/watertap/"
#
###############################################################################
"""
This module contains utility functions for initialization of WaterTAP models.
"""
__author__ = "Adam Atia"
from pyomo.environ import check_optimal_termination, Var
from idaes.core.util.model_statistics import degrees_of_freedom
from idaes.core.util.scaling import get_scaling_factor, __none_left_mult
from idaes.core.solvers import get_solver
import idaes.logger as idaeslog
_log = idaeslog.getLogger(__name__)
[docs]def check_solve(results, checkpoint=None, logger=_log, fail_flag=False):
"""
Check that solver termination is optimal and OK in an initialization routine.
If the check fails, proceed through initialization with only a logger warning by default,
or set fail_flag=True to raise an error. This should also work for checking a solve outside
of an initialization routine.
Keyword Arguments:
results : solver results
checkpoint : Optional string argument to specify the step of initialization being checked
(e.g., checkpoint="Initialization step 1: solve indexed blocks")
logger : Optional argument for loading idaes.getInitLogger object (e.g., logger=init_log)
fail_flag : Boolean argument to specify error or warning (Default: fail_flag=False produces logger warning.
set fail_flag=True to raise an error and stop the initialization routine.)
Returns:
None
"""
if check_optimal_termination(results):
if checkpoint is None:
logger.info(f"Solve successful.")
else:
logger.info(f"{checkpoint} successful.")
else:
if checkpoint is None:
msg = (
f"The solver failed to converge to an optimal solution. "
f"This suggests that the user provided infeasible inputs or that the model is poorly scaled."
)
else:
msg = (
f"{checkpoint} failed. The solver failed to converge to an optimal solution. "
f"This suggests that the user provided infeasible inputs or that the model is poorly scaled."
)
if fail_flag:
logger.error(msg)
raise ValueError(msg)
else:
logger.warning(msg)
[docs]def check_dof(blk, fail_flag=False, logger=_log, expected_dof=0):
"""
Check that degrees of freedom are 0, or the expected amount ``expected_dof``.
If not 0 or ``expected_dof``, either throw a warning and continue or throw an error and stop.
Keyword Arguments:
blk : block to check
fail_flag : Boolean argument to specify error or warning
(Default: fail_flag=False produces logger warning. Set fail_flag=True to raise an error and stop
the initialization routine.)
logger : Optional argument for loading idaes.getInitLogger object (e.g., logger=init_log)
expected_dof : Integer number of degrees of freedom ``blk`` should have
Returns:
None
"""
if degrees_of_freedom(blk) != expected_dof:
if expected_dof == 0:
msg = (
f"Non-zero degrees of freedom: Degrees of freedom on {blk} = {degrees_of_freedom(blk)}. "
f"Fix {degrees_of_freedom(blk)} more variable(s)"
)
elif degrees_of_freedom(blk) < expected_dof:
msg = (
f"Unexpected degrees of freedom: Degrees of freedom on {blk} = {degrees_of_freedom(blk)}. "
f"Expected {expected_dof}. Unfix {expected_dof - degrees_of_freedom(blk)} variable(s)"
)
elif degrees_of_freedom(blk) > expected_dof:
msg = (
f"Unexpected degrees of freedom: Degrees of freedom on {blk} = {degrees_of_freedom(blk)}. "
f"Expected {expected_dof}. Fix {degrees_of_freedom(blk) - expected_dof} variable(s)"
)
if fail_flag:
logger.error(msg)
raise ValueError(msg)
else:
logger.warning(msg)
[docs]def assert_degrees_of_freedom(blk, expected_dof):
"""
Assert that degrees of freedom are ``expected_dof``.
If not ``expected_dof``, throw an error and stop.
Keyword Arguments:
blk : block to check
expected_dof : Integer number of degrees of freedom ``blk`` should have
Returns:
None
"""
check_dof(blk, True, expected_dof=expected_dof)
[docs]def assert_no_degrees_of_freedom(blk):
"""
Assert that degrees of freedom are 0.
If ``blk`` has non-zero degrees of freedom, throw an error and stop.
Keyword Arguments:
blk : block to check
Returns:
None
"""
check_dof(blk, True)
[docs]def assert_no_initialization_perturbation(blk, optarg=None, solver=None):
"""
Assert that IPOPT will *not* move the initialization
Args:
blk: Pyomo block
optarg: IPOPT options (default=None) (Should be None if solver is specified)
solver: Pyomo IPOPT solver instance (default=None) (Should be None if optarg is specified).
Returns:
None
"""
if solver is not None and optarg is not None:
raise ValueError("Supply a solver or optarg, not both")
if optarg is None:
optarg = {}
if solver is None:
solver = get_solver(options=optarg)
if solver.name not in (
"ipopt",
"ipopt-watertap",
):
raise ValueError(f"Solver {solver.name} is not supported")
options = solver.options
bound_push = options.get("bound_push", 1e-2)
bound_frac = options.get("bound_frac", 1e-2)
bound_relax_factor = options.get("bound_relax_factor", 1e-8)
if solver.name == "ipopt-watertap":
user_scaling = True
else:
user_scaling = (
options.get("nlp_scaling_method", "gradient-based") == "user-scaling"
)
for v, val, result in generate_initialization_perturbation(
blk, bound_push, bound_frac, bound_relax_factor, user_scaling
):
raise ValueError(
f"IPOPT will move scaled initial value for variable {v.name} from {val:e} to {result:e}"
)
[docs]def print_initialization_perturbation(
blk, bound_push=1e-2, bound_frac=1e-2, bound_relax_factor=1e-8, user_scaling=False
):
"""
Print the initialization perturbations performed by IPOPT for a given Block
Args:
blk: Pyomo block
bound_push: bound_push to evaluate (same as IPOPT option) (default=1e-2)
bound_frac: bound_frac to evaluate (same as IPOPT option) (default=1e-2)
bound_relax_factor: bound_relax_factor to evaluate (same as IPOPT option) (default=1e-8)
user_scaling: If True, the variables are scaled as if `nlp_scaling_method = user-scaling`
is used. (default=False)
Returns:
None
"""
for v, val, result in generate_initialization_perturbation(
blk, bound_push, bound_frac, bound_relax_factor, user_scaling
):
print(
f"IPOPT will move scaled initial value for variable {v.name} from {val:e} to {result:e}"
)
[docs]def generate_initialization_perturbation(
blk, bound_push=1e-2, bound_frac=1e-2, bound_relax_factor=1e-8, user_scaling=False
):
"""
Generate the initialization perturbations performed by IPOPT for a given Block
Args:
blk: Pyomo block
bound_push: bound_push to evaluate (same as IPOPT option) (default=1e-2)
bound_frac: bound_frac to evaluate (same as IPOPT option) (default=1e-2)
bound_relax_factor: bound_relax_factor to evaluate (same as IPOPT option) (default=1e-8)
user_scaling: If True, the variables are scaled as if `nlp_scaling_method = user-scaling`
is used. (default=False)
Yields:
tuple: (pyo.Var object, current_value, perturbed_value)
"""
kappa1 = bound_push
kappa2 = bound_frac
for v in blk.component_data_objects(Var):
if v.value is None:
_log.warning(f"Variable {v.name} has no initial value")
continue
if v.fixed:
continue
if user_scaling:
sf = get_scaling_factor(v, default=1.0)
else:
sf = 1.0
v_lb = __none_left_mult(v.lb, sf)
if v_lb is not None:
v_lb -= bound_relax_factor * max(1, abs(v_lb))
v_value = v.value * sf
v_ub = __none_left_mult(v.ub, sf)
if v_ub is not None:
v_ub += bound_relax_factor * max(1, abs(v_ub))
if v_lb is not None:
if v_ub is not None:
pl = min(kappa1 * max(1, abs(v_lb)), kappa2 * (v_ub - v_lb))
else:
pl = kappa1 * max(1, abs(v_lb))
if v_value < v_lb + pl:
yield (v, v_value, v_lb + pl)
if v_ub is not None:
if v_lb is not None:
pu = min(kappa1 * max(1, abs(v_ub)), kappa2 * (v_ub - v_lb))
else:
pu = kappa1 * max(1, abs(v_ub))
if v_value > v_ub - pu:
yield (v, v_value, v_ub - pu)