Source code for ordflow.api

from enum import Enum
from collections.abc import MutableMapping
import os
import requests


[docs]class Transport(Enum): """ The different data transfer protocols supported by DataFlow """ GLOBUS = 0
[docs]class API(object): def __init__(self, api_key, server_url=None): """ Creates an instance of the API class to communicate with DataFlow Parameters ---------- api_key : str API key for accessing DataFlow Please omit the "Bearer " prefix before the API key server_url : str, Optional URL for DataFlow server. Default: staging server """ if not isinstance(api_key, str): raise TypeError("api_key should be a string. Generate this from DataFlow") if api_key.lower().startswith("bearer "): api_key = api_key.split(' ')[-1] if server_url: self.__validate_str_parm(server_url, "server_url") self._API_URL = server_url else: self._API_URL = "https://dataflow.ornl.gov/api/v1" print("Using server at: {} as default".format(self._API_URL)) self._API_KEY = api_key def __get(self, url): """ Internal function to send GET requests Parameters ---------- url : str URL for GET request Returns ------- dict Response to GET request """ headers = {"accept": "*/*", "Authorization": "Bearer " + self._API_KEY} response = requests.get(url, headers=headers) if not response.ok: raise ValueError("{}: {}".format(response.reason, response.text[1:-1])) return response.json() def __post(self, url, headers={}, json=None, data=None, files=None): """ Parameters ---------- url : str URL for POST request headers : dict, optional Headers besides the accept and auth token json : dict, optional Dict data : dict, optional Key-value pairs for the form files : dict Dict Returns ------- dict Response to POST request """ # TODO: Use **kwargs instead basic_headers = {"accept": "*/*", "Authorization": "Bearer " + self._API_KEY} basic_headers.update(headers) response = requests.post(url, headers=basic_headers, json=json, files=files, data=data) if not response.ok: raise ValueError("{}: {}".format(response.reason, response.text[1:-1])) return response.json() @staticmethod def __validate_integer(value, title, min_val=0): """ Validates integer parameter Parameters ---------- value : int value to validate Returns ------- None Raises ------ TypeError ValueError """ if not isinstance(value, int): raise TypeError("{} should be an int".format(title)) if value < min_val: raise ValueError("{} should be > {}".format(title, min_val)) @staticmethod def __validate_str_parm(value, title): """ Validate string parameter Parameters ---------- value : str Object to test title : str Name of variable or parameter Returns ------- None Raises ------ TypeError ValueError """ mesg = '{} should be a non empty string'.format(title) if not isinstance(value, str): raise TypeError(mesg) title = title.strip() if len(title) < 1: raise ValueError(mesg) @staticmethod def __flatten_dict(nested_dict, separator='-'): """ Flattens a nested dictionary Parameters ---------- nested_dict : dict Nested dictionary separator : str, Optional. Default='-' Separator between the keys of different levels Returns ------- dict Dictionary whose keys are flattened to a single level Notes ----- Taken from https://stackoverflow.com/questions/6027558/flatten-nested- dictionaries-compressing-keys """ if not isinstance(nested_dict, dict): raise TypeError('nested_dict should be a dict') def __flatten_dict_int(nest_dict, sep, parent_key=''): items = [] if sep == '_': repl = '-' else: repl = '_' for key, value in nest_dict.items(): if not isinstance(key, str): key = str(key) if sep in key: key = key.replace(sep, repl) new_key = parent_key + sep + key if parent_key else key if isinstance(value, MutableMapping): items.extend(__flatten_dict_int(value, sep, parent_key=new_key).items()) # nion files contain lists of dictionaries, oops elif isinstance(value, list): for i in range(len(value)): if isinstance(value[i], dict): for kk in value[i]: items.append(('dim-' + kk + '-' + str(i), value[i][kk])) else: if type(value) != bytes: items.append((new_key, value)) else: if type(value) != bytes: items.append((new_key, value)) return dict(items) return __flatten_dict_int(nested_dict, separator)
[docs] def settings_get(self): """ Gets current default user settings Returns ------- dict Response from GET request """ path = "user-settings" url = "%s/%s" % (self._API_URL, path) return self.__get(url)
[docs] def settings_set(self, setting, value): """ Set or update default user settings Parameters ---------- setting : str Name of parameter Currently, only "globus.destination_endpoint" and "transport.protocol" are supported value : obj New value for chosen parameter Returns ------- dict Response from POST request """ self.__validate_str_parm(setting, "setting") path = "user-settings/?setting={}&value={}".format(setting, value) url = "%s/%s" % (self._API_URL, path) return self.__post(url)
[docs] def instrument_list(self): """ List all instruments connected to this DataFlow server Returns ------- dict Response from GET request """ url = "%s/%s" % (self._API_URL, "instruments") return self.__get(url)
[docs] def instrument_info(self, instr_id): """ Show information about an Instrument Parameters ---------- instr_id : int ID for Insrtument Returns ------- dict Response from GET request """ self.__validate_integer(instr_id, "instr_id", min_val=0) path = 'instruments/{}'.format(instr_id) url = "%s/%s" % (self._API_URL, path) return self.__get(url)
[docs] def globus_endpoints_active(self, endpoint=None): """ Checks whether both source and destination Globus endpoints are active Parameters ---------- endpoint : str, Optional. UUID of endpoint whose status needs to be checked. Default = None - checks the default destination Globus endpoint along with the DataFlow server's endpoint Returns ------- dict Response from GET request """ url = "%s/%s" % (self._API_URL, 'transports/globus/activation') if isinstance(endpoint, str): url += "?endpoint=" + endpoint # TODO: What should this response look like to be pythonic? return self.__get(url)
[docs] def globus_endpoints_activate(self, username, password, encrypted=True, endpoint="destination"): """ Activates Globus endpoints necessary to transfer data. Notes ----- It is entirely possible that the source and destination endpoints are in different security enclaves in the same organization or even in different organizations. For now, the "source" endpoint typically uses ORNL XCAMS credentials (your three-character ID and password). By default, the "destination" endpoint points to ORNL's CADES Open-Research Network File System, which also uses ORNL XCAMS credentials. Parameters ---------- username : str user name associated with the specified endpoint password : str password associated with specified endpoint encrypted : bool, Optional Whether or not the password is encrypted (using the DataFlow web server's encryption). Default = encrypted password endpoint : str, Optional Endpoint to activate. The 3 valid options are: 1. "source" - DataFlow server's endpoint, 2. "destination" - Where data will be sent to, and 3. the UUID of some other endpoint Returns ------- dict Response from GET request """ pwd_prefix = "encrypted" if not encrypted: pwd_prefix = "unencrypted" path = 'transports/globus/activate?endpoint={}&username={}&{}_password={}'.format(endpoint, username, pwd_prefix, password) url = "%s/%s" % (self._API_URL, path) return self.__post(url)
[docs] def dataset_info(self, dset_id): """ Show information about a dataset Parameters ---------- dset_id : int ID for dataset Returns ------- dict Response from GET request """ self.__validate_integer(dset_id, "dset_id", min_val=0) path = 'datasets/{}'.format(dset_id) url = "%s/%s" % (self._API_URL, path) return self.__get(url)
@staticmethod def __mdata_dict_2_list(metadata): """ Converts metadata dictionary to a list amenable to DataFlow Parameters ---------- metadata: dict Metadata specified as {key_1: value_1, key_2: value_2} Returns ------- list Metadata reformatted as: [{"field_name": key_1, "field_value": value_1}, {"field_name": key_2, "field_value": value_2}] """ mdlist = list() for key, val in metadata.items(): mdlist.append({"field_name": key, "field_value": val}) return mdlist
[docs] def dataset_create(self, title, instrument_id=0, metadata=None): """ Create a new dataset Parameters ---------- title : str Title for dataset instrument_id : int, optional Instrument ID. Default = 0 - UnknownInstrument metadata : dict, optional Scientific metadata associated with this dataset. Metadata specified as {"param 1": value_1, "param 2": value_2} Nested dictionaries will be flattened with keys joined with a "-" separator Returns ------- dict Response from POST request """ self.__validate_str_parm(title, "title") if metadata: if not isinstance(metadata, dict): raise TypeError("metadata should be a dict") url = "%s/%s" % (self._API_URL, "datasets") data = {"name": title, "instrument_id": instrument_id} if isinstance(metadata, dict): flat_md = self.__flatten_dict(metadata) mdata_list = self.__mdata_dict_2_list(flat_md) data["metadata_field_values_attributes"] = mdata_list return self.__post(url, headers={"Content-Type": "application/json"}, json=data)
[docs] def file_upload(self, file_path, dataset_id, relative_path=None, transport=None): """ Upload the provided file to the specified Dataset. Parameters ---------- file_path : str Local path to file that needs to be uploaded dataset_id : int Dataset ID to upload this file to relative_path : str, optional Relative path in destination to place this file. Default - the file will be uploaded to the root directory of the dataset transport : ordflow.Transport, optional Transport protocol to use to transfer this specific file Returns ------- dict Response from POST request """ path = 'dataset-file-upload' url = "%s/%s" % (self._API_URL, path) self.__validate_str_parm(file_path, "file_path") if not os.path.exists(file_path): raise FileNotFoundError("{} not found".format(file_path)) self.__validate_integer(dataset_id, "dataset_id", min_val=0) if transport: if not isinstance(transport, Transport): raise TypeError("transport should be of type ordflow.Transport") file_handle = open(file_path, "rb") form_data = {'dataset_id': dataset_id, 'transport': 'globus'} if relative_path: if not isinstance(relative_path, str): raise TypeError("relative_path should be a string") form_data.update({'relative_path': relative_path}) """ if transport != Transport.GLOBUS: print("using Globus since other file transfer adapters have not been implemented") """ response = self.__post(url, files={'file': file_handle}, data=form_data) file_handle.close() return response