Source code for watertap.tools.oli_api.flash
#################################################################################
# WaterTAP Copyright (c) 2020-2024, The Regents of the University of California,
# through Lawrence Berkeley National Laboratory, Oak Ridge National Laboratory,
# National Renewable Energy Laboratory, and National Energy Technology
# Laboratory (subject to receipt of any required approvals from the U.S. Dept.
# of Energy). All rights reserved.
#
# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and license
# information, respectively. These files are also available online at the URL
# "https://github.com/watertap-org/watertap/"
#################################################################################
###############################################################################
#
# OLI Systems, Inc. Copyright © 2022, all rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation and/or
# other materials provided with the distribution.
#
# 3. Neither the name of OLI Systems, Inc. nor the names of any contributors to
# the software made available herein may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
# SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
# OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# You are under no obligation whatsoever to provide any bug fixes, patches, or upgrades to the
# features, functionality or performance of the source code ("Enhancements") to anyone; however,
# if you choose to make your Enhancements available either publicly, or directly to OLI Systems, Inc.,
# without imposing a separate written license agreement for such Enhancements, then you hereby grant
# the following license: a non-exclusive, royalty-free perpetual license to install, use, modify, prepare
# derivative works, incorporate into other computer software, distribute, and sublicense such enhancements
# or derivative works thereof, in binary and source code form.
###############################################################################
__author__ = "Oluwamayowa Amusat, Alexander Dudchenko, Paul Vecchiarelli"
import logging
import json
from pathlib import Path
from copy import deepcopy
from itertools import product
from watertap.tools.oli_api.util.watertap_to_oli_helper_functions import (
get_oli_name,
get_charge,
get_charge_group,
)
from watertap.tools.oli_api.util.fixed_keys_dict import (
optional_properties,
input_unit_set,
output_unit_set,
)
_logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
formatter = logging.Formatter(
"OLIAPI - %(asctime)s - %(levelname)s - %(message)s", "%H:%M:%S"
)
handler.setFormatter(formatter)
_logger.addHandler(handler)
_logger.setLevel(logging.DEBUG)
[docs]class Flash:
"""
A class to execute OLI Cloud flash calculations.
:param optional_properties: dictionary for optional properties to attach to OLI calls, all True by default
:param input_unit_set: dictionary for conversions between OLI and Pyomo unit names
:param output_unit_set: dictionary for preferred output units
:param relative_inflows: bool switch for surveys - true to add specified value to initial value, false to replace initial value with specified value
:param debug_level: string defining level of logging activity
"""
[docs] def __init__(
self,
optional_properties=optional_properties,
input_unit_set=input_unit_set,
output_unit_set=output_unit_set,
relative_inflows=True,
debug_level="INFO",
):
self.optional_properties = optional_properties
self.input_unit_set = input_unit_set
self.output_unit_set = output_unit_set
self.relative_inflows = relative_inflows
if debug_level == "INFO":
_logger.setLevel(logging.INFO)
else:
_logger.setLevel(logging.DEBUG)
[docs] def configure_water_analysis(
self,
inflows=None,
temperature=None,
pressure=None,
reconciliation=None,
electroneutrality=None,
makeup_ion=None,
ph=None,
acid_titrant=None,
base_titrant=None,
alkalinity=None,
alkalinity_ph=None,
alkalinity_titrant=None,
tic=None,
allow_solids=False,
included_solids=None,
excluded_solids=None,
calc_alkalinity=False,
use_scaling_rigorous=True,
file_name=None,
):
"""
Configure Water Analysis JSON input.
:param inflows: dictionary of solutes
:param temperature: float for temperature in Kelvins
:param pressure: float for pressure in Pascals
:param reconciliation: string for method of reconciliation: "EquilCalcOnly" (default), "ReconcilePh", "ReconcilePhAndAlkalinity", or "ReconcilePhAndAlkalinityAndTic"; "ReconcileCo2Gas" not supported currently.
:param electroneutrality: string for method of electroneutrality calculation: "DominantIon", "ProrateCations", "ProrateAnions", "Prorate", "AutoNACL", or "MakeupIon" are supported
:param makeup_ion: string for ion to use for electroneutrality balance, if "MakeupIon,
:param ph: float for pH to reconcile solution to, required for pH based reconciliation
:param acid_titrant: string for acidification titrant, used in pH based reconciliation
:param base_titrant: string for basification titrant, used in pH based reconciliation
:param alkalinity: float for alkalinity to reconcile solution to, required for Alk based reconciliation
:param alkalinity_ph: float for alkalinity endpoint ph, used in Alk based reconciliation
:param alkalinity_titrant: string for alkalinity titration species, used in Alk based reconciliation
:param tic: float for total inorganic carbon concentration to reconcile solution to, required for TIC based reconcilation
:param allow_solids: bool to enable solid phase formation
:param included_solids: list of solids to include in analysis
:param excluded_solids: list of solids to exclude from analysis
:param calc_alkalinity: bool to calculate alkalinity of solution
:param use_scaling_rigorous: bool to switch between Rigorous (default) and Estimated scaling computations
:param file_name: string for file to write, if any
:param mesh_grid: if True (default) the input array will be combined to generate combination of all possible samples
if False, the direct values in survey_arrays will be used
:return json_input: JSON for Water Analysis
"""
_logger.info("Configuring Water Analysis JSON ...")
input_list = []
if not inflows:
raise RuntimeError("Inflows must be defined for Water Analysis.")
temp_input = {
"group": "Properties",
"name": "Temperature",
"unit": self.input_unit_set["temperature"]["oli_unit"],
"value": 273.15,
}
if temperature is not None:
if float(temperature):
temp_input.update({"value": float(temperature)})
else:
raise ValueError(f"Invalid temperature: {temperature}. Expected number")
input_list.append(temp_input)
pres_input = {
"group": "Properties",
"name": "Pressure",
"unit": self.input_unit_set["pressure"]["oli_unit"],
"value": 101325,
}
if pressure is not None:
if float(pressure):
pres_input.update({"value": float(pressure)})
else:
raise ValueError(f"Invalid pressure: {pressure}. Expected number")
input_list.append(pres_input)
reconciliation_options = [
"EquilCalcOnly",
"ReconcilePh",
"ReconcilePhAndAlkalinity",
"ReconcilePhAndAlkalinityAndTic",
]
rec_input = {
"group": "Calculation Options",
"name": "CalcType",
"value": "EquilCalcOnly",
}
if reconciliation is not None:
if reconciliation in reconciliation_options:
rec_input.update({"value": reconciliation})
else:
raise RuntimeError(
f"Invalid reconciliation option: {reconciliation}."
+ f" Use one of {reconciliation_options}"
)
else:
reconciliation = "EquilCalcOnly"
input_list.append(rec_input)
additional_req_input = []
additional_req_args = []
if "Ph" in reconciliation:
additional_req_args.append([ph, acid_titrant, base_titrant])
if not acid_titrant:
acid_titrant = "HCl"
if not base_titrant:
base_titrant = "NaOH"
additional_req_input.extend(
[
{
"group": "Properties",
"name": "pH",
"value": ph,
},
{
"group": "Calculation Options",
"name": "PhAcidTitrant",
"value": get_oli_name(acid_titrant),
},
{
"group": "Calculation Options",
"name": "PhBaseTitrant",
"value": get_oli_name(base_titrant),
},
]
)
if "Alk" in reconciliation:
additional_req_args.append([alkalinity, alkalinity_ph, alkalinity_titrant])
if not alkalinity_titrant:
alkalinity_titrant = "H2SO4"
if not alkalinity_ph:
alkalinity_ph = 4.5
_logger.info("No alkalinity endpoint pH specified. Assuming 4.5.")
additional_req_input.extend(
[
{
"group": "Properties",
"name": "Alkalinity",
"unit": self.input_unit_set["alkalinity"]["oli_unit"],
"value": alkalinity,
},
{
"group": "Properties",
"name": "AlkalinityTitrationEndPointpH",
"value": alkalinity_ph,
},
{
"group": "Calculation Options",
"name": "AlkalinityPhTitrant",
"value": alkalinity_titrant,
},
]
)
if "Tic" in reconciliation:
additional_req_args.append([tic])
additional_req_input.append(
{
"group": "Properties",
"name": "TIC",
"unit": self.input_unit_set["TIC"]["oli_unit"],
"value": tic,
}
)
missing_keys = [arg for arg in additional_req_args if arg is None]
if missing_keys:
raise RuntimeError(f"Missing keys for {reconciliation}: {missing_keys}")
input_list.extend(additional_req_input)
electroneutrality_options = [
"DominantIon",
"ProrateCations",
"ProrateAnions",
"Prorate",
"AutoNACL",
"MakeupIon",
]
elec_input = {
"group": "Electroneutrality Options",
"name": "ElectroNeutralityBalanceType",
"value": "DominantIon",
}
if electroneutrality is not None:
if electroneutrality in electroneutrality_options:
elec_input.update({"value": electroneutrality})
else:
raise RuntimeError(
f"Invalid reconciliation option: {electroneutrality}."
+ f" Use one of {electroneutrality_options}"
)
input_list.append(elec_input)
if electroneutrality == "MakeupIon":
if makeup_ion is not None:
input_list.append(
{
"group": "Electroneutrality Options",
"name": "MakeupIonBaseTag",
"value": get_oli_name(makeup_ion),
}
)
input_list.extend(
[
{
"group": "Calculation Options",
"name": "AllowSolidsToForm",
"value": bool(allow_solids),
},
{
"group": "Calculation Options",
"name": "CalcAlkalnity",
"value": bool(calc_alkalinity),
},
]
)
conc_unit = self.input_unit_set["molecularConcentration"]["oli_unit"]
_logger.info(f"Using {conc_unit} for inflows input")
for k, v in inflows.items():
charge = get_charge(k)
input_list.append(
{
"group": get_charge_group(charge),
"name": get_oli_name(k),
"unit": self.input_unit_set["molecularConcentration"]["oli_unit"],
"value": v,
"charge": charge,
}
)
json_input = self._add_to_json(
"wateranalysis",
input_list,
included_solids,
excluded_solids,
use_scaling_rigorous,
file_name,
)
return json_input
[docs] def configure_flash_analysis(
self,
inflows=None,
flash_method=None,
temperature=None,
pressure=None,
calculated_variable=None,
enthalpy=None,
vapor_amount=None,
vapor_fraction=None,
volume=None,
ph=None,
acid_titrant=None,
base_titrant=None,
formed_solid=None,
precipitant_inflow=None,
included_solids=None,
excluded_solids=None,
contact_surface=None,
flow_type=None,
diameter=None,
liq_velocity=None,
gas_velocity=None,
rot_velocity=None,
shear_stress=None,
roughness=None,
nonaqueous_visc=None,
water_cut_inversion=None,
relative_visc_inversion=None,
use_scaling_rigorous=True,
file_name=None,
):
"""
Configure Flash Analysis JSON input.
:param inflows: dictionary of solutes, of the form {"unit": unit, "values": {solute: concentration}}
:param flash_method: string for flash calculation name
:param temperature: float for temperature in Kelvins
:param pressure: float for pressure in Pascals
:param calculated_variable: string for variable to calculate, such as temperature or pressure, used in 'bubblepoint', 'dewpoint', 'vapor-amount', 'vapor-fraction', and 'isochoric' flashes
:param enthalpy: float for total enthalpy in Joules, used in 'isenthalpic' flash
:param vapor_amount: float for vapor phase Moles, used in 'vapor-amount' flash
:param vapor_fraction: float for vapor phase in Mole %, used in 'vapor-fraction' flash
:param volume: float for total volume in Cubic Meters, used in 'isochoric' flash
:param ph: float for target pH, used in 'setph' flash
:param acid_titrant: string for acidification titrant, used in 'setph' flash
:param base_titrant: string for basification titrant, used in 'setph' flash
:param formed_solid: string for solid species to precipitate based on inflow sweep, used in 'precipitation-point'
:param precipitant_inflow: string for inflow species to sweep, used in 'precipitation-point'
:param included_solids: list of solids to include in analysis
:param excluded_solids: list of solids to exclude from analysis
:param contact_surface: string for contact surface metal name
:param flow_type: string for flow configuration
:param diameter: float for diameter of surface (i.e., pipe or rotor)
:param liq_velocity: float for velocity of liquid flow
:param gas_velocity: float for velocity of vapor flow, used in 'approximateMultiPhaseFlow'
:param rot_velocity: float for rotational velocity
:param shear_stress: float for defined shear stress, used in 'definedShearStress'
:param roughness: float for pipe roughness, used in 'approximateMultiPhaseFlow'
:param nonaqueous_visc: float for absolute viscosity of nonaqueous phase, used in 'approximateMultiPhaseFlow'
:param water_cut_inversion: float for water cut at point of dispersion inversion, used in 'approximateMultiPhaseFlow'
:param relative_visc_inversion: float for maximum relative viscosity of dispersion at inversion, used in 'approximateMultiPhaseFlow'
:param use_scaling_rigorous: bool to switch between Rigorous (default) and Estimated scaling computations
:param file_name: string for file to write, if any
:return json_input: JSON for Water Analysis
"""
_logger.info(f"Configuring {flash_method} Flash JSON ...")
if flash_method not in [
"isothermal",
"isenthalpic",
"bubblepoint",
"dewpoint",
"vapor-amount",
"vapor-fraction",
"isochoric",
"setph",
"precipitation-point",
"corrosion-rates",
]:
raise RuntimeError(
f"Failed to configure Flash. Invalid method: {flash_method}"
)
if not inflows:
raise RuntimeError("Inflows must be defined for Flash Analysis.")
input_dict = {}
temp_input = {
"unit": self.input_unit_set["temperature"]["oli_unit"],
"value": 273.15,
}
if temperature:
if float(temperature):
temp_input.update({"value": float(temperature)})
else:
raise ValueError(f"Invalid temperature: {temperature}. Expected number")
input_dict["temperature"] = temp_input
pres_input = {
"unit": self.input_unit_set["pressure"]["oli_unit"],
"value": 101325,
}
if pressure:
if float(pressure):
pres_input.update({"value": float(pressure)})
else:
raise ValueError(f"Invalid pressure: {pressure}. Expected number")
input_dict["pressure"] = pres_input
if flash_method in [
"bubblepoint",
"dewpoint",
"vapor-amount",
"vapor-fraction",
"isochoric",
]:
if calculated_variable is not None:
if calculated_variable not in ["temperature", "pressure"]:
raise RuntimeError(
f"Invalid input for 'calculated_variable': {calculated_variable}; 'temperature' or 'pressure' supported."
)
_logger.info(
f"{flash_method} will calculate {calculated_variable} as its variable"
)
input_dict["calculatedVariable"] = calculated_variable
else:
raise RuntimeError(
f"Missing argument for {flash_method}: 'calculated_variable'"
)
if flash_method == "isenthalpic":
enth_input = {
"unit": self.input_unit_set["enthalpy"]["oli_unit"],
"value": None,
}
if float(enthalpy):
enth_input.update({"value": float(enthalpy)})
else:
raise ValueError(f"Invalid enthalpy: {enthalpy}. Expected number")
input_dict["enthalpy"] = enth_input
if flash_method == "vapor-amount":
vapor_amount_input = (
{
"unit": input_unit_set["vaporAmountMoles"]["oli_unit"],
"value": None,
},
)
if float(vapor_amount):
vapor_amount_input.update({"value": float(vapor_amount)})
else:
raise ValueError(
f"Invalid vapor amount: {vapor_amount}. Expected number"
)
input_dict["vaporAmountMoles"] = vapor_amount_input
if flash_method == "vapor-fraction":
vapor_fraction_amount = (
{
"unit": input_unit_set["vaporMolFrac"]["oli_unit"],
"value": None,
},
)
if float(vapor_fraction):
vapor_fraction_amount.update({"value": float(vapor_fraction)})
else:
raise ValueError(
f"Invalid vapor fraction: {vapor_fraction}. Expected number"
)
input_dict["vaporMolFrac"] = vapor_fraction_input
if flash_method == "isochoric":
volume_input = {
"unit": input_unit_set["totalVolume"]["oli_unit"],
"value": None,
}
if float(volume):
volume_input.update({"value": float(volume)})
else:
raise ValueError(f"Invalid volume: {volume}. Expected number")
input_dict["totalVolume"] = volume_input
if flash_method == "setph":
ph_input = {
"targetPH": {
"unit": "",
"value": None,
},
}
if float(ph):
ph_input["targetPH"].update({"value": float(ph)})
else:
raise ValueError(f"Invalid ph: {ph}. Expected number")
input_dict["targetPH"] = ph_input
if not acid_titrant:
acid_titrant = "HCl"
input_dict["pHAcidTitrant"] = get_oli_name(acid_titrant)
if not base_titrant:
base_titrant = "NaOH"
input_dict["pHBaseTitrant"] = get_oli_name(base_titrant)
if flash_method == "precipitation-point":
missing_args = [
arg for arg in [formed_solid, precipitant_inflow] if arg is None
]
if missing_args:
raise RuntimeError(
f"Missing argument(s) for {flash_method}: {missing_args}"
)
else:
input_dict.update(
{
"solidToPrecipitate": formed_solid,
"inflowToAdjust": precipitant_inflow,
}
)
input_dict["inflows"] = inflows
if flash_method == "corrosion-rates":
_logger.info(
f"Ensure DBS file uses 'AQ' thermodynamic framework to use Corrosion Analyzer"
)
input_dict["corrosionParameters"] = self._configure_corrosion(
contact_surface,
flow_type,
diameter,
liq_velocity,
gas_velocity,
rot_velocity,
shear_stress,
roughness,
nonaqueous_visc,
water_cut_inversion,
relative_visc_inversion,
)
json_input = self._add_to_json(
flash_method,
input_dict,
included_solids,
excluded_solids,
use_scaling_rigorous,
file_name,
)
return json_input
def _configure_corrosion(
self,
contact_surface,
flow_type,
diameter,
liq_velocity,
gas_velocity,
rot_velocity,
shear_stress,
roughness,
nonaqueous_visc,
water_cut_inversion,
relative_visc_inversion,
):
"""
Configure input dict for Corrosion Rates flash.
:param contact_surface: string for contact surface metal name
:param flow_type: string for flow configuration
:param diameter: float for diameter of surface (i.e., pipe or rotor)
:param liq_velocity: float for velocity of liquid flow
:param gas_velocity: float for velocity of vapor flow, used in 'approximateMultiPhaseFlow'
:param rot_velocity: float for rotational velocity
:param shear_stress: float for defined shear stress, used in 'definedShearStress'
:param roughness: float for pipe roughness, used in 'approximateMultiPhaseFlow'
:param nonaqueous_visc: float for absolute viscosity of nonaqueous phase, used in 'approximateMultiPhaseFlow'
:param water_cut_inversion: float for water cut at point of dispersion inversion, used in 'approximateMultiPhaseFlow'
:param relative_visc_inversion: float for maximum relative viscosity of dispersion at inversion, used in 'approximateMultiPhaseFlow'
:return config: dictionary for corrosion analysis parameters
"""
valid_flow_types = [
"static",
"pipeFlow",
"rotatingDisk",
"rotatingCylinder",
"completeAgitation",
"definedShearStress",
"approximateMultiPhaseFlow",
]
if flow_type not in valid_flow_types:
raise RuntimeError(
f"Invalid flow_type: {flow_type}."
f"Expected one of {', '.join(t for t in valid_flow_types)}"
)
config = {
"calculationType": "isothermal",
"corrosionParameters": {
"contactSurface": contact_surface,
"flowType": flow_type,
},
}
def _try_float(v):
try:
val = float(v)
except:
val = None
return val
_check_args = lambda args: [arg for arg in args if arg is None]
_check_floats = lambda args: [arg for arg in args if _try_float(arg) is None]
if flow_type == "pipeFlow":
args = [diameter, liq_velocity]
missing_args = _check_args(args)
not_floats = _check_floats(args)
elif flow_type in ["rotatingDisk", "rotatingCylinder"]:
args = [diameter, rot_velocity]
missing_args = _check_args(args)
not_floats = _check_floats(args)
elif flow_type == "definedShearStress":
args = [shear_stress]
missing_args = _check_args(args)
not_floats = _check_floats(args)
elif flow_type == "approximateMultiPhaseFlow":
args = [
diameter,
liq_velocity,
gas_velocity,
roughness,
nonaqueous_visc,
water_cut_inversion,
relative_visc_inversion,
]
missing_args = _check_args(args)
not_floats = _check_floats(args)
if missing_args:
raise RuntimeError(
f"Missing argument(s) for {flash_method}: {missing_args}"
)
if not_floats:
raise RuntimeError(
f"Invalid values for argument(s): {not_floats}. Expected value"
)
if flow_type == "pipeFlow":
config["corrosionParameters"].update(
{
"pipeDiameter": {
"value": diameter,
"unit": self.input_unit_set["pipeDiameter"]["oli_unit"],
},
"pipeFlowVelocity": {
"value": liq_velocity,
"unit": self.input_unit_set["pipeFlowVelocity"]["oli_unit"],
},
}
)
elif flow_type == "rotatingDisk":
config["corrosionParameters"].update(
{
"diskDiameter": {
"value": diameter,
"unit": self.input_unit_set["diskDiameter"]["oli_unit"],
},
"diskRotationSpeed": {
"value": rot_velocity,
"unit": self.input_unit_set["diskRotationSpeed"]["oli_unit"],
},
},
)
elif flow_type == "rotatingCylinder":
config["corrosionParameters"].update(
{
"rotorDiameter": {
"value": diameter,
"unit": self.input_unit_set["rotorDiameter"]["oli_unit"],
},
"rotorRotation": {
"value": rot_velocity,
"unit": self.input_unit_set["rotorRotation"]["oli_unit"],
},
},
)
elif flow_type == "definedShearStress":
config["corrosionParameters"].update(
{
"shearStress": {
"value": shear_stress,
"unit": self.input_unit_set["shearStress"]["oli_unit"],
},
},
)
elif flow_type == "approximateMultiPhaseFlow":
config["corrosionParameters"].update(
{
"pipeDiameter": {
"value": diameter,
"unit": self.input_unit_set["pipeDiameter"]["oli_unit"],
},
"liquidFlowInPipe": {
"value": liq_velocity,
"unit": self.input_unit_set["liquidFlowInPipe"]["oli_unit"],
},
"gasFlowInPipe": {
"value": gas_velocity,
"unit": self.input_unit_set["gasFlowInPipe"]["oli_unit"],
},
"pipeRoughness": {
"value": roughness,
"unit": self.input_unit_set["pipeRoughness"]["oli_unit"],
},
"viscAbs2ndLiq": {
"value": nonaqueous_visc,
"unit": self.input_unit_set["viscAbs2ndLiq"]["oli_unit"],
},
"waterCutAtPointOfDispersionInversion": water_cut_inversion,
"maxRelViscosityOfDispersionAtInversion": relative_visc_inversion,
}
)
return config
def _add_to_json(
self,
flash_method,
input_data,
included_solids,
excluded_solids,
use_scaling_rigorous,
file_name,
):
"""
Add input data to JSON.
:param flash_method: string for flash calculation name
:param input_data: data object from flash configuration function
:param included_solids: list of solids to include in analysis
:param excluded_solids: list of solids to exclude from analysis
:param use_scaling_rigorous: bool to switch between Rigorous (default) and Estimated scaling computations
:param file_name: string for file to write, if any
"""
self._set_prescaling_calculation_mode(use_scaling_rigorous)
json_input = {"params": {}}
if flash_method == "wateranalysis":
json_input["params"].update({"waterAnalysisInputs": input_data})
else:
json_input["params"] = input_data
additional_params = {
"optionalProperties": dict(self.optional_properties),
"unitSetInfo": dict(self.output_unit_set),
}
if included_solids and excluded_solids:
raise RuntimeError(
"Invalid argument combination. "
"Only one of included_solids and excluded_solids "
"may be specified at once."
)
else:
if included_solids:
additional_params.update({"included_solids": list(included_solids)})
if excluded_solids:
additional_params.update({"excluded_solids": list(excluded_solids)})
json_input["params"].update(additional_params)
if file_name is not None:
write_output(json_input, file_name)
return json_input
def _set_prescaling_calculation_mode(self, use_scaling_rigorous):
"""
Set prescaling computation method based on argument.
:param use_scaling_rigorous: boolean indicating desired state of 'rigorous' and 'estimated' optional properties
"""
props = self.optional_properties
if bool(use_scaling_rigorous) == bool(props["prescalingTendenciesRigorous"]):
return
new_values = {k: not v for k, v in props.items() if "prescaling" in k}
props.update(new_values)
[docs] def run_flash(
self,
flash_method,
oliapi_instance,
dbs_file_id,
json_input,
survey=None,
file_name=None,
# max_concurrent_processes=1000,
# burst_job_tag=None,
# batch_size=None,
):
"""
Conduct single point analysis with initial JSON input, or conduct a survey on that input.
:param flash_method: string for flash calculation name
:param oliapi_instance: instance of OLI Cloud API
:param dbs_file_id: string ID of DBS file
:param json_input: JSON input for flash calculation
:param survey: dictionary containing names and input values to modify in JSON
:param file_name: string for file to write, if any
:return processed_requests: results from processed OLI flash requests
"""
if flash_method == "corrosion-rates":
# check if DBS file is using AQ thermodynamic framework
oliapi_instance.get_corrosion_contact_surfaces(dbs_file_id)
if self.relative_inflows:
_logger.info(
f"relative_inflows={self.relative_inflows},"
+ " surveys will add values to initial state"
)
if survey is None:
survey = {}
num_samples = None
for k, v in survey.items():
if num_samples is None:
num_samples = len(v)
elif num_samples != len(v):
raise RuntimeError(f"Length of list for key {k} differs from prior key")
if num_samples is None:
num_samples = 1
requests_to_process = []
for idx in range(num_samples):
_logger.info(f"Flash sample #{idx+1} of {num_samples}")
requests_to_process.append(
{
"flash_method": flash_method,
"dbs_file_id": dbs_file_id,
"input_params": self.get_clone(
flash_method, json_input, idx, survey
),
}
)
processed_requests = oliapi_instance.process_request_list(
requests_to_process,
# burst_job_tag=burst_job_tag,
# max_concurrent_processes=max_concurrent_processes,
# batch_size=batch_size,
)
_logger.info("Completed running flash calculations")
result = flatten_results(processed_requests)
if file_name:
write_output(result, file_name)
return result
[docs] def get_clone(self, flash_method, json_input, index, survey=None):
"""
Iterate over a survey to create a modified clone from JSON input.
:param flash_method: string for flash calculation name
:param json_input: JSON input for flash calculation
:param index: integer for index of incoming data
:param survey: dictionary containing names and input values to modify in JSON
:return clone: dictionary containing modified state variables and survey index
"""
if survey is None:
return json_input
valid_flashes = [
"wateranalysis",
"isothermal",
"isenthalpic",
"bubblepoint",
"dewpoint",
"vapor-amount",
"vapor-fraction",
"isochoric",
"setph",
"precipitation-point",
"corrosion-rates",
]
if flash_method not in valid_flashes:
raise RuntimeError(
"Invalid flash_method: {flash_method}. Use one of {', '.join(valid_flashes)}"
)
clone = deepcopy(json_input)
for k, v in survey.items():
d = clone["params"]
if flash_method == "wateranalysis":
d = d["waterAnalysisInputs"]
for param in d:
if param["name"].lower() == k.lower():
if self.relative_inflows:
param["value"] += v[index]
else:
param["value"] = v[index]
_logger.info(
f"Updating {k} for sample #{index} clone: new value = {param['value']}"
)
else:
if k in d:
pass
elif k in d["inflows"]["values"]:
d = d["inflows"]["values"]
elif k in d["corrosionParameters"]:
d = d["corrosionParameters"]
else:
_logger.warning(f"Survey key {k} not found in JSON input.")
if self.relative_inflows:
if isinstance(d[k], dict):
d[k]["value"] += v[index]
val = d[k]["value"]
else:
d[k] += v[index]
val = d[k]
else:
if isinstance(d[k], dict):
d[k]["value"] = v[index]
val = d[k]["value"]
else:
d[k] = v[index]
val = d[k]
_logger.info(
f"Updating {k} for sample #{index} clone: new value = {val}"
)
return clone
[docs] def get_apparent_species_from_true(
self,
true_species_json,
oliapi_instance,
dbs_file_id,
phase=None,
file_name=None,
):
"""
Run Water Analysis to get apparent species.
:param true_species_json: JSON generated from true species
:param oliapi_instance: instance of OLI Cloud API to call
:param dbs_file_id: string ID of DBS file
:param phase: string for inflows phase
:param file_name: string for file to write, if any
:return apparent_species: dictionary for molecular concentrations
"""
stream_output = self.run_flash(
"wateranalysis",
oliapi_instance,
dbs_file_id,
true_species_json,
)
if phase is None:
phase = "total"
extracted_result = stream_output["result"][f"molecularConcentration_{phase}"]
unit = self.input_unit_set["molecularConcentration"]["oli_unit"]
concentrations = {k: v["values"][0] for k, v in extracted_result.items()}
inflows = {"unit": unit, "values": concentrations}
if file_name:
write_output(inflows, file_name)
return inflows
def flatten_results(processed_requests):
_logger.info("Flattening OLI stream output ... ")
props = []
terminal_keys = ["unit", "value", "found", "fullVersion", "values"]
def _find_props(data, path=None):
"""
Get the path to all nested items in input data (recursive search).
:param data: dictionary containing OLI flash output
:param path: list of paths to endpoint
:return props: list of nested path lists
"""
path = path if path is not None else []
if isinstance(data, dict):
for k, v in data.items():
if isinstance(v, (str, bool)):
props.append([*path, k])
elif isinstance(v, list):
if all(k not in terminal_keys for k in v):
_find_props(v, [*path, k])
elif isinstance(v, dict):
if all(k not in terminal_keys for k in v):
_find_props(v, [*path, k])
else:
props.append([*path, k])
elif isinstance(data, list):
for idx, v in enumerate(data):
if isinstance(v, (dict, list)):
if all(k not in terminal_keys for k in v):
_find_props(v, [*path, idx])
else:
props.append([*path, idx])
else:
raise RuntimeError(f"Unexpected type for data: {type(data)}")
def _get_nested_data(data, keys):
for key in keys:
data = data[key]
return data
def _extract_values(data, keys):
values = _get_nested_data(data, keys)
extracted_values = {}
if isinstance(values, str):
extracted_values = values
elif isinstance(values, bool):
extracted_values = bool(values)
elif isinstance(values, dict):
if any(k in values for k in ["group", "name", "fullVersion"]):
if "value" in values:
extracted_values.update({"values": values["value"]})
if "unit" in values:
unit = values["unit"] if values["unit"] else "dimensionless"
extracted_values.update({"units": unit})
elif all(k in values for k in ["found", "phase"]):
extracted_values = values
else:
unit = values["unit"] if values["unit"] else "dimensionless"
if "value" in values:
extracted_values = {
"units": unit,
"values": values["value"],
}
else:
extracted_values = {
k: {
"units": unit,
"values": values["values"][k],
}
for k, v in values["values"].items()
}
else:
raise RuntimeError(f"Unexpected type for data: {type(values)}")
return extracted_values
def _create_input_dict(props, result):
input_dict = {k: {} for k in set([prop[0] for prop in props])}
for prop in props:
k = prop[0]
phase_tag = ""
if "metaData" in prop:
prop_tag = prop[-1]
elif "result" in prop:
# get property tag
if isinstance(prop[-1], int):
prop_tag = prop[-2]
else:
prop_tag = prop[-1]
# get phase tag
if any(k in prop for k in ["phases", "total"]):
if "total" in prop:
phase_tag = "total"
else:
phase_tag = prop[prop.index("phases") + 1]
elif "submitted_requests" in prop:
prop_tag = prop[-1]
if "params" in prop:
if isinstance(prop[-1], int):
prop_tag = _get_nested_data(result, prop)["name"]
else:
_logger.warning(f"Unexpected result in result")
label = f"{prop_tag}_{phase_tag}" if phase_tag else prop_tag
input_dict[k][label] = _extract_values(result, prop)
return input_dict
float_nan = float("nan")
def _add_to_output(input_dict, output_dict, index, number_samples):
"""
Add incoming flash results to output data.
:param input_dict: dictionary for incoming data
:param output_dict: dictionary for output data
:param index: integer for index of incoming data
:param number_samples: integer for total number of incoming data samples
"""
for k, v in input_dict.items():
try:
val = float(v)
except:
val = None
if val is not None:
if k not in output_dict:
output_dict[k] = [float_nan] * number_samples
output_dict[k][index] = val
elif isinstance(v, str):
if k not in output_dict:
if k in ["fullVersion", "units"]:
output_dict[k] = v
else:
output_dict[k] = [float_nan] * number_samples
if k in ["fullVersion", "units"]:
if input_dict[k] != output_dict[k]:
raise Exception(f"Input and output do not agree for key {k}")
else:
output_dict[k][index] = v
elif isinstance(v, dict):
if k not in output_dict:
output_dict[k] = {}
_add_to_output(input_dict[k], output_dict[k], index, number_samples)
else:
raise Exception(f"Unexpected value: {v}")
output_dict = {}
num_samples = len(processed_requests)
for idx, result in enumerate(processed_requests):
props = []
_find_props(result)
input_dict = _create_input_dict(props, result)
_add_to_output(input_dict, output_dict, idx, num_samples)
return output_dict
[docs]def write_output(content, file_name):
"""
Write dictionary-based content to file.
:param content: dictionary of content to write
:param file_name: string for name of file to write
:param file_path: string for full path of written file
"""
_logger.info(f"Saving content to {file_name}")
with open(file_name, "w", encoding="utf-8") as f:
json.dump(content, f)
_logger.info("Save complete")
file_path = Path(file_name).resolve()
return file_path
[docs]def build_survey(survey_arrays, get_oli_names=False, file_name=None, mesh_grid=True):
"""
Build a dictionary for modifying flash calculation parameters.
:param survey_arrays: dictionary for variables and values to survey
:param get_oli_names: bool switch to convert name into OLI name
:param file_name: string for file to write, if any
:param mesh_grid: if True (default) the input array will be combined to generate combination of all possible samples
if False, the direct values in survey_arrays will be used
:return survey: dictionary for product of survey variables and values
"""
_name = lambda k: get_oli_name(k) if get_oli_names else k
if mesh_grid:
keys = [get_oli_name(k) if get_oli_names else k for k in survey_arrays]
values = list(product(*(survey_arrays.values())))
survey = {_name(keys[i]): [val[i] for val in values] for i in range(len(keys))}
else:
survey = {}
values = None
for key, arr in survey_arrays.items():
survey[_name(key)] = arr
if values is not None and len(values) != len(arr):
raise ValueError(f"Length of list for key {key} differs from prior key")
values = arr
_logger.info(f"Survey contains {len(values)} items.")
if file_name:
write_output(survey, file_name)
return survey
[docs]def get_survey_sample_conditions(survey, sample_points):
"""
Return survey parameter values for one or more sample points.
:param survey: dictionary for product of survey conditions and values
:param sample_points: list of indices to get parameter values from
:return sample_conditions: dictionary for parameter values for given samples
"""
sample_conditions = {}
for point in sample_points:
sample_conditions[point] = {}
for k, v in survey.items():
sample_conditions[point][k] = v[point]
_logger.debug(sample_conditions)
return sample_conditions