Source code for watertap.tools.oli_api.client

#################################################################################
# WaterTAP Copyright (c) 2020-2024, The Regents of the University of California,
# through Lawrence Berkeley National Laboratory, Oak Ridge National Laboratory,
# National Renewable Energy Laboratory, and National Energy Technology
# Laboratory (subject to receipt of any required approvals from the U.S. Dept.
# of Energy). All rights reserved.
#
# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and license
# information, respectively. These files are also available online at the URL
# "https://github.com/watertap-org/watertap/"
#################################################################################

###############################################################################
#
# OLI Systems, Inc. Copyright © 2022, all rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation and/or
# other materials provided with the distribution.
#
# 3. Neither the name of OLI Systems, Inc. nor the names of any contributors to
# the software made available herein may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
# SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
# OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# You are under no obligation whatsoever to provide any bug fixes, patches, or upgrades to the
# features, functionality or performance of the source code ("Enhancements") to anyone; however,
# if you choose to make your Enhancements available either publicly, or directly to OLI Systems, Inc.,
# without imposing a separate written license agreement for such Enhancements, then you hereby grant
# the following license: a non-exclusive, royalty-free perpetual license to install, use, modify, prepare
# derivative works, incorporate into other computer software, distribute, and sublicense such enhancements
# or derivative works thereof, in binary and source code form.
###############################################################################

__author__ = "Adam Atia, Adi Bannady, Paul Vecchiarelli"

import logging

import sys
import requests
import json
import time

from watertap.tools.oli_api.util.watertap_to_oli_helper_functions import get_oli_name

_logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
formatter = logging.Formatter(
    "OLIAPI - %(asctime)s - %(levelname)s - %(message)s", "%H:%M:%S"
)
handler.setFormatter(formatter)
_logger.addHandler(handler)
_logger.setLevel(logging.DEBUG)


