Source code for watertap.tools.parameter_sweep.paramter_sweep_parallel_utils

import copy
import numpy as np


class _ParameterSweepParallelUtils:
    @classmethod
    def remove_unpicklable_state(cls, parameter_sweep_instance):
        """
        Remove and return any state from the ParameterSweep object that cannot be
        pickled, to make the instance picklable. Needed in order to use the
        ConcurrentFuturesParallelManager.
        """
        saved_state = {
            "parallel_manager": parameter_sweep_instance.parallel_manager,
            "writer": parameter_sweep_instance.writer,
        }

        parameter_sweep_instance.parallel_manager = None
        parameter_sweep_instance.writer = None
        return saved_state

    @classmethod
    def restore_unpicklable_state(cls, parameter_sweep_instance, state):
        """
        Restore a collection of saved state that was removed in order to pickle
        the ParameterSweep object.
        """
        parameter_sweep_instance.parallel_manager = state.get("parallel_manager", None)
        parameter_sweep_instance.writer = state.get("writer", None)

    """
    Combine all of the results retrieved from calling gather().
    - all_results is a list of Result objects, each representing the 
    parameters and results from running the optimization on one process. 
    """

    def _combine_gather_results(self, all_results):
        if len(all_results) == 0:
            return None

        # create the output skeleton based on the first set of results
        # we assume the results are in dict format
        initial_results = all_results[0].results

        combined_results = copy.deepcopy(initial_results)

        # remove any lingering pyomo objects, and convert inner results to numpy arrays
        for key, val in combined_results.items():
            if key != "solve_successful":
                if isinstance(val, dict):
                    for subkey, subval in val.items():
                        if "_pyo_obj" in subval:
                            del subval["_pyo_obj"]

        # for each result, concat the "value" array of results into the
        # gathered results to combine them all

        # get length of data in first result for finding missing keys
        total_chunk_length = len(all_results[0].results["solve_successful"])

        for i, result in enumerate(all_results[1:]):
            results = result.results

            for key, val in results.items():
                if key == "solve_successful":
                    combined_results[key] = np.append(
                        combined_results[key], copy.deepcopy(val)
                    )
                    continue
                if key == "nominal_idx" or key == "differential_idx":
                    combined_results[key] = np.append(
                        combined_results[key], copy.deepcopy(val)
                    )
                    continue
                # print("vall all results!", key, val)
                for subkey, subval in val.items():
                    # lets catch any keys that don' exist in result[0] and
                    # create empty array with expected length, after which we will add
                    # additional values, or add nan's instead
                    if subkey not in combined_results[key]:
                        # create empty array, as none of results so far had this key\

                        combined_results[key][subkey] = {}
                        for sub_subkey, value in subval.items():
                            if sub_subkey == "value":
                                combined_results[key][subkey]["value"] = (
                                    np.zeros(total_chunk_length) * np.nan
                                )
                            else:
                                combined_results[key][subkey][sub_subkey] = value
                    combined_results[key][subkey]["value"] = np.append(
                        combined_results[key][subkey]["value"],
                        copy.deepcopy(
                            subval["value"],
                        ),
                    )
                    # keep track of our subchunk_length
                    sub_chunk_length = len(subval["value"])

                # make sure we add any empty value to missing keys

                for subkey in combined_results[key]:
                    if subkey not in val.keys():
                        empty_chunk = np.zeros(sub_chunk_length) * np.nan
                        combined_results[key][subkey]["value"] = np.append(
                            combined_results[key][subkey]["value"], empty_chunk
                        )
            total_chunk_length += sub_chunk_length
        return combined_results

    """
    Build up a list of the outputs for each result of the optimization.
    Returned as a list of lists, where each inner list is the results from
    one process's run.
    """

    def _combine_output_array(self, gathered_results):
        outputs = gathered_results["outputs"]
        if len(outputs) == 0:
            return []

        # assume all output arrays have the same length
        combined_outputs = [
            np.asarray([]) for _ in range(len(list(outputs.values())[0]["value"]))
        ]
        for _, output in outputs.items():
            for i in range(len(output["value"])):
                combined_outputs[i] = np.append(combined_outputs[i], output["value"][i])
        return np.asarray(combined_outputs)

    """
    Build up a list of the sweep_inputs for each result of the optimization.
    Returned as a list of lists, where each inner list is the results from
    one process's run.
    """

    def _combine_input_array(self, gathered_results):
        inputs = gathered_results["sweep_params"]
        if len(inputs) == 0:
            return []

        # assume all output arrays have the same length
        combined_inputs = [
            np.asarray([]) for _ in range(len(list(inputs.values())[0]["value"]))
        ]
        for _, inputv in inputs.items():
            for i in range(len(inputv["value"])):
                combined_inputs[i] = np.append(combined_inputs[i], inputv["value"][i])
        return np.asarray(combined_inputs)

    """
    Use the embedded ParallelManager to fan out and then back in the results.
    Args:
    - build_model: a function for building the flowsheet model
    - build_model_kwargs: any keyword args necessary for the build_model function
    - build_sweep_params: a function for building the sweep parameters
    - build_sweep_params_kwargs: any keyword args necessary for the build_sweep_params
    function
    - build_outputs: a function for building the outputs dictionary
    - all_parameter_combinations: a list where each element represents the parameters
    for a single local run
    Returns:
    - a list of LocalResults representing the results of the simulation runs 
    """

    def run_scatter_gather(self, all_parameter_combinations, class_reference):
        # save a reference to the parallel manager since it will be removed
        # along with the other unpicklable state
        parallel_manager = self.parallel_manager
        saved_state = class_reference.remove_unpicklable_state(self)

        do_build_kwargs = {"param_sweep_instance": self}

        parallel_manager.scatter(
            do_build,
            do_build_kwargs,
            do_execute,
            all_parameter_combinations,
        )

        # gather the results and combine them into the format we want
        all_results = parallel_manager.gather()
        class_reference.restore_unpicklable_state(self, saved_state)

        return all_results


[docs]def do_build( param_sweep_instance, ): """ Used to pass into the parallel manager to build the parameters necessary for the sweep function. Defined at the top level so it's picklable. """ ps_config = param_sweep_instance.config model = ps_config.build_model(**ps_config.build_model_kwargs) sweep_params = ps_config.build_sweep_params( model, **ps_config.build_sweep_params_kwargs ) sweep_params, sampling_type = param_sweep_instance._process_sweep_params( sweep_params ) outputs = ps_config.build_outputs(model, **ps_config.build_outputs_kwargs) # for when differential parameter tool is used if hasattr(param_sweep_instance, "_define_differential_sweep_outputs"): param_sweep_instance._define_differential_sweep_outputs(model, sweep_params) if outputs is not None: param_sweep_instance.assign_variable_names(model, outputs) return [param_sweep_instance, model, sweep_params, outputs]
[docs]def do_execute( local_combo_array, param_sweep_instance, model, sweep_params, outputs, ): """ Used to pass into the parallel manager in order to execute the sweep for a set of local values. Defined at the top level so it's picklable. """ if param_sweep_instance.config.custom_do_param_sweep is not None: return param_sweep_instance.config.custom_do_param_sweep( param_sweep_instance, sweep_params, outputs, local_combo_array ) return param_sweep_instance._do_param_sweep( sweep_params, outputs, local_combo_array )
[docs]def return_none(model, outputkeys=None): """ Used so that build_outputs=None is a valid usage of the parameter sweep tool without requiring the user to wrap it in a function. """ return None