#################################################################################
# WaterTAP Copyright (c) 2020-2024, The Regents of the University of California,
# through Lawrence Berkeley National Laboratory, Oak Ridge National Laboratory,
# National Renewable Energy Laboratory, and National Energy Technology
# Laboratory (subject to receipt of any required approvals from the U.S. Dept.
# of Energy). All rights reserved.
#
# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and license
# information, respectively. These files are also available online at the URL
# "https://github.com/watertap-org/watertap/"
#################################################################################
import logging
import multiprocessing
from queue import Empty as EmptyQueue
import numpy
from watertap.tools.parallel.results import LocalResults
from watertap.tools.parallel.parallel_manager import (
parallelActor,
ParallelManager,
)
_logger = logging.getLogger(__name__)
class MultiprocessingParallelManager(ParallelManager):
    """ParallelManager implementation that fans work out to local child
    processes using the standard-library multiprocessing module.

    There is a single coordinating (root) process, so all of the
    peer-synchronization methods are either no-ops or simple local copies.
    Work items are distributed to worker processes through a shared run
    queue and results are collected back through a return queue.
    """

    def __init__(
        self,
        number_of_subprocesses=1,
        **kwargs,
    ):
        # Upper bound on worker processes; the actual count is capped by
        # the number of parameter sets once scatter() is called.
        self.max_number_of_subprocesses = number_of_subprocesses

        # this will be updated when child processes are kicked off
        self.actual_number_of_subprocesses = None

        # Future -> (process number, parameters). Used to keep track of the
        # process number and parameters for all in-progress futures
        self.running_futures = dict()

    def is_root_process(self):
        # Single coordinating process: it is always the root.
        return True

    def get_rank(self):
        return self.ROOT_PROCESS_RANK

    def number_of_worker_processes(self):
        # Before scatter() runs, only the configured maximum is known.
        if self.actual_number_of_subprocesses is None:
            return self.max_number_of_subprocesses
        return self.actual_number_of_subprocesses

    def sync_with_peers(self):
        # No peers to synchronize with.
        pass

    def sync_array_with_peers(self, data):
        # No-op: the data is already local to the root process.
        pass

    def sync_pyobject_with_peers(self, obj):
        return obj

    def combine_data_with_peers(self, data):
        # Only one participant, so the combined data is a one-element list.
        return [data]

    def sum_values_and_sync(self, sendbuf, recvbuf):
        # With a single participant the "sum" is just a copy.
        recvbuf[:] = sendbuf

    def gather_arrays_to_root(self, sendbuf, recvbuf_spec):
        receive_arr = recvbuf_spec[0]
        receive_sizes = recvbuf_spec[1]

        assert len(receive_arr) == sum(
            receive_sizes
        ), "Gathering arrays to root cannot be done with mismatched sizes"

        receive_arr[:] = sendbuf[:]

    def scatter(
        self,
        do_build,
        do_build_kwargs,
        do_execute,
        all_parameters,
    ):
        """Split all_parameters into per-sample chunks, enqueue them, and
        start up to max_number_of_subprocesses worker processes to run them.
        """
        # constrain the number of child processes to the number of unique
        # values to be run
        self.actual_number_of_subprocesses = min(
            self.max_number_of_subprocesses, len(all_parameters)
        )

        # split the parameters for the async run: one chunk per sample
        self.expected_samples = len(all_parameters)
        divided_parameters = numpy.array_split(all_parameters, self.expected_samples)

        # create queues; run_queue stores the parameters we want to run
        # and return_queue stores the results
        self.run_queue = multiprocessing.Queue()
        self.return_queue = multiprocessing.Queue()
        for i, param in enumerate(divided_parameters):
            self.run_queue.put([i, param])

        # set up the multiprocessing actors
        self.actors = []
        for cpu in range(self.actual_number_of_subprocesses):
            self.actors.append(
                multiprocessing.Process(
                    target=multiProcessingActor,
                    args=(
                        self.run_queue,
                        self.return_queue,
                        do_build,
                        do_build_kwargs,
                        do_execute,
                        # NOTE(review): every actor is constructed with the
                        # FIRST parameter chunk; actual work items come from
                        # the run queue afterwards — confirm this is intended.
                        divided_parameters[0],
                    ),
                )
            )
            self.actors[-1].start()

    def gather(self):
        """Collect one result per expected sample from the workers, shut the
        workers down, and return the results sorted by process number.
        """
        results = []

        # collect results from the actors. NOTE(review): return_queue.get()
        # blocks indefinitely, so the EmptyQueue handler below can never
        # fire with a blocking get — gather() will hang if a worker dies
        # before producing its result.
        while len(results) < self.expected_samples:
            try:
                i, values, result = self.return_queue.get()
                results.append(LocalResults(i, values, result))
            except EmptyQueue:
                break

        self._shut_down()

        # sort the results by the process number to keep a deterministic ordering
        results.sort(key=lambda result: result.process_number)
        return results

    def _shut_down(self):
        # Workers loop forever on the run queue, so they must be killed
        # rather than joined.
        n_shut_down = 0
        for process in self.actors:
            _logger.debug("Attempting to shut down %s", process)
            process.kill()
            n_shut_down += 1
        self.actors.clear()
        _logger.debug("Shut down %d processes", n_shut_down)

    def results_from_local_tree(self, results):
        # Results are already local; nothing to extract.
        return results
# This function is the entry point for the actors run in multiprocessing
# worker processes.
def multiProcessingActor(
    queue,
    return_queue,
    do_build,
    do_build_kwargs,
    do_execute,
    local_parameters,
):
    """Worker loop executed inside each child process.

    Builds a parallelActor once, then repeatedly pulls
    ``[index, parameters]`` messages from ``queue``, executes them, and
    pushes ``[index, parameters, result]`` onto ``return_queue``.

    NOTE(review): ``queue.get()`` is a blocking call and therefore never
    raises ``EmptyQueue`` — the ``return`` in the handler is effectively
    unreachable, and workers run until killed by the parent process
    (see ``MultiprocessingParallelManager._shut_down``).
    """
    actor = parallelActor(do_build, do_build_kwargs, do_execute, local_parameters)
    while True:
        try:
            msg = queue.get()
        except EmptyQueue:
            return
        i, local_parameters = msg
        result = actor.execute(local_parameters)
        return_queue.put([i, local_parameters, result])