watertap.tools.parallel package

Submodules

watertap.tools.parallel.concurrent_futures_parallel_manager module

class watertap.tools.parallel.concurrent_futures_parallel_manager.ConcurrentFuturesParallelManager(number_of_subprocesses=1, **kwargs)[source]

Bases: ParallelManager

__init__(number_of_subprocesses=1, **kwargs)[source]
combine_data_with_peers(data)[source]

Combine the data from each peer into a list. The return value will be a list of all the data elements provided by each process that calls this function.

With multiple processes, this must:
- act as a synchronization point
- return the list in the same order on each process

gather()[source]

Gather the results of the computation that was kicked off via a previous scatter. Returns:
- a list of LocalResults, representing the results for each process. Each result will be the return value of the do_execute function from scatter() for one process.

gather_arrays_to_root(sendbuf, recvbuf_spec)[source]

Gather the data in the send buffer to the root process. Parameters:
- sendbuf: the data to be sent from this process
- recvbuf_spec: a tuple of (receive buffer, list of sizes), where the list of sizes specifies how much data will come from each process. Ignored if not the root process.

get_rank()[source]

Get the process’s rank number in its parallel peer group.

is_root_process()[source]

Return whether the current process should be considered as the root of the parallel group it is a part of.

number_of_worker_processes()[source]

Return how many total processes are running local simulations.

results_from_local_tree(results)[source]

Given a list of LocalResults objects, return a sublist of the ones the current process is responsible for.

scatter(do_build, do_build_kwargs, do_execute, all_parameters)[source]

Scatter the specified execution out, as defined by the implementation’s parallelism, for a list of parameters. Args:
- do_build: a function that builds the arguments necessary for the execution function. Expected to return a list that will be exploded and passed into the do_execute function as arguments.
- do_build_kwargs: a dictionary of keyword arguments for the do_build function
- do_execute: the execution function. Expected to take the list of local parameters as its first argument; any arguments after that should match the list returned by the do_build function.
- all_parameters: a list of all parameters to be run. Included so that different implementations of the parallel manager can make decisions about splitting and parallelization.

sum_values_and_sync(sendbuf, recvbuf)[source]

Sum a list of values from each process and sync the result to all processes. Parameters:
- sendbuf: an array of values the local process is contributing to the sum
- recvbuf: an array of a single value that will hold the summed result (the same on each process) when the function returns

Note: this is a specific case of a global reduce (with sum() as the reducing function). If more than sum() is needed in the future, this function should be extended to receive an Op argument.

sync_array_with_peers(data)[source]

Synchronize an array with all peer processes. The data parameter is either:
- (if root) the source of truth that will be broadcast to all processes
- (if not root) an empty buffer that will hold the data sent from root once the function returns

sync_pyobject_with_peers(obj)[source]

Synchronize a Python object with all peer processes. The obj parameter is either:
- (if root) the source of truth that will be broadcast to all processes
- (if not root) ignored

Different from sync_array_with_peers in that it returns the synced object rather than using an existing buffer.

sync_with_peers()[source]

Implement a synchronization point with any peer processes.

watertap.tools.parallel.mpi_parallel_manager module

class watertap.tools.parallel.mpi_parallel_manager.MPIParallelManager(MPI, **kwargs)[source]

Bases: ParallelManager

__init__(MPI, **kwargs)[source]
combine_data_with_peers(data)[source]

Combine the data from each peer into a list. The return value will be a list of all the data elements provided by each process that calls this function.

With multiple processes, this must:
- act as a synchronization point
- return the list in the same order on each process

gather()[source]

Gather the results of the computation that was kicked off via a previous scatter. Returns:
- a list of LocalResults, representing the results for each process. Each result will be the return value of the do_execute function from scatter() for one process.

gather_arrays_to_root(sendbuf, recvbuf_spec)[source]

Gather the data in the send buffer to the root process. Parameters:
- sendbuf: the data to be sent from this process
- recvbuf_spec: a tuple of (receive buffer, list of sizes), where the list of sizes specifies how much data will come from each process. Ignored if not the root process.

get_rank()[source]

Get the process’s rank number in its parallel peer group.

is_root_process()[source]

Return whether the current process should be considered as the root of the parallel group it is a part of.

number_of_worker_processes()[source]

Return how many total processes are running local simulations.

results_from_local_tree(results)[source]

Given a list of LocalResults objects, return a sublist of the ones the current process is responsible for.

scatter(do_build, do_build_kwargs, do_execute, all_parameters)[source]

Scatter the specified execution out, as defined by the implementation’s parallelism, for a list of parameters. Args:
- do_build: a function that builds the arguments necessary for the execution function. Expected to return a list that will be exploded and passed into the do_execute function as arguments.
- do_build_kwargs: a dictionary of keyword arguments for the do_build function
- do_execute: the execution function. Expected to take the list of local parameters as its first argument; any arguments after that should match the list returned by the do_build function.
- all_parameters: a list of all parameters to be run. Included so that different implementations of the parallel manager can make decisions about splitting and parallelization.

sum_values_and_sync(sendbuf, recvbuf)[source]

Sum a list of values from each process and sync the result to all processes. Parameters:
- sendbuf: an array of values the local process is contributing to the sum
- recvbuf: an array of a single value that will hold the summed result (the same on each process) when the function returns

Note: this is a specific case of a global reduce (with sum() as the reducing function). If more than sum() is needed in the future, this function should be extended to receive an Op argument.

sync_array_with_peers(data)[source]

Broadcast the array to all processes. This call acts as a synchronization point when run by multiple peer MPI processes.

sync_pyobject_with_peers(obj)[source]

Broadcast the object from the root to all processes. This call acts as a synchronization point when run by multiple peer MPI processes.

sync_with_peers()[source]

Implement a synchronization point with any peer processes.

watertap.tools.parallel.multiprocessing_parallel_manager module

class watertap.tools.parallel.multiprocessing_parallel_manager.MultiprocessingParallelManager(number_of_subprocesses=1, **kwargs)[source]

Bases: ParallelManager

__init__(number_of_subprocesses=1, **kwargs)[source]
combine_data_with_peers(data)[source]

Combine the data from each peer into a list. The return value will be a list of all the data elements provided by each process that calls this function.

With multiple processes, this must:
- act as a synchronization point
- return the list in the same order on each process

gather()[source]

Gather the results of the computation that was kicked off via a previous scatter. Returns:
- a list of LocalResults, representing the results for each process. Each result will be the return value of the do_execute function from scatter() for one process.

gather_arrays_to_root(sendbuf, recvbuf_spec)[source]

Gather the data in the send buffer to the root process. Parameters:
- sendbuf: the data to be sent from this process
- recvbuf_spec: a tuple of (receive buffer, list of sizes), where the list of sizes specifies how much data will come from each process. Ignored if not the root process.

get_rank()[source]

Get the process’s rank number in its parallel peer group.

is_root_process()[source]

Return whether the current process should be considered as the root of the parallel group it is a part of.

number_of_worker_processes()[source]

Return how many total processes are running local simulations.

results_from_local_tree(results)[source]

Given a list of LocalResults objects, return a sublist of the ones the current process is responsible for.

scatter(do_build, do_build_kwargs, do_execute, all_parameters)[source]

Scatter the specified execution out, as defined by the implementation’s parallelism, for a list of parameters. Args:
- do_build: a function that builds the arguments necessary for the execution function. Expected to return a list that will be exploded and passed into the do_execute function as arguments.
- do_build_kwargs: a dictionary of keyword arguments for the do_build function
- do_execute: the execution function. Expected to take the list of local parameters as its first argument; any arguments after that should match the list returned by the do_build function.
- all_parameters: a list of all parameters to be run. Included so that different implementations of the parallel manager can make decisions about splitting and parallelization.

sum_values_and_sync(sendbuf, recvbuf)[source]

Sum a list of values from each process and sync the result to all processes. Parameters:
- sendbuf: an array of values the local process is contributing to the sum
- recvbuf: an array of a single value that will hold the summed result (the same on each process) when the function returns

Note: this is a specific case of a global reduce (with sum() as the reducing function). If more than sum() is needed in the future, this function should be extended to receive an Op argument.

sync_array_with_peers(data)[source]

Synchronize an array with all peer processes. The data parameter is either:
- (if root) the source of truth that will be broadcast to all processes
- (if not root) an empty buffer that will hold the data sent from root once the function returns

sync_pyobject_with_peers(obj)[source]

Synchronize a Python object with all peer processes. The obj parameter is either:
- (if root) the source of truth that will be broadcast to all processes
- (if not root) ignored

Different from sync_array_with_peers in that it returns the synced object rather than using an existing buffer.

sync_with_peers()[source]

Implement a synchronization point with any peer processes.

watertap.tools.parallel.parallel_manager module

class watertap.tools.parallel.parallel_manager.ParallelManager[source]

Bases: ABC

abstract combine_data_with_peers(data)[source]

Combine the data from each peer into a list. The return value will be a list of all the data elements provided by each process that calls this function.

With multiple processes, this must:
- act as a synchronization point
- return the list in the same order on each process

abstract gather()[source]

Gather the results of the computation that was kicked off via a previous scatter. Returns:
- a list of LocalResults, representing the results for each process. Each result will be the return value of the do_execute function from scatter() for one process.

abstract gather_arrays_to_root(sendbuf, recvbuf_spec)[source]

Gather the data in the send buffer to the root process. Parameters:
- sendbuf: the data to be sent from this process
- recvbuf_spec: a tuple of (receive buffer, list of sizes), where the list of sizes specifies how much data will come from each process. Ignored if not the root process.
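As a sketch of the recvbuf_spec layout (using numpy buffers and a fixed two values per process, both of which are assumptions rather than requirements of this interface):

    # Hypothetical sketch of gather_arrays_to_root; numpy buffers and the
    # two-values-per-process layout are assumptions, not requirements.
    import numpy as np

    from watertap.tools.parallel.parallel_manager_factory import create_parallel_manager

    parallel_manager = create_parallel_manager()

    # Each process sends its own chunk of data.
    sendbuf = np.array([float(parallel_manager.get_rank())] * 2)

    # Only the root needs a receive buffer; it is sized to hold every chunk,
    # and the size list says how many values arrive from each process.
    sizes = [2] * parallel_manager.number_of_worker_processes()
    recvbuf = np.empty(sum(sizes)) if parallel_manager.is_root_process() else None

    parallel_manager.gather_arrays_to_root(sendbuf, recvbuf_spec=(recvbuf, sizes))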

abstract get_rank()[source]

Get the process’s rank number in its parallel peer group.

abstract is_root_process()[source]

Return whether the current process should be considered as the root of the parallel group it is a part of.

abstract number_of_worker_processes()[source]

Return how many total processes are running local simulations.

abstract results_from_local_tree(results)[source]

Given a list of LocalResults objects, return a sublist of the ones the current process is responsible for.

abstract scatter(do_build, do_build_kwargs, do_execute, all_parameters)[source]

Scatter the specified execution out, as defined by the implementation’s parallelism, for a list of parameters. Args:
- do_build: a function that builds the arguments necessary for the execution function. Expected to return a list that will be exploded and passed into the do_execute function as arguments.
- do_build_kwargs: a dictionary of keyword arguments for the do_build function
- do_execute: the execution function. Expected to take the list of local parameters as its first argument; any arguments after that should match the list returned by the do_build function.
- all_parameters: a list of all parameters to be run. Included so that different implementations of the parallel manager can make decisions about splitting and parallelization.
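A rough end-to-end sketch of the scatter()/gather() contract follows. The helper names (build_solver_args, run_sweep), the toy parameter values, and the attribute access on the gathered LocalResults objects are illustrative assumptions, not part of the documented API:

    # Minimal scatter/gather sketch, run with SingleProcessParallelManager so
    # it works anywhere; helper names and result-attribute access are assumed.
    from watertap.tools.parallel.single_process_parallel_manager import (
        SingleProcessParallelManager,
    )

    def build_solver_args(offset=0.0):
        # scatter() passes do_build_kwargs to this function; its return list
        # is unpacked into do_execute after the local parameter list.
        return [offset]

    def run_sweep(local_parameters, offset):
        # First argument: the slice of all_parameters this process owns.
        # Remaining arguments: whatever build_solver_args returned.
        return [value + offset for value in local_parameters]

    parallel_manager = SingleProcessParallelManager()
    parallel_manager.scatter(
        do_build=build_solver_args,
        do_build_kwargs={"offset": 10.0},
        do_execute=run_sweep,
        all_parameters=[1.0, 2.0, 3.0, 4.0],
    )

    for local_result in parallel_manager.gather():
        # Assumes LocalResults exposes its constructor arguments as attributes.
        print(local_result.process_number, local_result.results)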

abstract sum_values_and_sync(sendbuf, recvbuf)[source]

Sum a list of values from each process and sync the result to all processes. Parameters:
- sendbuf: an array of values the local process is contributing to the sum
- recvbuf: an array of a single value that will hold the summed result (the same on each process) when the function returns

Note: this is a specific case of a global reduce (with sum() as the reducing function). If more than sum() is needed in the future, this function should be extended to receive an Op argument.
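For illustration, a global sum might look like the sketch below. The numpy buffers and their one-element shapes follow the parameter descriptions above literally; the exact shapes expected by a given implementation may differ:

    # Hedged sketch of a global sum; buffer types and shapes are assumptions
    # taken from the parameter descriptions above.
    import numpy as np

    from watertap.tools.parallel.parallel_manager_factory import create_parallel_manager

    parallel_manager = create_parallel_manager()

    local_successes = 3.0  # some value computed by this process
    sendbuf = np.array([local_successes])  # values this process contributes
    recvbuf = np.zeros(1)                  # will hold the global sum

    parallel_manager.sum_values_and_sync(sendbuf, recvbuf)
    # After the call, recvbuf[0] holds the same summed value on every process.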

abstract sync_array_with_peers(data)[source]

Synchronize an array with all peer processes. The data parameter is either:
- (if root) the source of truth that will be broadcast to all processes
- (if not root) an empty buffer that will hold the data sent from root once the function returns

abstract sync_pyobject_with_peers(obj)[source]

Synchronize a Python object with all peer processes. The obj parameter is either:
- (if root) the source of truth that will be broadcast to all processes
- (if not root) ignored

Different from sync_array_with_peers in that it returns the synced object rather than using an existing buffer.
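A sketch contrasting the two synchronization calls (numpy arrays for the buffer-based variant are an assumption):

    # Buffer-based vs. object-based broadcast, per the descriptions above.
    import numpy as np

    from watertap.tools.parallel.parallel_manager_factory import create_parallel_manager

    parallel_manager = create_parallel_manager()

    # sync_array_with_peers: every process supplies a buffer of the same size;
    # only the root's contents are the source of truth.
    data = (
        np.array([1.0, 2.0, 3.0])
        if parallel_manager.is_root_process()
        else np.empty(3)
    )
    parallel_manager.sync_array_with_peers(data)

    # sync_pyobject_with_peers: the root's object is returned on every process;
    # whatever non-root processes pass in is ignored.
    config = {"tolerance": 1e-6} if parallel_manager.is_root_process() else None
    config = parallel_manager.sync_pyobject_with_peers(config)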

abstract sync_with_peers()[source]

Implement a synchronization point with any peer processes.

watertap.tools.parallel.parallel_manager.build_and_execute(do_build, do_build_kwargs, do_execute, local_parameters)[source]

Entrypoint for implementations of the parallel manager to use for running the build and execute functions. Defined at the top level so that it’s picklable.

For a description of the first three arguments, see the scatter() function above. The fourth argument is the list of local parameters that should be run by this process.
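In effect, each worker composes the scatter() callbacks roughly as in the sketch below; the exact call sequence inside build_and_execute is an assumption drawn from the argument descriptions above:

    # Hypothetical outline of what build_and_execute does with its callbacks;
    # not the actual implementation.
    def sketch_build_and_execute(do_build, do_build_kwargs, do_execute, local_parameters):
        execute_args = do_build(**do_build_kwargs)          # build extra arguments
        return do_execute(local_parameters, *execute_args)  # run the local slice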

watertap.tools.parallel.parallel_manager_factory module

watertap.tools.parallel.parallel_manager_factory.create_parallel_manager(parallel_manager_class=None, **kwargs)[source]

Create and return an instance of a ParallelManager, based on the libraries available in the runtime environment.

An optional Python class may be passed in as parallel_manager_class; if provided, that class is instantiated and returned instead of inspecting the local environment.
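A brief usage sketch; forwarding number_of_subprocesses through **kwargs to the chosen manager’s constructor is an assumption based on the constructors documented above:

    # Factory usage sketch. The number_of_subprocesses keyword mirrors the
    # manager constructors above and is assumed to be forwarded via **kwargs.
    from watertap.tools.parallel.parallel_manager_factory import create_parallel_manager
    from watertap.tools.parallel.multiprocessing_parallel_manager import (
        MultiprocessingParallelManager,
    )

    # Let the factory pick an implementation based on the runtime environment
    # (MPI, multiprocessing, Ray, or a single process).
    manager = create_parallel_manager(number_of_subprocesses=4)

    # Or force a specific implementation.
    manager = create_parallel_manager(
        parallel_manager_class=MultiprocessingParallelManager,
        number_of_subprocesses=4,
    )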

watertap.tools.parallel.parallel_manager_factory.get_mpi_comm_process()[source]

Returns the MPI COMM_WORLD communicator.

watertap.tools.parallel.parallel_manager_factory.has_mpi_peer_processes()[source]

Returns whether the process was run as part of an MPI group with more than one process.

watertap.tools.parallel.parallel_manager_factory.should_fan_out(number_of_subprocesses)[source]

Returns whether the manager should fan the computation out to subprocesses. Fanning out is mutually exclusive with running under MPI.

watertap.tools.parallel.ray_io_parallel_manager module

class watertap.tools.parallel.ray_io_parallel_manager.RayIoParallelManager(number_of_subprocesses=1, **kwargs)[source]

Bases: ParallelManager

__init__(number_of_subprocesses=1, **kwargs)[source]
combine_data_with_peers(data)[source]

Combine the data from each peer into a list. The return value will be a list of all the data elements provided by each process that calls this function.

With multiple processes, this must:
- act as a synchronization point
- return the list in the same order on each process

gather()[source]

Gather the results of the computation that was kicked off via a previous scatter. Returns:
- a list of LocalResults, representing the results for each process. Each result will be the return value of the do_execute function from scatter() for one process.

gather_arrays_to_root(sendbuf, recvbuf_spec)[source]

Gather the data in the send buffer to the root process. Parameters:
- sendbuf: the data to be sent from this process
- recvbuf_spec: a tuple of (receive buffer, list of sizes), where the list of sizes specifies how much data will come from each process. Ignored if not the root process.

get_rank()[source]

Get the process’s rank number in its parallel peer group.

is_root_process()[source]

Return whether the current process should be considered as the root of the parallel group it is a part of.

number_of_worker_processes()[source]

Return how many total processes are running local simulations.

results_from_local_tree(results)[source]

Given a list of LocalResults objects, return a sublist of the ones the current process is responsible for.

scatter(do_build, do_build_kwargs, do_execute, all_parameters)[source]

Scatter the specified execution out, as defined by the implementation’s parallelism, for a list of parameters. Args:
- do_build: a function that builds the arguments necessary for the execution function. Expected to return a list that will be exploded and passed into the do_execute function as arguments.
- do_build_kwargs: a dictionary of keyword arguments for the do_build function
- do_execute: the execution function. Expected to take the list of local parameters as its first argument; any arguments after that should match the list returned by the do_build function.
- all_parameters: a list of all parameters to be run. Included so that different implementations of the parallel manager can make decisions about splitting and parallelization.

shut_down_ray()[source]

Shut down the Ray instance after the run, but only when running in local mode. When running on a cluster or with an existing head node, the manager only connects to it, so an external head script is expected to shut it down.

sum_values_and_sync(sendbuf, recvbuf)[source]

Sum a list of values from each process and sync the result to all processes. Parameters:
- sendbuf: an array of values the local process is contributing to the sum
- recvbuf: an array of a single value that will hold the summed result (the same on each process) when the function returns

Note: this is a specific case of a global reduce (with sum() as the reducing function). If more than sum() is needed in the future, this function should be extended to receive an Op argument.

sync_array_with_peers(data)[source]

Synchronize an array with all peer processes. The data parameter is either:
- (if root) the source of truth that will be broadcast to all processes
- (if not root) an empty buffer that will hold the data sent from root once the function returns

sync_pyobject_with_peers(obj)[source]

Synchronize a Python object with all peer processes. The obj parameter is either:
- (if root) the source of truth that will be broadcast to all processes
- (if not root) ignored

Different from sync_array_with_peers in that it returns the synced object rather than using an existing buffer.

sync_with_peers()[source]

Implement a synchronization point with any peer processes.

watertap.tools.parallel.results module

class watertap.tools.parallel.results.LocalResults(process_number, parameters, results)[source]

Bases: object

Class representing the results of one process’s run of an optimization routine. Parameters:
- process_number: a unique number identifying the process in the group
- parameters: a list containing an entry for each parameter group run by the process
- results: the results of the optimization routine run for all parameter groups in the list

__init__(process_number, parameters, results)[source]

watertap.tools.parallel.single_process_parallel_manager module

class watertap.tools.parallel.single_process_parallel_manager.SingleProcessParallelManager(**kwargs)[source]

Bases: ParallelManager

__init__(**kwargs)[source]
combine_data_with_peers(data)[source]

Combine the data from each peer into a list. The return value will be a list of all the data elements provided by each process that calls this function.

With multiple processes, this must:
- act as a synchronization point
- return the list in the same order on each process

gather()[source]

Gather the results of the computation that was kicked off via a previous scatter. Returns:
- a list of LocalResults, representing the results for each process. Each result will be the return value of the do_execute function from scatter() for one process.

gather_arrays_to_root(sendbuf, recvbuf_spec)[source]

Gather the data in the send buffer to the root process. Parameters:
- sendbuf: the data to be sent from this process
- recvbuf_spec: a tuple of (receive buffer, list of sizes), where the list of sizes specifies how much data will come from each process. Ignored if not the root process.

get_rank()[source]

Get the process’s rank number in its parallel peer group.

is_root_process()[source]

Return whether the current process should be considered as the root of the parallel group it is a part of.

number_of_worker_processes()[source]

Return how many total processes are running local simulations.

results_from_local_tree(results)[source]

Given a list of LocalResults objects, return a sublist of the ones the current process is responsible for.

scatter(do_build, do_build_kwargs, do_execute, all_parameters)[source]

Scatter the specified execution out, as defined by the implementation’s parallelism, for a list of parameters. Args:
- do_build: a function that builds the arguments necessary for the execution function. Expected to return a list that will be exploded and passed into the do_execute function as arguments.
- do_build_kwargs: a dictionary of keyword arguments for the do_build function
- do_execute: the execution function. Expected to take the list of local parameters as its first argument; any arguments after that should match the list returned by the do_build function.
- all_parameters: a list of all parameters to be run. Included so that different implementations of the parallel manager can make decisions about splitting and parallelization.

sum_values_and_sync(sendbuf, recvbuf)[source]

Sum a list of values from each process and sync the result to all processes. Parameters:
- sendbuf: an array of values the local process is contributing to the sum
- recvbuf: an array of a single value that will hold the summed result (the same on each process) when the function returns

Note: this is a specific case of a global reduce (with sum() as the reducing function). If more than sum() is needed in the future, this function should be extended to receive an Op argument.

sync_array_with_peers(data)[source]

Synchronize an array with all peer processes. The data parameter is either:
- (if root) the source of truth that will be broadcast to all processes
- (if not root) an empty buffer that will hold the data sent from root once the function returns

sync_pyobject_with_peers(obj)[source]

Synchronize a Python object with all peer processes. The obj parameter is either:
- (if root) the source of truth that will be broadcast to all processes
- (if not root) ignored

Different from sync_array_with_peers in that it returns the synced object rather than using an existing buffer.

sync_with_peers()[source]

Implement a synchronization point with any peer processes.

Module contents