watertap.tools.parallel package
Submodules
watertap.tools.parallel.concurrent_futures_parallel_manager module
- class watertap.tools.parallel.concurrent_futures_parallel_manager.ConcurrentFuturesParallelManager(number_of_subprocesses=1, **kwargs)
Bases: ParallelManager
- combine_data_with_peers(data)
Combine the data from each peer into a list. The return value is a list of all the data elements provided by each process that calls this function.
With multiple processes, this must act as a synchronization point and return the list in the same order on each process.
- gather()
Gather the results of the computation started by a previous call to scatter(). Returns a list of LocalResults representing the results for each process; each result is the return value of the do_execute function from scatter() for one process.
- gather_arrays_to_root(sendbuf, recvbuf_spec)
Gather the data in the send buffer to the root process. Parameters:
  - sendbuf: the data to be sent from this process.
  - recvbuf_spec: a tuple of (receive buffer, list of sizes), where the list of sizes gives how much data will come from each process. Ignored if not the root process.
- is_root_process()
Return whether the current process should be considered the root of the parallel group it belongs to.
- number_of_worker_processes()
Return the total number of processes running local simulations.
- results_from_local_tree(results)
Given a list of LocalResults objects, return the sublist of those the current process is responsible for.
- scatter(do_build, do_build_kwargs, do_execute, all_parameters)
Scatter the specified execution over a list of parameters, as defined by the implementation’s parallelism. Args:
  - do_build: a function that builds the arguments needed by the execution function. It is expected to return a list that will be unpacked and passed to do_execute as arguments.
  - do_build_kwargs: a dictionary of keyword arguments for the do_build function.
  - do_execute: the execution function. It is expected to take the list of local parameters as its first argument; any further arguments should match the list returned by do_build.
  - all_parameters: a list of all parameters to be run. It is included so that different implementations of the parallel manager can make their own decisions about splitting and parallelization.
- sum_values_and_sync(sendbuf, recvbuf)
Sum a list of values from each process and sync the result to all processes. Parameters:
  - sendbuf: an array of values the local process is contributing to the sum.
  - recvbuf: an array of a single value that holds the summed result (the same on each process) when the function returns.
Note: this is a specific case of a global reduce (with sum() as the reducing function). If more than sum() is needed in the future, this function should be extended to receive an Op argument.
- sync_array_with_peers(data)
Synchronize an array with all peer processes. On the root process, data is the source of truth that will be broadcast to all processes; on any other process, it is an empty buffer that holds the data sent from the root once the function returns.
- sync_pyobject_with_peers(obj)
Synchronize a Python object with all peer processes. On the root process, obj is the source of truth that will be broadcast to all processes; on any other process, it is ignored.
Different from sync_array_with_peers in that it returns the synced object rather than using an existing buffer.
watertap.tools.parallel.mpi_parallel_manager module
- class watertap.tools.parallel.mpi_parallel_manager.MPIParallelManager(MPI, **kwargs)
Bases: ParallelManager
- combine_data_with_peers(data)
Combine the data from each peer into a list. The return value is a list of all the data elements provided by each process that calls this function.
With multiple processes, this must act as a synchronization point and return the list in the same order on each process.
- gather()
Gather the results of the computation started by a previous call to scatter(). Returns a list of LocalResults representing the results for each process; each result is the return value of the do_execute function from scatter() for one process.
- gather_arrays_to_root(sendbuf, recvbuf_spec)
Gather the data in the send buffer to the root process. Parameters:
  - sendbuf: the data to be sent from this process.
  - recvbuf_spec: a tuple of (receive buffer, list of sizes), where the list of sizes gives how much data will come from each process. Ignored if not the root process.
- is_root_process()
Return whether the current process should be considered the root of the parallel group it belongs to.
- number_of_worker_processes()
Return the total number of processes running local simulations.
- results_from_local_tree(results)
Given a list of LocalResults objects, return the sublist of those the current process is responsible for.
- scatter(do_build, do_build_kwargs, do_execute, all_parameters)
Scatter the specified execution over a list of parameters, as defined by the implementation’s parallelism. Args:
  - do_build: a function that builds the arguments needed by the execution function. It is expected to return a list that will be unpacked and passed to do_execute as arguments.
  - do_build_kwargs: a dictionary of keyword arguments for the do_build function.
  - do_execute: the execution function. It is expected to take the list of local parameters as its first argument; any further arguments should match the list returned by do_build.
  - all_parameters: a list of all parameters to be run. It is included so that different implementations of the parallel manager can make their own decisions about splitting and parallelization.
- sum_values_and_sync(sendbuf, recvbuf)
Sum a list of values from each process and sync the result to all processes. Parameters:
  - sendbuf: an array of values the local process is contributing to the sum.
  - recvbuf: an array of a single value that holds the summed result (the same on each process) when the function returns.
Note: this is a specific case of a global reduce (with sum() as the reducing function). If more than sum() is needed in the future, this function should be extended to receive an Op argument.
- sync_array_with_peers(data)
Broadcast the array to all processes. This call acts as a synchronization point when run by multiple peer MPI processes.
watertap.tools.parallel.multiprocessing_parallel_manager module
- class watertap.tools.parallel.multiprocessing_parallel_manager.MultiprocessingParallelManager(number_of_subprocesses=1, **kwargs)
Bases: ParallelManager
- combine_data_with_peers(data)
Combine the data from each peer into a list. The return value is a list of all the data elements provided by each process that calls this function.
With multiple processes, this must act as a synchronization point and return the list in the same order on each process.
- gather()
Gather the results of the computation started by a previous call to scatter(). Returns a list of LocalResults representing the results for each process; each result is the return value of the do_execute function from scatter() for one process.
- gather_arrays_to_root(sendbuf, recvbuf_spec)
Gather the data in the send buffer to the root process. Parameters:
  - sendbuf: the data to be sent from this process.
  - recvbuf_spec: a tuple of (receive buffer, list of sizes), where the list of sizes gives how much data will come from each process. Ignored if not the root process.
- is_root_process()
Return whether the current process should be considered the root of the parallel group it belongs to.
- number_of_worker_processes()
Return the total number of processes running local simulations.
- results_from_local_tree(results)
Given a list of LocalResults objects, return the sublist of those the current process is responsible for.
- scatter(do_build, do_build_kwargs, do_execute, all_parameters)
Scatter the specified execution over a list of parameters, as defined by the implementation’s parallelism. Args:
  - do_build: a function that builds the arguments needed by the execution function. It is expected to return a list that will be unpacked and passed to do_execute as arguments.
  - do_build_kwargs: a dictionary of keyword arguments for the do_build function.
  - do_execute: the execution function. It is expected to take the list of local parameters as its first argument; any further arguments should match the list returned by do_build.
  - all_parameters: a list of all parameters to be run. It is included so that different implementations of the parallel manager can make their own decisions about splitting and parallelization.
- sum_values_and_sync(sendbuf, recvbuf)
Sum a list of values from each process and sync the result to all processes. Parameters:
  - sendbuf: an array of values the local process is contributing to the sum.
  - recvbuf: an array of a single value that holds the summed result (the same on each process) when the function returns.
Note: this is a specific case of a global reduce (with sum() as the reducing function). If more than sum() is needed in the future, this function should be extended to receive an Op argument.
- sync_array_with_peers(data)
Synchronize an array with all peer processes. On the root process, data is the source of truth that will be broadcast to all processes; on any other process, it is an empty buffer that holds the data sent from the root once the function returns.
- sync_pyobject_with_peers(obj)
Synchronize a Python object with all peer processes. On the root process, obj is the source of truth that will be broadcast to all processes; on any other process, it is ignored.
Different from sync_array_with_peers in that it returns the synced object rather than using an existing buffer.
watertap.tools.parallel.parallel_manager module
- class watertap.tools.parallel.parallel_manager.ParallelManager
Bases: ABC
- abstract combine_data_with_peers(data)
Combine the data from each peer into a list. The return value is a list of all the data elements provided by each process that calls this function.
With multiple processes, this must act as a synchronization point and return the list in the same order on each process.
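The combine_data_with_peers contract is the classic "allgather" pattern. The sketch below simulates it inside one Python process, with threads standing in for peer processes; simulate_combine_data_with_peers is a hypothetical helper, not a watertap function. A threading.Barrier plays the role of the synchronization point, and every peer reads back the same rank-ordered list.

```python
import threading


def simulate_combine_data_with_peers(per_peer_data):
    """Simulate the combine_data_with_peers contract with one thread per peer.

    Each hypothetical peer writes its data into the slot for its rank, waits
    at a barrier (the synchronization point), then reads the full list.
    """
    n_peers = len(per_peer_data)
    slots = [None] * n_peers
    barrier = threading.Barrier(n_peers)
    views = [None] * n_peers  # what each peer sees after combining

    def peer(rank):
        slots[rank] = per_peer_data[rank]  # contribute local data
        barrier.wait()  # no peer proceeds until every peer has contributed
        views[rank] = list(slots)  # same rank-ordered list on every peer

    threads = [threading.Thread(target=peer, args=(r,)) for r in range(n_peers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return views


views = simulate_combine_data_with_peers(["a", "b", "c"])
print(views[0])  # ['a', 'b', 'c']
print(all(v == views[0] for v in views))  # True
```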
- abstract gather()
Gather the results of the computation started by a previous call to scatter(). Returns a list of LocalResults representing the results for each process; each result is the return value of the do_execute function from scatter() for one process.
- abstract gather_arrays_to_root(sendbuf, recvbuf_spec)
Gather the data in the send buffer to the root process. Parameters:
  - sendbuf: the data to be sent from this process.
  - recvbuf_spec: a tuple of (receive buffer, list of sizes), where the list of sizes gives how much data will come from each process. Ignored if not the root process.
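The (receive buffer, list of sizes) pair resembles a variable-count gather such as MPI's Gatherv. As a minimal single-process sketch, assuming the root lays out each rank's data contiguously in rank order (an assumption, not something documented above), the hypothetical helper below fills a plain-list receive buffer; the real managers may operate on array buffers instead.

```python
def simulate_gather_arrays_to_root(sendbufs, sizes):
    """Simulate gather_arrays_to_root for a group of hypothetical peers.

    sendbufs[rank] is the data sent by that rank; sizes[rank] is how many
    elements the root expects from it, as in the recvbuf_spec tuple.
    Returns the filled receive buffer that the root would end up holding.
    """
    recvbuf = [0.0] * sum(sizes)  # root-side receive buffer
    offset = 0
    for rank, (data, size) in enumerate(zip(sendbufs, sizes)):
        if len(data) != size:
            raise ValueError(f"rank {rank} sent {len(data)} values, expected {size}")
        recvbuf[offset:offset + size] = data  # place data at this rank's offset
        offset += size
    return recvbuf


recvbuf = simulate_gather_arrays_to_root([[1.0, 2.0], [3.0], [4.0, 5.0, 6.0]], [2, 1, 3])
print(recvbuf)  # [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
```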
- abstract is_root_process()
Return whether the current process should be considered the root of the parallel group it belongs to.
- abstract number_of_worker_processes()
Return the total number of processes running local simulations.
- abstract results_from_local_tree(results)
Given a list of LocalResults objects, return the sublist of those the current process is responsible for.
- abstract scatter(do_build, do_build_kwargs, do_execute, all_parameters)
Scatter the specified execution over a list of parameters, as defined by the implementation’s parallelism. Args:
  - do_build: a function that builds the arguments needed by the execution function. It is expected to return a list that will be unpacked and passed to do_execute as arguments.
  - do_build_kwargs: a dictionary of keyword arguments for the do_build function.
  - do_execute: the execution function. It is expected to take the list of local parameters as its first argument; any further arguments should match the list returned by do_build.
  - all_parameters: a list of all parameters to be run. It is included so that different implementations of the parallel manager can make their own decisions about splitting and parallelization.
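To illustrate the scatter() argument contract, the toy functions below are hypothetical placeholders (do_build, do_execute, scale and offset are illustrative names only, not watertap code). The final lines sketch the wiring the description implies for one process's share of the parameters: do_build is called with do_build_kwargs, and the list it returns is unpacked after the local parameters.

```python
def do_build(scale=1.0):
    """Hypothetical builder: returns the extra arguments for do_execute as a list."""
    model = {"scale": scale}  # stand-in for an expensive-to-build model
    offset = 10.0
    return [model, offset]


def do_execute(local_parameters, model, offset):
    """Hypothetical executor: local parameters first, then do_build's outputs."""
    return [model["scale"] * p + offset for p in local_parameters]


# A sketch of how an implementation might combine the two for one process's share:
do_build_kwargs = {"scale": 2.0}
built_args = do_build(**do_build_kwargs)
results = do_execute([1.0, 2.0, 3.0], *built_args)  # the built list is unpacked
print(results)  # [12.0, 14.0, 16.0]
```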
- abstract sum_values_and_sync(sendbuf, recvbuf)
Sum a list of values from each process and sync the result to all processes. Parameters:
  - sendbuf: an array of values the local process is contributing to the sum.
  - recvbuf: an array of a single value that holds the summed result (the same on each process) when the function returns.
Note: this is a specific case of a global reduce (with sum() as the reducing function). If more than sum() is needed in the future, this function should be extended to receive an Op argument.
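A minimal simulation of sum_values_and_sync follows, with plain lists standing in for each process's buffers; the helper is hypothetical. The description does not pin down the exact reduction shape, so this sketch assumes every value from every process is added into one total. An MPI-based implementation may instead reduce element-wise, as MPI's Allreduce with a sum operation does; the two readings agree when each process sends a single value.

```python
def simulate_sum_values_and_sync(sendbufs, recvbufs):
    """Simulate sum_values_and_sync across hypothetical peers.

    Assumes the reduction adds every value from every peer into one total,
    which is then written into each peer's single-element recvbuf.
    """
    total = sum(sum(buf) for buf in sendbufs)  # global reduce with sum()
    for recvbuf in recvbufs:
        recvbuf[0] = total  # same result on each process
    return recvbufs


recvbufs = [[0], [0], [0]]
simulate_sum_values_and_sync([[1, 2], [3], [4, 5]], recvbufs)
print(recvbufs)  # [[15], [15], [15]]
```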
- abstract sync_array_with_peers(data)
Synchronize an array with all peer processes. On the root process, data is the source of truth that will be broadcast to all processes; on any other process, it is an empty buffer that holds the data sent from the root once the function returns.
- abstract sync_pyobject_with_peers(obj)
Synchronize a Python object with all peer processes. On the root process, obj is the source of truth that will be broadcast to all processes; on any other process, it is ignored.
Different from sync_array_with_peers in that it returns the synced object rather than using an existing buffer.
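The practical difference between sync_array_with_peers and sync_pyobject_with_peers is where the result lands. In this single-process sketch (both helpers are hypothetical), the array version fills preallocated buffers in place, while the object version hands back a fresh copy for each peer; copy.deepcopy stands in for serializing the object and sending it to another process.

```python
import copy


def simulate_sync_array(root_data, other_buffers):
    """Buffer style: each non-root peer's preallocated buffer is filled in place."""
    for buf in other_buffers:
        buf[:] = root_data  # overwrite the buffer's contents, keep the same object
    return None  # in this sketch the data lives only in the buffers


def simulate_sync_pyobject(root_obj, n_peers):
    """Return style: each peer receives its own copy of the root's object."""
    # deepcopy stands in for serializing the object and sending it to each peer
    return [copy.deepcopy(root_obj) for _ in range(n_peers)]


buffers = [[0, 0, 0], [0, 0, 0]]  # preallocated buffers on two non-root peers
simulate_sync_array([7, 8, 9], buffers)
print(buffers)  # [[7, 8, 9], [7, 8, 9]]

copies = simulate_sync_pyobject({"sweep": "done"}, n_peers=3)
print(copies[2])  # {'sweep': 'done'}
```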
- watertap.tools.parallel.parallel_manager.build_and_execute(do_build, do_build_kwargs, do_execute, local_parameters)
Entrypoint that parallel manager implementations use to run the build and execute functions. It is defined at module level so that it is picklable.
For a description of the first three arguments, see scatter() above. The fourth argument, local_parameters, is the list of parameters that should be run by this process.
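Defining the entrypoint at module level matters for process pools: pickle serializes a function by reference to its importable name, so lambdas and nested functions cannot be sent to worker processes. The sketch below mirrors the described call pattern with a hypothetical build_and_execute_sketch, splits all_parameters into contiguous near-equal chunks (one possible strategy, not necessarily the one watertap uses), and collects (process_number, parameters, results) triples much as gather() might. It uses a ThreadPoolExecutor so that it runs anywhere; swapping in a ProcessPoolExecutor would additionally require every submitted callable to be picklable.

```python
from concurrent.futures import ThreadPoolExecutor


def build_and_execute_sketch(do_build, do_build_kwargs, do_execute, local_parameters):
    """Module-level entrypoint in the spirit of build_and_execute (a sketch)."""
    return do_execute(local_parameters, *do_build(**do_build_kwargs))


def split_parameters(all_parameters, n_workers):
    """One possible split: contiguous, near-equal chunks (an assumption)."""
    base, extra = divmod(len(all_parameters), n_workers)
    chunks, start = [], 0
    for rank in range(n_workers):
        stop = start + base + (1 if rank < extra else 0)
        chunks.append(all_parameters[start:stop])
        start = stop
    return chunks


def do_build(offset):
    """Hypothetical builder returning the extra arguments for do_execute."""
    return [offset]


def do_execute(local_parameters, offset):
    """Hypothetical executor applied to one worker's chunk."""
    return [p + offset for p in local_parameters]


all_parameters = [1, 2, 3, 4, 5]
chunks = split_parameters(all_parameters, n_workers=2)
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [
        pool.submit(build_and_execute_sketch, do_build, {"offset": 100}, do_execute, chunk)
        for chunk in chunks
    ]
    # (process_number, parameters, results) triples, like the LocalResults fields
    gathered = [(rank, chunks[rank], f.result()) for rank, f in enumerate(futures)]
print(gathered)  # [(0, [1, 2, 3], [101, 102, 103]), (1, [4, 5], [104, 105])]
```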
watertap.tools.parallel.parallel_manager_factory module
- watertap.tools.parallel.parallel_manager_factory.create_parallel_manager(parallel_manager_class=None, **kwargs)
Create and return an instance of a ParallelManager, based on the libraries available in the runtime environment.
Optionally, a Python class can be passed in as parallel_manager_class. If so, that class is instantiated and returned instead of checking the local environment.
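The override behaviour of create_parallel_manager follows a common factory pattern. In the sketch below, the stand-in classes and the mpi4py-based detection rule are hypothetical; the real function's environment checks are not described here. The sketch also assumes the extra keyword arguments are forwarded to the chosen class's constructor.

```python
import importlib.util


class SingleProcessStandIn:
    """Hypothetical stand-in for a serial manager."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs


class MPIStandIn:
    """Hypothetical stand-in for an MPI-backed manager."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs


def create_manager_sketch(parallel_manager_class=None, **kwargs):
    """Factory in the spirit of create_parallel_manager (a sketch).

    An explicitly passed class short-circuits environment detection.
    """
    if parallel_manager_class is not None:
        return parallel_manager_class(**kwargs)
    # Hypothetical detection rule: prefer MPI when mpi4py is importable.
    if importlib.util.find_spec("mpi4py") is not None:
        return MPIStandIn(**kwargs)
    return SingleProcessStandIn(**kwargs)


manager = create_manager_sketch(SingleProcessStandIn, number_of_subprocesses=4)
print(type(manager).__name__, manager.kwargs)  # SingleProcessStandIn {'number_of_subprocesses': 4}
```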
- watertap.tools.parallel.parallel_manager_factory.get_mpi_comm_process()
Return the MPI COMM_WORLD communicator.
watertap.tools.parallel.ray_io_parallel_manager module
- class watertap.tools.parallel.ray_io_parallel_manager.RayIoParallelManager(number_of_subprocesses=1, **kwargs)
Bases: ParallelManager
- combine_data_with_peers(data)
Combine the data from each peer into a list. The return value is a list of all the data elements provided by each process that calls this function.
With multiple processes, this must act as a synchronization point and return the list in the same order on each process.
- gather()
Gather the results of the computation started by a previous call to scatter(). Returns a list of LocalResults representing the results for each process; each result is the return value of the do_execute function from scatter() for one process.
- gather_arrays_to_root(sendbuf, recvbuf_spec)
Gather the data in the send buffer to the root process. Parameters:
  - sendbuf: the data to be sent from this process.
  - recvbuf_spec: a tuple of (receive buffer, list of sizes), where the list of sizes gives how much data will come from each process. Ignored if not the root process.
- is_root_process()
Return whether the current process should be considered the root of the parallel group it belongs to.
- number_of_worker_processes()
Return the total number of processes running local simulations.
- results_from_local_tree(results)
Given a list of LocalResults objects, return the sublist of those the current process is responsible for.
- scatter(do_build, do_build_kwargs, do_execute, all_parameters)
Scatter the specified execution over a list of parameters, as defined by the implementation’s parallelism. Args:
  - do_build: a function that builds the arguments needed by the execution function. It is expected to return a list that will be unpacked and passed to do_execute as arguments.
  - do_build_kwargs: a dictionary of keyword arguments for the do_build function.
  - do_execute: the execution function. It is expected to take the list of local parameters as its first argument; any further arguments should match the list returned by do_build.
  - all_parameters: a list of all parameters to be run. It is included so that different implementations of the parallel manager can make their own decisions about splitting and parallelization.
- shut_down_ray()
Shut down the Ray instance after a run, but only in local mode. When running on a cluster or with a separate head node, the external head script is assumed to shut Ray down, since this manager only connects to it.
- sum_values_and_sync(sendbuf, recvbuf)
Sum a list of values from each process and sync the result to all processes. Parameters:
  - sendbuf: an array of values the local process is contributing to the sum.
  - recvbuf: an array of a single value that holds the summed result (the same on each process) when the function returns.
Note: this is a specific case of a global reduce (with sum() as the reducing function). If more than sum() is needed in the future, this function should be extended to receive an Op argument.
- sync_array_with_peers(data)
Synchronize an array with all peer processes. On the root process, data is the source of truth that will be broadcast to all processes; on any other process, it is an empty buffer that holds the data sent from the root once the function returns.
- sync_pyobject_with_peers(obj)
Synchronize a Python object with all peer processes. On the root process, obj is the source of truth that will be broadcast to all processes; on any other process, it is ignored.
Different from sync_array_with_peers in that it returns the synced object rather than using an existing buffer.
watertap.tools.parallel.results module
- class watertap.tools.parallel.results.LocalResults(process_number, parameters, results)
Bases: object
Class representing the results of one process’s run of an optimization routine. Parameters:
  - process_number: a unique number identifying the process in the group.
  - parameters: a list containing an entry for each parameter group run by the process.
  - results: the results of the optimization routine run for all parameter groups in the list.
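When post-processing gathered output, a small container mirroring the three LocalResults fields can be convenient. The dataclass and flattening helper below are hypothetical, and the helper assumes, beyond what is documented, that results holds one entry per parameter group, in the same order as parameters.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class LocalResultsSketch:
    """Hypothetical mirror of the three LocalResults fields."""

    process_number: int
    parameters: List[Any] = field(default_factory=list)
    results: List[Any] = field(default_factory=list)


def flatten_results(gathered):
    """Concatenate per-process parameters and results in process_number order."""
    ordered = sorted(gathered, key=lambda r: r.process_number)
    params = [p for r in ordered for p in r.parameters]
    values = [v for r in ordered for v in r.results]
    return params, values


gathered = [
    LocalResultsSketch(1, [3, 4], [9, 16]),
    LocalResultsSketch(0, [1, 2], [1, 4]),
]
print(flatten_results(gathered))  # ([1, 2, 3, 4], [1, 4, 9, 16])
```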
watertap.tools.parallel.single_process_parallel_manager module
- class watertap.tools.parallel.single_process_parallel_manager.SingleProcessParallelManager(**kwargs)
Bases: ParallelManager
- combine_data_with_peers(data)
Combine the data from each peer into a list. The return value is a list of all the data elements provided by each process that calls this function.
With multiple processes, this must act as a synchronization point and return the list in the same order on each process.
- gather()
Gather the results of the computation started by a previous call to scatter(). Returns a list of LocalResults representing the results for each process; each result is the return value of the do_execute function from scatter() for one process.
- gather_arrays_to_root(sendbuf, recvbuf_spec)
Gather the data in the send buffer to the root process. Parameters:
  - sendbuf: the data to be sent from this process.
  - recvbuf_spec: a tuple of (receive buffer, list of sizes), where the list of sizes gives how much data will come from each process. Ignored if not the root process.
- is_root_process()
Return whether the current process should be considered the root of the parallel group it belongs to.
- number_of_worker_processes()
Return the total number of processes running local simulations.
- results_from_local_tree(results)
Given a list of LocalResults objects, return the sublist of those the current process is responsible for.
- scatter(do_build, do_build_kwargs, do_execute, all_parameters)
Scatter the specified execution over a list of parameters, as defined by the implementation’s parallelism. Args:
  - do_build: a function that builds the arguments needed by the execution function. It is expected to return a list that will be unpacked and passed to do_execute as arguments.
  - do_build_kwargs: a dictionary of keyword arguments for the do_build function.
  - do_execute: the execution function. It is expected to take the list of local parameters as its first argument; any further arguments should match the list returned by do_build.
  - all_parameters: a list of all parameters to be run. It is included so that different implementations of the parallel manager can make their own decisions about splitting and parallelization.
- sum_values_and_sync(sendbuf, recvbuf)
Sum a list of values from each process and sync the result to all processes. Parameters:
  - sendbuf: an array of values the local process is contributing to the sum.
  - recvbuf: an array of a single value that holds the summed result (the same on each process) when the function returns.
Note: this is a specific case of a global reduce (with sum() as the reducing function). If more than sum() is needed in the future, this function should be extended to receive an Op argument.
- sync_array_with_peers(data)
Synchronize an array with all peer processes. On the root process, data is the source of truth that will be broadcast to all processes; on any other process, it is an empty buffer that holds the data sent from the root once the function returns.
- sync_pyobject_with_peers(obj)
Synchronize a Python object with all peer processes. On the root process, obj is the source of truth that will be broadcast to all processes; on any other process, it is ignored.
Different from sync_array_with_peers in that it returns the synced object rather than using an existing buffer.