Source code for exorad.tasks.instrumentHandler

from collections import OrderedDict

from astropy.io.misc.hdf5 import read_table_hdf5

from .task import Task
from exorad.models.instruments import Photometer
from exorad.models.instruments import Spectrometer
from exorad.output.hdf5 import load

instruments = {"photometer": Photometer, "spectrometer": Spectrometer}


[docs] class BuildInstrument(Task): """ Initialize and build an instrument Parameters ---------- type: str instrument class name: str instrument name description: dict instrument description dictionary payload: dict main payload. Default is None write: bool set to True to write the built dict to file. Default is None output: str output object Returns ------- Instrument: return the built instrument class Raises ------ KeyError if the indicated instrument class is not supported Examples -------- >>> buildInstrument = BuildInstrument() >>> instrument = buildInstrument(type='photometer', name='Phot', >>> description=payload['channel']['Phot'], >>> payload=payload, write=False, output=None) """ def __init__(self): self.addTaskParam("type", "instrument type") self.addTaskParam("name", "instrument name") self.addTaskParam("description", "instrument description dictionary") self.addTaskParam("payload", "main payload. Default is None") self.addTaskParam("write", "write processed instrument to output file") self.addTaskParam("output", "output object")
[docs] def execute(self): try: instrumentClass = instruments[self.get_task_param("type")] except KeyError: self.error("invalid instrument class") raise ValueError instrument = instrumentClass( self.get_task_param("name"), self.get_task_param("description"), self.get_task_param("payload"), ) instrument.build() if self.get_task_param("write"): instrument.write(self.get_task_param("output")) self.set_output(instrument)
[docs] class BuildChannels(Task): """ Initialize and build all the channels in the payload Parameters ---------- payload: dict main payload. Default is None write: bool set to True to write the built dict to file. Default is None output: str output object Returns ------- dict: return a dict of built Instrument classes Examples -------- >>> buildChannels = BuildChannels() >>> channels = buildChannels(payload=payload, write=False, output=None) """ def __init__(self): self.addTaskParam("payload", "main payload") self.addTaskParam("write", "write processed instrument to output file") self.addTaskParam("output", "output object")
[docs] def execute(self): self.info("building channel") channels = {} self.debug( "detectors found : {}".format( self.get_task_param("payload")["channel"].keys() ) ) ch = None if self.get_task_param("write"): inst = self.get_task_param("output").create_group("payload") inst.store_dictionary( self.get_task_param("payload"), group_name="payload description", ) ch = inst.create_group("channels") buildInstrument = BuildInstrument() if isinstance(self.get_task_param("payload")["channel"], OrderedDict): for det in self.get_task_param("payload")["channel"].keys(): channel_type = self.get_task_param("payload")["channel"][det][ "channelClass" ]["value"].lower() channels[det] = buildInstrument( type=channel_type, name=det, description=self.get_task_param("payload")["channel"][det], payload=self.get_task_param("payload"), write=False, output=None, ) if self.get_task_param("write"): channels[det].write(ch) else: channel_type = self.get_task_param("payload")["channel"][ "channelClass" ]["value"].lower() det = self.get_task_param("payload")["channel"]["value"] channels[det] = buildInstrument( type=channel_type, name=det, description=self.get_task_param("payload")["channel"], payload=self.get_task_param("payload"), write=False, output=None, ) if self.get_task_param("write"): channels[det].write(ch) self.debug("channels : {}".format(channels)) self.set_output(channels)
[docs] class LoadPayload(Task): """ Loads payload and channels from dict Parameters ---------- input: dict main dictionary Returns ------- dict: payload dict dict: return a dict of built Instrument classes Examples -------- >>> loadPayload = LoadPayload() >>> channels = loadPayload(input=input) """ def __init__(self): self.addTaskParam("input", "input data")
[docs] def execute(self): payload_dir = self.get_task_param("input")["payload"] payload = load(payload_dir["payload description"]) channels_dir = payload_dir["channels"] channels = {} for ch in channels_dir.keys(): ch_dir = channels_dir[ch] description = load(ch_dir["description"]) instrument = instruments[ description["channelClass"]["value"].lower().decode("utf-8") ] channels[ch] = instrument( name=ch, description=description, payload=payload, ) table = read_table_hdf5(ch_dir, path=ch) built_instr = load(ch_dir["built_instr"]) channels[ch].load(table, built_instr) self.debug("channels loaded: {}".format(channels)) self.set_output([payload, channels])
[docs] class PreparePayload(Task): """ It loads the payload information and returns a dictionary containing the payload, a dictionary with the channels already built, and a couple with minimum and maximum wavelength investigated by the payload Parameters ---------- payload_file: str xml file with payload description output: str h5 output file Returns ------- dict: payload dictionary dict: channels dictionary couple: minimum and maximum investigated wavelength Examples -------- >>> preparePayload = PreparePayload() >>> payload, channels, (wl_min, wl_max) = PreparePayload(payload_file=payload, output=output) """ def __init__(self): self.addTaskParam("payload_file", "payload xml file") self.addTaskParam("output", "output file")
[docs] def execute(self): import os from exorad.tasks import LoadOptions from exorad.output.hdf5 import HDF5Output loadOptions = LoadOptions() buildChannels = BuildChannels() loadPayload = LoadPayload() payload_file = self.get_task_param("payload_file") output = self.get_task_param("output") if isinstance(payload_file, str): ext = os.path.splitext(payload_file)[1] else: ext = None if (ext == ".xml") or isinstance(payload_file, dict): if ext == ".xml": payload = loadOptions(filename=payload_file) if isinstance(payload_file, dict): payload = payload_file if output is not None: if os.path.isfile(output): append = True else: append = False with HDF5Output(output, append=append) as out: channels = buildChannels( payload=payload, write=True, output=out ) else: channels = buildChannels( payload=payload, write=False, output=None ) elif ext in [".h5", ".hdf5"]: import h5py with h5py.File(payload_file, "r") as f: payload, channels = loadPayload(input=f) # payload, channels = loadPayload(input=payload_file) else: self.error("Unsupported payload format") raise OSError("Unsupported payload format") wl_min, wl_max = ( payload["common"]["wl_min"]["value"], payload["common"]["wl_max"]["value"], ) self.set_output([payload, channels, (wl_min, wl_max)])
[docs] class MergeChannelsOutput(Task): """ Merges the channels output tables Parameters ---------- channels: dict Returns --------- Table """ def __init__(self): self.addTaskParam("channels", "dict of channels")
[docs] def execute(self): from exorad.utils.util import vstack_tables channels = self.get_task_param("channels") table_list = [] for channel in channels.keys(): table_list.append(channels[channel].table) table = vstack_tables(table_list) self.set_output(table)
[docs] class GetChannelList(Task): """ Returns a list of the channels of the specific type that are contained in the instrument options file Parameters ---------- options: object payload description channel_type: string type of channel to find Returns ------- list: list of strings with channel names Examples -------- >>> from exorad.tasks.loadOptions import LoadOptions >>> loadOptions = LoadOptions() >>> options = loadOptions(filename = 'path/to/file.xml') >>> getDetectorList = GetChannelList() >>> photometers_list = getDetectorList(options = options, channel_type = 'Photometer') >>> spectrometers_list = getDetectorList(options = options, channel_type = 'Spectrometer') """ def __init__(self): self.addTaskParam("options", "instrument description class") self.addTaskParam("channel_type", "desired channel type")
[docs] def execute(self): self._opt = self.get_task_param("options") self._channel_type = self.get_task_param("channel_type") self.channel_lst = self.__get_channel_list__() self.info( "{} type channel found: {}".format( self._channel_type, self.channel_lst ) ) self.set_output(self.channel_lst)
def __get_channel_list__(self): channel_lst = [] for channel in self._opt["channel"].keys(): if ( self._opt["channel"][channel]["channelClass"]["value"].lower() == self._channel_type.lower() ): channel_lst.append(channel) self.debug( "{} added as {}".format(channel, self._channel_type) ) return channel_lst