# Source code for watertap.tools.oli_api.credentials

#################################################################################
# WaterTAP Copyright (c) 2020-2023, The Regents of the University of California,
# through Lawrence Berkeley National Laboratory, Oak Ridge National Laboratory,
# National Renewable Energy Laboratory, and National Energy Technology
# Laboratory (subject to receipt of any required approvals from the U.S. Dept.
# of Energy). All rights reserved.
#
# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and license
# information, respectively. These files are also available online at the URL
# "https://github.com/watertap-org/watertap/"
#################################################################################

__author__ = "Paul Vecchiarelli"

import json
import requests
from pathlib import Path

from pyomo.common.dependencies import attempt_import

# "cryptography" is handled as an optional dependency: attempt_import hands back
# the module object together with a boolean availability flag, and the flag is
# checked again in CredentialManager before any encryption work is attempted.
# NOTE(review): defer_check=False presumably makes the import attempt happen
# right here rather than lazily on first use -- confirm against the Pyomo docs.
cryptography, cryptography_available = attempt_import("cryptography", defer_check=False)
if cryptography_available:
    # Fernet is only bound when the optional dependency could be imported.
    from cryptography.fernet import Fernet


class CredentialManager:
    """
    A class to handle credentials for OLI Cloud.

    Credentials are either supplied directly (and, with the user's permission,
    written to an encrypted config file), or loaded from an existing encrypted
    config file using a previously generated Fernet key.
    """

    def __init__(
        self,
        username="",
        password="",
        root_url="",
        auth_url="",
        config_file="./credentials.txt",
        encryption_key="",
        access_keys=None,
        test=False,
    ):
        """
        Manages credentials for OLIApi authentication requests.

        :param username: user's username
        :param password: user's password
        :param root_url: root url
        :param auth_url: authorization url
        :param config_file: existing/desired path (absolute, or relative to the
            working directory) to encrypted oli_config_file
        :param encryption_key: fernet key generated by credential manager object
        :param access_keys: list of access keys generated by user
            (``None`` or empty means no access keys)
        :param test: bool switch for automation during tests
        """
        self.test = test
        self.access_key = ""
        # Initialize tokens so header construction never hits a missing attribute
        # when login() has not (yet) stored a token.
        self.jwt_token = ""
        self.refresh_token = ""
        self.encryption_key = encryption_key
        self.config_file = Path(config_file).resolve()
        # Always work on a fresh list: a shared mutable default would leak keys
        # appended by generate_oliapi_access_key() into other instances.
        self._manage_credentials(
            username,
            password,
            root_url,
            auth_url,
            list(access_keys) if access_keys else [],
        )
        if not self.test:
            self.login()

    def _manage_credentials(self, username, password, root_url, auth_url, access_keys):
        """
        Method to save/load OLI credentials.

        :param username: user's username
        :param password: user's password
        :param root_url: root url
        :param auth_url: authorization url
        :param access_keys: list of access keys generated by user

        :raises ModuleNotFoundError: if the 'cryptography' module is unavailable
        :raises OSError: if an encryption key is given but the config file is
            missing, or if required credentials are incomplete
        """
        if not cryptography_available:
            raise ModuleNotFoundError("Module 'cryptography' not available.")

        if self.encryption_key:
            # An existing key means credentials must come from the encrypted file.
            if not self.config_file.is_file():
                raise OSError(
                    f" Config file {self.config_file} does not exist."
                    + " Provide login credentials to generate encrypted file."
                )
            self.credentials = self._decrypt_credentials()
        else:
            self.credentials = {
                "username": username,
                "password": password,
                "root_url": root_url,
                "auth_url": auth_url,
                "access_keys": access_keys,
            }
            # Access keys replace username/password authentication.
            if not access_keys:
                self._check_credentials(
                    ["username", "password", "root_url", "auth_url"]
                )
            else:
                self._check_credentials(["root_url", "access_keys"])
            if self._write_permission():
                self.encryption_key = self._encrypt_credentials()

        self.access_key = self.set_access_key()
        root = self.credentials["root_url"]
        self.dbs_url = root + "/channel/dbs"
        self.upload_dbs_url = root + "/channel/upload/dbs"
        self.access_key_url = root + "/user/api-key"
        self.engine_url = root + "/engine/"
        self._delete_dbs_url = root + "/channel/file/"

    def _decrypt_credentials(self):
        """
        Basic decryption method for credentials.

        :return credentials: login credentials for OLI Cloud

        :raises RuntimeError: if the file cannot be read, decrypted or parsed
        """
        try:
            with open(self.config_file, "rb") as f:
                encrypted_credentials = f.read()
            cipher = Fernet(self.encryption_key)
            decrypted_credentials = cipher.decrypt(encrypted_credentials).decode()
            return json.loads(decrypted_credentials)
        except Exception as err:
            # Keep the original cause attached for debugging.
            raise RuntimeError(" Failed decryption.") from err

    def _check_credentials(self, keys):
        """
        Check to see if required credentials are missing.

        A key counts as missing when it is absent from the credentials or maps
        to an empty/falsy value.

        :param keys: keys required for login method

        :raises IOError: if any required key is missing
        """
        missing = [k for k in keys if not self.credentials.get(k)]
        if missing:
            raise IOError(
                f" Incomplete credentials for the following keys: {missing}."
            )

    def _write_permission(self):
        """
        Ensures user permits writing the encrypted config file to disk.

        In test mode permission is granted without prompting.

        :return boolean: status of user permission (to write encrypted config_file to disk)
        """
        if self.test:
            return True
        r = input(
            "WaterTAP will write encrypted file to store OLI Cloud credentials: [y]/n: "
        )
        # An empty answer selects the default ("y").
        return r.strip().lower() in ("", "y")

    def _encrypt_credentials(self):
        """
        Basic encryption method for credentials.

        Generates a new Fernet key, encrypts the credentials with it and writes
        the result to ``self.config_file``.

        :return encryption_key: the newly generated Fernet key (bytes)

        :raises RuntimeError: if encryption or writing the file fails
        """
        encryption_key = Fernet.generate_key()
        if not self.test:
            print(f"Your secret key is:\n{encryption_key.decode()}\nKeep it safe.\n")
        try:
            cipher = Fernet(encryption_key)
            encrypted_credentials = cipher.encrypt(
                json.dumps(self.credentials).encode()
            )
            with open(self.config_file, "wb") as f:
                f.write(encrypted_credentials)
            return encryption_key
        except Exception as err:
            raise RuntimeError(" Failed encryption.") from err

    def set_access_key(self):
        """
        Allows access key to be selected from list if more than one is provided.

        :return string: the selected access key, or "" when none are stored
        """
        # Credentials loaded from file may lack an "access_keys" entry.
        keys = self.credentials.get("access_keys") or []
        if not keys:
            return ""
        if len(keys) == 1:
            return keys[0]
        print(" Specify an access key:")
        for i, key in enumerate(keys):
            print(f"{i}\t{key}")
        return keys[int(input(" "))]

    # TODO: improve header management for class (authorization header
    # construction is centralized in _auth_headers; content types are still
    # set per request)
    def _auth_headers(self):
        """
        Build the authorization header for the active authentication mode.

        :return dict: API-KEY header if an access key is set, else Bearer header
        """
        if self.access_key:
            return {"authorization": "API-KEY " + self.access_key}
        return {"authorization": "Bearer " + self.jwt_token}

    def _store_tokens(self, payload):
        """
        Store tokens from an authentication response payload.

        :param payload: decoded JSON body of an authentication response

        :return boolean: True only if both access and refresh tokens were found
        """
        if not isinstance(payload, dict):
            return False
        if "access_token" in payload:
            self.jwt_token = payload["access_token"]
            if "refresh_token" in payload:
                self.refresh_token = payload["refresh_token"]
                return True
        return False

    def generate_oliapi_access_key(self):
        """
        Generate an access key for OLI Cloud.

        On success the new key is appended to the stored access keys. If the
        response does not contain a key, nothing is stored and the response
        text is returned unchanged.

        :return string: Response text containing the access key information or an error message.
        """
        headers = self._auth_headers()
        headers.update({"Content-Type": "application/json"})
        payload = json.dumps({})
        response = requests.post(self.access_key_url, headers=headers, data=payload)
        try:
            new_key = json.loads(response.text)["data"]["apiKey"]
        except (ValueError, KeyError, TypeError):
            # Error responses carry no key; the caller still gets the text.
            new_key = None
        if new_key:
            self.credentials.setdefault("access_keys", []).append(new_key)
        if not self.test:
            print(response.text)
        return response.text

    def delete_oliapi_access_key(self, api_key):
        """
        Delete an access key for OLI Cloud.

        :param api_key: The access key to delete.

        :return string: Response text containing the success message or an error message.
        """
        headers = self._auth_headers()
        headers.update({"Content-Type": "application/json"})
        payload = json.dumps({"apiKey": api_key})
        response = requests.delete(self.access_key_url, headers=headers, data=payload)
        if not self.test:
            print(response.text)
        return response.text

    def login(self):
        """
        Login into user credentials for the OLI Cloud.

        With an access key, the key is validated by requesting the database
        list; otherwise username/password are exchanged for tokens.

        :return boolean: True on success, False on failure (test mode only)

        :raises ConnectionError: on failure when not in test mode
        """
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        if self.access_key:
            headers.update({"authorization": "API-KEY " + self.access_key})
            response = requests.get(self.dbs_url, headers=headers)
            if response.status_code == 200:
                if not self.test:
                    print(f"Status code is {response.status_code}.\n")
                return True
        else:
            body = {
                "username": self.credentials["username"],
                "password": self.credentials["password"],
                "grant_type": "password",
                "client_id": "apiclient",
            }
            response = requests.post(
                self.credentials["auth_url"], headers=headers, data=body
            )
            if response.status_code == 200:
                if not self.test:
                    print(f"Status code is {response.status_code}.\n")
                if self._store_tokens(response.json()):
                    return True
        # The response object is kept separate from its parsed body so the
        # status code is still available here.
        if not self.test:
            raise ConnectionError(
                f" OLI login failed. Status code is {response.status_code}."
            )
        return False

    def get_refresh_token(self):
        """
        Uses refresh token to update access token.

        :return boolean: True on success, False on failure (test mode only)

        :raises ConnectionError: on failure when not in test mode
        """
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        body = {
            "refresh_token": self.refresh_token,
            "grant_type": "refresh_token",
            "client_id": "apiclient",
        }
        response = requests.post(
            self.credentials["auth_url"], headers=headers, data=body
        )
        if response.status_code == 200:
            if not self.test:
                print(f"Status code is {response.status_code}.\n")
            if self._store_tokens(response.json()):
                return True
        if not self.test:
            raise ConnectionError(
                f" OLI login failed. Status code is {response.status_code}.\n"
            )
        return False

    def request_auto_login(self, req_func=None):
        """
        Gets a new access token if the request returns with an expired token error.

        The request is attempted up to three times; after a 400 (with access
        key) or 401 response, authentication is renewed before retrying.

        :param req_func: function to call; receives the authorization headers
            and returns a response object

        :return: decoded JSON response on success, False on failure (test mode only)

        :raises TypeError: if req_func is not callable
        :raises RuntimeError: if re-login with an access key fails
        :raises ConnectionError: if the request fails when not in test mode
        """
        if not callable(req_func):
            raise TypeError("req_func must be a callable that accepts a headers dict.")

        response = None
        for _ in range(3):
            response = req_func(self._auth_headers())
            status = response.status_code
            if status == 200:
                return json.loads(response.text)
            if status == 400 and self.access_key:
                if not self.login():
                    raise RuntimeError(
                        "Login failed. Please check your API access key.\n"
                    )
            elif status == 401:
                # Try the cheap token refresh first, then a full login.
                if not self.get_refresh_token():
                    if not self.login():
                        break
            else:
                break

        if not self.test:
            raise ConnectionError(
                f" OLI request failed. Status code is {response.status_code}.\n"
            )
        return False