[docs]class OLIApi: """ A class to wrap OLI Cloud API calls and access functions for interfacing with WaterTAP. """
[docs] def __init__(self, credential_manager, interactive_mode=True, debug_level="INFO"): """ Construct all necessary attributes for OLIApi class. :param credential_manager_class: class used to manage credentials :param interactive_mode: enables direct interaction with user through prompts :param debug_level: string defining level of logging activity """ self.credential_manager = credential_manager self.interactive_mode = interactive_mode if self.interactive_mode: _logger.info( "OLI running in interactive mode, disable with 'interactive_mode=False'" ) if debug_level == "INFO": _logger.setLevel(logging.INFO) else: _logger.setLevel(logging.DEBUG)
# binds OLIApi instance to context manager def __enter__(self): self.session_dbs_files = [] return self # return False if no exceptions raised def __exit__(self, exc_type=None, exc_value=None, traceback=None): # delete all .dbs files created during session self.dbs_file_cleanup(self.session_dbs_files) return False def _prompt(self, msg, default=""): if self.interactive_mode: return input(msg) else: _logger.info(msg) return default
[docs] def upload_dbs_file(self, dbs_file_path, keep_file=False): """ Upload a DBS file to OLI Cloud given a full file path. :param dbs_file_path: string path to DBS file :param keep_file: bool to remove (default) or ignore DBS file during session cleanup :return dbs_file_id: string name for DBS file ID """ with open(dbs_file_path, "rb") as file: req = requests.post( self.credential_manager.upload_dbs_url, headers=self.credential_manager.headers, files={"files": file}, ) dbs_file_id = _request_status_test(req, ["UPLOADED"])["file"][0]["id"] if bool(dbs_file_id): if not keep_file: self.session_dbs_files.append(dbs_file_id) _logger.info(f"DBS file ID is {dbs_file_id}") return dbs_file_id else: raise RuntimeError("Unexpected failure getting DBS file ID.")
[docs] def generate_dbs_file( self, inflows, thermo_framework=None, model_name=None, phases=None, databanks=None, keep_file=False, ): """ Generate a DBS file on OLI Cloud given chemistry inputs. :param inflows: dictionary with inflows and optional custom parameters :param thermo_framework: string name of thermodynamic databank to use :param model_name: string name of model OLI will use :param phases: container dict for chemistry model parameters :param databanks: list of databanks to include in DBS file :param keep_file: bool to remove (default) or ignore DBS file during session cleanup :return dbs_file_id: string name for DBS file ID """ dbs_file_inputs = { "thermodynamicFramework": None, "modelName": None, "phases": None, "inflows": None, "privateDatabanks": None, } if not inflows: raise RuntimeError("Inflows must be defined for Water Analysis.") solute_list = [{"name": get_oli_name(k)} for k in inflows] if not solute_list: raise RuntimeError("No inflows extracted from {inflows}.") dbs_file_inputs["inflows"] = solute_list if thermo_framework is not None: if thermo_framework in ["MSE (H3O+ ion)", "Aqueous (H+ ion)"]: dbs_file_inputs["thermodynamicFramework"] = thermo_framework else: raise RuntimeError( "Failed DBS file generation. " + f"Unexpected thermo_framework: {thermo_framework}." ) else: dbs_file_inputs["thermodynamicFramework"] = "MSE (H3O+ ion)" if model_name is not None: dbs_file_inputs["modelName"] = str(model_name) else: dbs_file_inputs["modelName"] = "OLI_analysis" # TODO: unknown bug where only "liquid1" phase is found in Flash analysis valid_phases = ["liquid1", "vapor", "solid", "liquid2"] if phases is not None: invalid_phases = [p for p in phases if p not in valid_phases] if invalid_phases: raise RuntimeError( "Failed DBS file generation. " + f"Unexpected phase(s): {invalid_phases}" ) dbs_file_inputs["phases"] = phases else: dbs_file_inputs["phases"] = ["liquid1", "solid"] valid_databanks = ["XSC"] if databanks is not None: invalid_databanks = [db for db in databanks if db not in valid_databanks] if invalid_databanks: raise RuntimeError( "Failed DBS file generation. " + f"Unexpected databanks(s): {invalid_databanks}" ) dbs_file_inputs["privateDatabanks"] = databanks dbs_dict = { "method": "chemistrybuilder.generateDBS", "params": {k: v for k, v in dbs_file_inputs.items() if v is not None}, } _logger.debug(f"DBS input dictionary: {dbs_dict}") req = requests.post( self.credential_manager.dbs_url, headers=self.credential_manager.update_headers( {"Content-Type": "application/json"} ), data=json.dumps(dbs_dict), ) dbs_file_id = _request_status_test(req, ["SUCCESS"])["data"]["id"] if bool(dbs_file_id): if not keep_file: self.session_dbs_files.append(dbs_file_id) _logger.info(f"DBS file ID is {dbs_file_id}") return dbs_file_id else: raise RuntimeError("Unexpected failure getting DBS file ID.")
[docs] def get_dbs_file_summary(self, dbs_file_id): """ Get chemistry info and flash history for a DBS file. :param dbs_file_id: string name for DBS file ID :return dbs_file_summary: dictionary containing JSON results from OLI Cloud """ _logger.info(f"Getting summary for {dbs_file_id} ...") chemistry_info = self.call("chemistry-info", dbs_file_id) req_flash_hist = requests.get( f"{self.credential_manager.engine_url}/flash/history/{dbs_file_id}", headers=self.credential_manager.headers, ) flash_history = _request_status_test(req_flash_hist, None)["data"] dbs_file_summary = { "chemistry_info": chemistry_info, "flash_history": flash_history, } missing_keys = [k for k, v in dbs_file_summary.items() if not v] if len(missing_keys) > 0: raise RuntimeError( " Failed to create DBS file summary. " + f"Missing {missing_keys}." ) _logger.info(f"Completed DBS file summarization") return dbs_file_summary
[docs] def get_user_dbs_file_ids(self): """ Get all DBS files on user's cloud. :return user_dbs_file_ids: list of user DBS files saved on OLI Cloud """ _logger.info("Getting DBS file IDs for user ...") req = requests.get( self.credential_manager.dbs_url, headers=self.credential_manager.headers, ) user_dbs_file_ids = [ k["fileId"] for k in _request_status_test(req, None)["data"] ] _logger.info(f"{len(user_dbs_file_ids)} DBS files found for user") return user_dbs_file_ids
[docs] def dbs_file_cleanup(self, dbs_file_ids=None): """ Delete all (or specified) DBS files on OLI Cloud. :param dbs_file_ids: list of DBS file IDs to delete """ if dbs_file_ids is None: dbs_file_ids = self.get_user_dbs_file_ids() r = self._prompt( f"WaterTAP will delete {len(dbs_file_ids)} DBS files [y]/n ", "y" ) if (r.lower() == "y") or (r == ""): for dbs_file_id in dbs_file_ids: _logger.info(f"Deleting {dbs_file_id} ...") req = requests.request( "DELETE", f"{self.credential_manager._delete_dbs_url}{dbs_file_id}", headers=self.credential_manager.headers, ) req = _request_status_test(req, ["SUCCESS"]) if req["status"] == "SUCCESS": _logger.info(f"File deleted")
[docs] def get_corrosion_contact_surfaces(self, dbs_file_id): """ Get list of valid contact surfaces for corrosion analysis, given a DBS file ID. :param dbs_file_id: string name for DBS file ID :return contact_surfaces: JSON results from OLI Cloud """ _logger.info(f"Checking chemistry info for {dbs_file_id} ...") chemistry_info = self.call("chemistry-info", dbs_file_id) framework = chemistry_info["result"]["thermodynamicFramework"] if framework != "AQ": raise RuntimeError( f"Invalid thermodynamic framework for {dbs_file_id}. " f"Corrosion analyzer requires 'Aqueous (H+ ion)', not {framework}." ) _logger.info(f"Getting contact surfaces for {dbs_file_id} ...") contact_surfaces = self.call("corrosion-contact-surface", dbs_file_id) return contact_surfaces
def _get_flash_mode(self, dbs_file_id, flash_method, burst_job_tag=None): if not bool(flash_method): raise IOError("Specify a flash method to run.") if not bool(dbs_file_id): raise IOError("Specify a DBS file ID to flash.") headers = self.credential_manager.headers base_url = self.credential_manager.engine_url valid_get_flashes = ["corrosion-contact-surface", "chemistry-info"] valid_post_flashes = ["isothermal", "corrosion-rates", "wateranalysis"] if flash_method in valid_get_flashes: mode = "GET" url = f"{base_url}/file/{dbs_file_id}/{flash_method}" elif flash_method in valid_post_flashes: mode = "POST" url = f"{base_url}/flash/{dbs_file_id}/{flash_method}" headers = self.credential_manager.update_headers( {"Content-Type": "application/json"}, ) if burst_job_tag is not None: url = "{}?{}".format( url, "burst={}_{}".format("watertap_burst", burst_job_tag) ) else: valid_flashes = [*valid_get_flashes, *valid_post_flashes] raise RuntimeError( f" Unexpected value for flash_method: {flash_method}. " + "Valid values: {', '.join(valid_flashes)}." ) return mode, url, headers
[docs] def process_request_list(self, requests, **kwargs): """ Process a list of flash calculation requests for OLI. :param requests: list of request dictionaries containing flash_method, dbs_file_id, and json_input """ _logger.info("Collecting requested samples in serial mode ...") result_list = [] acquire_timer = time.time() for idx, request in enumerate(requests): _logger.info(f"Submitting sample #{idx+1} of {len(requests)} ...") result = self.call(**request) result["submitted_requests"] = request result_list.append(result) acquire_time = time.time() - acquire_timer num_samples = len(requests) _logger.info( f"Finished all {num_samples} jobs from OLI. " + f"Total: {acquire_time} s, " + f"Rate: {acquire_time/num_samples} s/sample" ) return result_list
[docs] def call( self, flash_method=None, dbs_file_id=None, input_params=None, poll_time=0.5, max_request=100, **kwargs, ): """ Make a call to the OLI Cloud API. :param flash_method: string indicating flash method :param dbs_file_id: string indicating DBS file :param input_params: dictionary for flash calculation inputs :param poll_time: seconds between each poll :param max_request: maximum number of times to try request before failure :return result: dictionary for JSON output result """ mode, url, headers = self._get_flash_mode(dbs_file_id, flash_method) req = requests.request( mode, url, headers=headers, data=json.dumps(input_params) ) req_json = _request_status_test(req, ["SUCCESS"]) result_link = _get_result_link(req_json) result = _poll_result_link(result_link, headers, max_request, poll_time) return result
def _get_result_link(req_json): """ Get result link from OLI Cloud request. :param req_json: JSON containing result of request :return result_link: string indicating URL to access call results """ if "data" in req_json: if "status" in req_json["data"]: if req_json["data"]["status"] in ["IN QUEUE", "IN PROGRESS"]: if "resultsLink" in req_json["data"]: result_link = req_json["data"]["resultsLink"] if "resultsLink" in req_json["data"]: result_link = req_json["data"]["resultsLink"] if not result_link: raise RuntimeError(f"Failed to get 'resultsLink'. Response: {req.json()}") return result_link def _request_status_test(req, target_keys): """ Check result of OLI request (except async). :param req: response object :param target_keys: list containing key(s) that indicate successful request :return req_json: response object converted to JSON """ req_json = req.json() func_name = sys._getframe().f_back.f_code.co_name _logger.debug(f"{func_name} response: {req_json}") if req.status_code == 200: if target_keys: if "status" in req_json: if req_json["status"] in target_keys: return req_json else: return req_json raise RuntimeError(f"Failure in {func_name}. Response: {req_json}") def _poll_result_link(result_link, headers, max_request, poll_time): """ Poll result link from OLI Flash calculation request. :param result_link: string indicating URL to access call results :param headers: dictionary for OLI Cloud headers :param max_request: integer for number of requests before poll limit error :param poll_time: float for time in between poll requests return result: JSON containing results from successful Flash calculation """ for _ in range(max_request): time.sleep(poll_time) result_req = requests.get(result_link, headers=headers) result_req = _request_status_test( result_req, ["IN QUEUE", "IN PROGRESS", "PROCESSED", "FAILED"] ) _logger.info(f"Polling result link: {result_req['status']}") if result_req["status"] in ["PROCESSED", "FAILED"]: if result_req["data"]: result = result_req["data"] return result raise RuntimeError("Poll limit exceeded.")