Source code for exorad.tasks.loadOptions

import os
import xml.etree.ElementTree as ET
from collections import OrderedDict

from astropy import units as u
from astropy.io import ascii
from astropy.io.ascii.core import InconsistentTableError

from .task import Task
from exorad.utils.passVal import PassVal

compactString = lambda string: string.replace("\n", "").strip()


[docs] class LoadOptions(Task): """ Reads the xml file with payload description and return an object with attributes related to the input data Parameters ---------- filename: string input xml file location config_path: string (optional) on-run setting for ConfigPat. Default is None. Returns ------- dict: parsed xml input file Raises ------ IOError if the indicated file is not found or the format is not supported Examples -------- >>> loadOptions = LoadOptions() >>> options = loadOptions(filename = 'path/to/file.xml') """ def __init__(self): self.addTaskParam("filename", "input option file name") self.addTaskParam("config_path", "on-run setting for ConfigPath", None)
[docs] def execute(self): self._filename = self.get_task_param("filename") self._config_path = self.get_task_param("config_path") self.__check_format__() root = self.__get_root__() self._dict = self.__parser__(root) # self.debug('loaded options: {}'.format(self._dict)) self.set_output(self._dict)
configPath = None def __check_format__(self): if not self._filename.endswith(".xml"): self.error("wrong input format") raise OSError def __get_root__(self): try: self.debug("input option file found %s" % self._filename) return ET.parse(self._filename).getroot() except OSError: self.error("No input option file found") raise OSError def __parser__(self, root): root_dict = {} for ch in root: retval = self.__parser__(ch) # parse all attributes for key in list(ch.attrib.keys()): retval[key] = ch.attrib[key] value = compactString(ch.text) if (value is not None) and (value != ""): try: value = int(value) except ValueError: if value.replace(".", ""): try: value = float(value) except ValueError: pass if "unit" in retval: unitName = retval["unit"] if unitName == "dimensionless": unitName = "" value = value * u.Unit(unitName) if value == "True": value = bool(True) if value == "False": value = bool(False) if isinstance(value, str): if "__ConfigPath__" in value: value = value.replace( "__ConfigPath__", self.configPath ) if ch.tag == "ConfigPath": self.configPath = value if self._config_path: self.configPath = self._config_path if ch.tag == "working_R": PassVal.working_R = value print(PassVal.working_R) # if isinstance(value, str): # retval = value # else: retval["value"] = value if ch.tag in root_dict: # if an other instance of same tag exists, transform into a dict attr = root_dict[ch.tag] if isinstance(attr, OrderedDict): attr[value] = retval else: if not isinstance(attr, str): dtmp = OrderedDict( [(attr["value"], attr), (value, retval)] ) root_dict[ch.tag] = dtmp else: # othewise, set new attr root_dict[ch.tag] = retval if "datafile" in root_dict: try: datatype_attr = root_dict["datatype"] datatype = datatype_attr["value"] except: datatype = "ecsv" attrValue = root_dict["datafile"] datafile = attrValue if "__ConfigPath__" in datafile["value"]: datafile = datafile["value"].replace( "__ConfigPath__", self.configPath ) else: datafile = datafile["value"] try: data = self.__read_datatable__(datafile, datatype) root_dict["data"] = data except OSError: self.error("Cannot read input file") raise OSError if "datadict" in root_dict: attrValue = root_dict["datadict"] datafile = attrValue datafile = datafile["value"].replace( "__ConfigPath__", self.configPath ) try: data = self.__read_datadict__(datafile) root_dict["data"] = data except OSError: self.error("Cannot read input file") raise OSError if "config_dict" in root_dict: attrValue = root_dict["config_dict"] datafile = attrValue datafile = datafile["value"].replace( "__ConfigPath__", self.configPath ) try: data = self.__read_datadict__(datafile) root_dict = data root_dict["config_dict"] = datafile self.debug( "Configuration dictionary found in {}".format(datafile) ) except OSError: self.error("Cannot read input file") raise OSError return root_dict
[docs] def getopt(self): return self.__obj__
def __read_datatable__(self, datafile, datatype): if datatype == "ecsv": try: data = ascii.read( os.path.expanduser(datafile), format="ecsv", fill_values=[("#N/A", "0"), ("N/A", "0"), ("", "0")], ) except InconsistentTableError: data = ascii.read( os.path.expanduser(datafile), format="csv", fill_values=[("#N/A", "0"), ("N/A", "0"), ("", "0")], delimiter=",", ) return data else: self.error("inconsistent table") raise OSError def __read_datadict__(self, datadict): ext = os.path.splitext(datadict)[1] if ext in [".h5", ".hdf5"]: from exorad.output.hdf5 import load data = load(datadict) return data elif ext == ".pickle": import pickle f = open(datadict, "rb") data = pickle.load(f) f.close() return data else: self.error("inconsistent table") raise OSError