#################################################################################
# WaterTAP Copyright (c) 2020-2024, The Regents of the University of California,
# through Lawrence Berkeley National Laboratory, Oak Ridge National Laboratory,
# National Renewable Energy Laboratory, and National Energy Technology
# Laboratory (subject to receipt of any required approvals from the U.S. Dept.
# of Energy). All rights reserved.
#
# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and license
# information, respectively. These files are also available online at the URL
# "https://github.com/watertap-org/watertap/"
#################################################################################
from abc import abstractmethod, ABC
class ParallelManager(ABC):
    """
    Abstract interface for a parallel manager.

    Concrete subclasses supply the process-identity queries (root/rank/worker
    count), the synchronization and data-exchange primitives, and the
    scatter/gather pair declared below. Every method here is abstract and
    raises NotImplementedError if invoked directly.
    """

    # Rank number that identifies the root process of a parallel group.
    ROOT_PROCESS_RANK = 0

    @abstractmethod
    def is_root_process(self):
        """
        Return whether the current process should be considered as the root of the parallel
        group it is a part of.
        """
        raise NotImplementedError

    @abstractmethod
    def get_rank(self):
        """
        Get the process's rank number in its parallel peer group.
        """
        raise NotImplementedError

    @abstractmethod
    def number_of_worker_processes(self):
        """
        Return how many total processes are running local simulations.
        """
        raise NotImplementedError

    @abstractmethod
    def sync_with_peers(self):
        """
        Implement a synchronization point with any peer processes.
        """
        raise NotImplementedError

    @abstractmethod
    def sync_array_with_peers(self, data):
        """
        Synchronize an array with all peer processes. The data parameter is either:

        - (if root) the source of truth that will be broadcast to all processes
        - (if not root) an empty buffer that will hold the data sent from root once
          the function returns
        """
        raise NotImplementedError

    @abstractmethod
    def sync_pyobject_with_peers(self, obj):
        """
        Synchronize a python object with all peer processes. The obj parameter is either:

        - (if root) the source of truth that will be broadcast to all processes
        - (if not root) ignored

        Different from sync_array_with_peers in that it returns the synced object rather
        than using an existing buffer.
        """
        raise NotImplementedError

    @abstractmethod
    def combine_data_with_peers(self, data):
        """
        Combine the data from each peer into a list. The return value will be a list of all
        the data elements provided by each process that calls this function.

        With multiple processes, this must:

        - act as a synchronization point
        - return the list in the same order on each process
        """
        raise NotImplementedError

    @abstractmethod
    def gather_arrays_to_root(self, sendbuf, recvbuf_spec):
        """
        Gather the data in the send buffer to the root process. Parameters are:

        - sendbuf: the data to be sent from this process
        - recvbuf_spec: a tuple of (receive buffer, list of sizes) where the list of sizes
          is how much data will come from each process. Ignored if not the root process.
        """
        raise NotImplementedError

    @abstractmethod
    def sum_values_and_sync(self, sendbuf, recvbuf):
        """
        Sum a list of values from each process and sync the result to all processes.
        Parameters:

        - sendbuf: an array of values the local process is contributing to the sum
        - recvbuf: an array of a single value that will hold the summed result (same on
          each process) when the function returns.

        Note: this is a specific case of a global reduce (with sum() as the reducing
        function). If more than sum() is needed in the future, this function should be
        extended to receive an Op argument.
        """
        raise NotImplementedError

    @abstractmethod
    def scatter(
        self,
        do_build,
        do_build_kwargs,
        do_execute,
        all_parameters,
    ):
        """
        Scatter the specified execution out, as defined by the implementation's
        parallelism, for a list of parameters.

        Args:
            - do_build: a function that builds the arguments necessary for the execution
              function. Expected to return a list that will be exploded and passed into
              the do_execute function as arguments.
            - do_build_kwargs: a dictionary of keyword arguments for the do_build function
            - do_execute: the execution function. Expected to take in the list of local
              parameters as its first argument. Any arguments after that should match
              the list returned by the do_build function.
            - all_parameters: a list of all parameters to be run. Included so that
              different implementations of the parallel manager can make decisions about
              splitting and parallelization.
        """
        raise NotImplementedError

    @abstractmethod
    def gather(self):
        """
        Gather the results of the computation that was kicked off via a previous scatter.

        Returns:
            - a list of LocalResults, representing the results for each process. Each
              result will be the return value of the do_execute function from scatter()
              for one process.
        """
        raise NotImplementedError

    @abstractmethod
    def results_from_local_tree(self, results):
        """
        Given a list of LocalResults objects, return a sublist of the ones the current
        process is responsible for.
        """
        raise NotImplementedError
# TODO: this should probably be owned by the parameter sweep, as it is parameter-sweep (PS) specific
def build_and_execute(do_build, do_build_kwargs, do_execute, local_parameters):
    """
    Entrypoint for implementations of the parallel manager to use for running the
    build and execute functions. Defined at the top level so that it's picklable.

    Args:
        - do_build: function called as do_build(**do_build_kwargs); its return value is
          unpacked into the extra positional arguments of do_execute.
        - do_build_kwargs: dictionary of keyword arguments for do_build.
        - do_execute: function called as do_execute(local_parameters, *build_result).
        - local_parameters: the list of local parameters that should be run by this
          process.

    Returns:
        Whatever do_execute returns for local_parameters.
    """
    # Constructing the actor runs do_build immediately; execute() then runs do_execute.
    actor = parallelActor(do_build, do_build_kwargs, do_execute, local_parameters)
    return actor.execute(local_parameters)
# TODO: this should probably be owned by the parameter sweep, as it is parameter-sweep (PS) specific
class parallelActor:
    """
    Pairs a build function with an execute function.

    The build function is called once, at construction time, and its result is kept in
    ``exec_args``. Each call to execute() passes that stored result, unpacked, to the
    execute function after the parameters to run.
    """

    def __init__(self, do_build, do_build_kwargs, do_execute, local_parameters):
        """
        Store the callables and parameters, then run the build step.

        Args:
            - do_build: function called as do_build(**do_build_kwargs); must return an
              iterable, since its result is star-unpacked in execute().
            - do_build_kwargs: dictionary of keyword arguments for do_build.
            - do_execute: function called as do_execute(local_parameters, *exec_args).
            - local_parameters: parameters associated with this actor. Stored as an
              attribute only; execute() uses the argument it is given instead.
        """
        self.do_build = do_build
        self.do_build_kwargs = do_build_kwargs
        self.do_execute = do_execute
        self.local_parameters = local_parameters
        # Build eagerly so exec_args exists before the first execute() call.
        self.build_model()

    def build_model(self):
        """
        Call do_build with the stored keyword arguments and keep the result in exec_args.
        """
        self.exec_args = self.do_build(**self.do_build_kwargs)

    def execute(self, local_parameters):
        """
        Run do_execute on local_parameters followed by the unpacked exec_args, and return
        its result.
        """
        return self.do_execute(local_parameters, *self.exec_args)