Source code for watertap.tools.parameter_sweep.parameter_sweep_differential

import numpy as np
from pyomo.common.config import ConfigValue

from watertap.tools.parameter_sweep.sampling_types import NormalSample
from watertap.tools.parameter_sweep.parameter_sweep import (
    _ParameterSweepBase,
    ParameterSweep,
)
from watertap.tools.MPI.dummy_mpi import DummyCOMM


[docs]class DifferentialParameterSweep(_ParameterSweepBase): CONFIG = _ParameterSweepBase.CONFIG() CONFIG.declare( "num_diff_samples", ConfigValue( default=1, domain=int, description="Number of differntial sweep samples", ), ) CONFIG.declare( "guarantee_solves", ConfigValue( default=False, domain=bool, description="Guarantee a pre-specified number of solves.", ), ) CONFIG.declare( "differential_sweep_specs", ConfigValue( default=dict(), domain=dict, description="Dictionary containing the specifications for the differential sweep", doc=""" A specification dictionary that contains details for how to construct the parameter sweep dictionary for differential sweep. This is a nested dictionary where the first level denotes the variable names for which the differential sweep needs to be carried out. The second level denotes various options to be used for wach variable. The number of samples for each differential sweep is specified while initializing the DifferentialParameterSweep object wsing the keyword `num_diff_samples` e.g. { "fs.a": { "diff_mode": "sum", "diff_sample_type": NormalSample, "std_dev": 0.01, "pyomo_object": m.fs.input["a"], }, "fs.b": { "diff_mode": "product", "diff_sample_type": UniformSample, "relative_lb": 0.01, "relative_ub": 0.01, "pyomo_object": m.fs.input["b"], }, "fs.c": { "diff_mode": "sum", "diff_sample_type": GeomSample, "relative_lb": 0.01, "relative_ub": 10.0, "pyomo_object": m.fs.input["c"], }, } """, ), )
[docs] def __init__( self, **options, ): # Initialize the base Class super().__init__(**options) if self.config.guarantee_solves: raise NotImplementedError
def _create_differential_sweep_params(self, local_values): differential_sweep_specs = self.config.differential_sweep_specs diff_sweep_param = {} for ctr, (param, specs) in enumerate(differential_sweep_specs.items()): nominal_val = local_values[self.diff_spec_index[ctr]] pyomo_object = specs["pyomo_object"] if specs["diff_sample_type"] == NormalSample: std_dev = specs["std_dev"] diff_sweep_param[param] = NormalSample( pyomo_object, nominal_val, std_dev, self.config.num_diff_samples ) else: relative_lb = specs["relative_lb"] relative_ub = specs["relative_ub"] if specs["diff_mode"] == "sum": lb = nominal_val * (1 - relative_lb) ub = nominal_val * (1 + relative_ub) elif specs["diff_mode"] == "product": lb = nominal_val * relative_lb ub = nominal_val * relative_ub elif specs["diff_mode"] == "percentile": lower_nominal = specs["nominal_lb"] upper_nominal = specs["nominal_ub"] delta_nominal = abs(upper_nominal - lower_nominal) lb = nominal_val + delta_nominal * relative_lb ub = nominal_val + delta_nominal * relative_ub else: raise NotImplementedError diff_sweep_param[param] = specs["diff_sample_type"]( pyomo_object, lb, ub, self.config.num_diff_samples ) return diff_sweep_param def _check_differential_sweep_key_validity(self, sweep_params): diff_specs_keys = list(self.config.differential_sweep_specs.keys()) sweep_param_keys = list(sweep_params.keys()) if all(key in sweep_param_keys for key in diff_specs_keys): self.diff_spec_index = [ sweep_param_keys.index(key) for key in diff_specs_keys ] else: raise ValueError( "differential_sweep_specs keys don't match with sweep_param keys" ) def _define_differential_sweep_outputs(self, sweep_params): self.differential_outputs = self.outputs if self.outputs is not None: for key in sweep_params.keys(): if key not in self.config.differential_sweep_specs.keys(): self.differential_outputs[key] = sweep_params[key].pyomo_object def _append_differential_results(self, local_output_dict, diff_results_dict): for idx, diff_sol in diff_results_dict.items(): for key, item in diff_sol.items(): # Solve status if key == "solve_successful": local_output_dict["solve_successful"].extend(item) else: for subkey, subitem in item.items(): if subkey in local_output_dict["sweep_params"].keys(): local_output_dict["sweep_params"][subkey][ "value" ] = np.concatenate( ( local_output_dict["sweep_params"][subkey]["value"], subitem["value"], ) ) else: local_output_dict[key][subkey]["value"] = np.concatenate( ( local_output_dict[key][subkey]["value"], subitem["value"], ) ) def _collect_local_inputs(self, local_results_dict): num_local_samples = len(local_results_dict["solve_successful"]) local_inputs = np.zeros( (num_local_samples, len(local_results_dict["sweep_params"])), dtype=float, ) for i, (key, item) in enumerate(local_results_dict["sweep_params"].items()): local_inputs[:, i] = item["value"] return local_inputs def _aggregate_input_arr(self, global_results_dict, num_global_samples): global_values = np.zeros( (num_global_samples, len(global_results_dict["sweep_params"])), dtype=float, ) if self.rank == 0: for i, (key, item) in enumerate( global_results_dict["sweep_params"].items() ): global_values[:, i] = item["value"] self.comm.Bcast(global_values, root=0) return global_values def _aggregate_results(self, local_output_dict): # Create the global results dictionary global_results_dict = self._create_global_output(local_output_dict) # Broadcast the number of global samples to all ranks num_global_samples = len(global_results_dict["solve_successful"]) num_global_samples = self.comm.bcast(num_global_samples, root=0) global_results_arr = self._aggregate_results_arr( global_results_dict, num_global_samples ) global_input_values = self._aggregate_input_arr( global_results_dict, num_global_samples ) return ( global_results_dict, global_results_arr, global_input_values, num_global_samples, ) def _run_differential_sweep(self, model, local_value): diff_sweep_param_dict = self._create_differential_sweep_params(local_value) # We want this instance of the parameter sweep to run in serial diff_ps = ParameterSweep( optimize_function=self.config.optimize_function, optimize_kwargs=self.config.optimize_kwargs, reinitialize_function=self.config.reinitialize_function, reinitialize_kwargs=self.config.reinitialize_kwargs, reinitialize_before_sweep=self.config.reinitialize_before_sweep, comm=DummyCOMM, ) _, differential_sweep_output_dict = diff_ps.parameter_sweep( model, diff_sweep_param_dict, outputs=self.differential_outputs, num_samples=self.config.num_diff_samples, seed=self.seed, ) return differential_sweep_output_dict def _run_sample( self, model, reinitialize_values, local_value_k, k, sweep_params, local_output_dict, ): run_successful = super()._run_sample( model, reinitialize_values, local_value_k, k, sweep_params, local_output_dict, ) self.differential_sweep_output_dict[k] = self._run_differential_sweep( model, local_value_k ) return run_successful def _do_param_sweep(self, model, sweep_params, outputs, local_values): self.differential_sweep_output_dict = {} local_output_dict = super()._do_param_sweep( model, sweep_params, outputs, local_values ) # Now append the outputs of the differential solves self._append_differential_results( local_output_dict, self.differential_sweep_output_dict ) return local_output_dict def parameter_sweep( self, model, sweep_params, outputs=None, num_samples=None, seed=None, ): # Create a base sweep_params sweep_params, sampling_type = self._process_sweep_params(sweep_params) # Check if the keys in the differential sweep specs exist in sweep params self._check_differential_sweep_key_validity(sweep_params) # Define differential sweep outputs self.outputs = outputs self._define_differential_sweep_outputs(sweep_params) # Set the seed before sampling self.seed = seed np.random.seed(self.seed) # Enumerate/Sample the parameter space global_values = self._build_combinations( sweep_params, sampling_type, num_samples ) # divide the workload between processors local_values = self._divide_combinations(global_values) # Create a dictionary to store all the differential ps_objects self.diff_ps_dict = {} # Do the Loop local_results_dict = self._do_param_sweep( model, sweep_params, outputs, local_values, ) # re-writing local_values local_values = self._collect_local_inputs(local_results_dict) # Aggregate results on Master ( global_results_dict, global_results_arr, global_input_arr, num_global_samples, ) = self._aggregate_results(local_results_dict) # Save to file global_save_data = self.writer.save_results( sweep_params, local_values, global_input_arr, local_results_dict, global_results_dict, global_results_arr, ) return global_results_dict, global_save_data