Source code for exorad.utils.exolib

import glob
import logging
import os

import astropy.units as u
import mpmath
import numpy as np
import photutils
from import fits
from scipy import signal
from scipy.integrate import cumulative_trapezoid
from scipy.interpolate import interp1d
from scipy.special import j1
from scipy.stats import binned_statistic

logger = logging.getLogger("exorad.exolib")

[docs] def rebin(x, xp, fp): """Resample a function fp(xp) over the new grid x, rebinning if necessary, otherwise interpolates Parameters ---------- x : array like New coordinates fp : array like y-coordinates to be resampled xp : array like x-coordinates at which fp are sampled Returns ------- out : array like new samples """ if x.unit != xp.unit: logger.fatal( "Units mismatch in rebin {:s}, {:s}".format(x.unit, xp.unit) ) raise ValueError idx = np.where(np.logical_and(xp > 0.9 * x.min(), xp < 1.1 * x.max()))[0] xp = xp[idx] fp = fp[idx] if not hasattr(fp, "unit"): logger.debug("No units found for fp. Forced to None") fp *= u.Unit() funits = fp.unit # remove NaNs id = np.where(np.isnan(xp))[0] if id.size > 0: logger.debug("Nans found in input x array: removing it") xp = np.delete(xp, id) fp = np.delete(fp, id) id = np.where(np.isnan(x))[0] if id.size > 0: logger.debug("Nans found in new x array: removing it") x = np.delete(x, id) # remove duplicates while np.diff(xp).min() == 0: logger.debug("duplicate found in input x array: removing it") id = np.argmin(np.diff(xp)) xp = np.delete(xp, id) fp = np.delete(fp, id) while np.diff(x).min() == 0: logger.debug("duplicate found in new x array: removing it") id = np.argmin(np.diff(x)) x = np.delete(x, id) if np.diff(xp).max() < np.diff(x).min(): # Binning! logger.debug("binning") bin_x = 0.5 * (x[1:] + x[:-1]) x0 = x[0] - (bin_x[0] - x[0]) / 2.0 x1 = x[-1] + (x[-1] - bin_x[-1]) / 2.0 bin_x = np.insert(bin_x, [0], x0) bin_x = np.append(bin_x, x1) new_f = ( binned_statistic(xp, fp, bins=bin_x, statistic="mean")[0] * funits ) # idx = np.where(np.logical_and(xp >= x.min(), xp <= x.max()))[0] # print np.trapz(fp[idx], x=xp[idx]) # print np.trapz(new_f, x=x) # asd else: logger.debug("interpolating") # Interpolate ! # new_f = np.interp(x, xp, fp, left=0.0, right=0.0) # from scipy.interpolate import interp1d # interpolator = interp1d(xp, fp, kind='cubic', fill_value=0.0, assume_sorted=False, bounds_error=False) # new_f = interpolator(x) func = interp1d( xp, fp, fill_value=0.0, assume_sorted=False, bounds_error=False, kind="linear", ) new_f = func(x) if not hasattr(new_f, "unit"): logger.debug("output units force to inputs fp") new_f *= funits return x, new_f
[docs] def rebin_(x, xp, fp): """Resample a function fp(xp) over the new grid x, rebinning if necessary, otherwise interpolates Parameters ---------- x : array like New coordinates fp : array like y-coordinates to be resampled xp : array like x-coordinates at which fp are sampled Returns ------- out : array like new samples """ if x.unit != xp.unit: logger.fatal( "Units mismatch in rebin {:s}, {:s}".format(x.unit, xp.unit) ) raise ValueError idx = np.where(np.logical_and(xp > 0.9 * x.min(), xp < 1.1 * x.max()))[0] xp = xp[idx] fp = fp[idx] if np.diff(xp).max() < np.diff(x).min(): # Binning! c = cumulative_trapezoid(fp.value, x=xp.value) * fp.unit * xp.unit xpc = xp[1:] delta = np.gradient(x) new_c_1 = ( np.interp(x - 0.5 * delta, xpc, c, left=0.0, right=0.0) * c.unit ) new_c_2 = ( np.interp(x + 0.5 * delta, xpc, c, left=0.0, right=0.0) * c.unit ) new_f = (new_c_2 - new_c_1) / delta else: # Interpolate ! new_f = np.interp(x, xp, fp, left=0.0, right=0.0) * fp.unit return x, new_f
[docs] def planck(wl, T): """Planck function. Parameters __________ wl : array wavelength [micron] T : scalar Temperature [K] Spot temperature [K] Returns ------- spectrum: array The Planck spectrum [W m^-2 sr^-1 micron^-1] """ a = np.float64(1.191042768e8) ***5 * u.W / u.m**2 / / b = np.float64(14387.7516) * 1 * * u.K try: x = b / (wl * T) bb = a / wl**5 / (np.exp(x) - 1.0) except ArithmeticError: bb = np.zeros_like(wl) return bb
[docs] def load_standard_psf(F_x, F_y, wl, delta_pix, hdr): k_x = delta_pix / (F_x * wl * hdr["CDELT2"]) k_y = delta_pix / (F_y * wl * hdr["CDELT1"]) xmin = -(hdr["CRPIX2"] - 1) * F_x * wl * hdr["CDELT2"] xmax = (hdr["NAXIS2"] - (hdr["CRPIX2"] - 1)) * F_x * wl * hdr["CDELT2"] ymin = -(hdr["CRPIX1"] - 1) * F_y * wl * hdr["CDELT1"] ymax = (hdr["NAXIS1"] - (hdr["CRPIX1"] - 1)) * F_y * wl * hdr["CDELT1"] extent = (xmin, xmax, ymin, ymax) return k_x, k_y, extent
[docs] def binnedPSF( F_x, F_y, wl, delta_pix, filename=None, format=None, PSFtype="AIRY", plot=False, ): if filename and os.path.exists(filename): if filename[-5:] == ".fits": with as hdu: ima = hdu[0].data hdr = hdu[0].header # define a kernel representing the detector pixel response # and use fractional pixel k_x, k_y, extent = load_standard_psf( F_x, F_y, wl, delta_pix, hdr ) elif os.path.isdir(filename): filenames = np.sort(glob.glob(filename + "*.fits")) psf_wl = [] for file in filenames: with as hdu: psf_wl.append(hdu[0].header["WAVELEN"]) idx = (np.abs(np.asarray(psf_wl) - wl.value)).argmin() with[idx])) as hdu: ima = hdu[0].data hdr = hdu[0].header k_x, k_y, extent = load_standard_psf( F_x, F_y, wl, delta_pix, hdr ) else: x = np.linspace(-4.0, 4.0, 256) xx, yy = np.meshgrid(x, x) r = np.pi * np.sqrt(xx**2 + yy**2) + 1.0e-10 ima = (2.0 * j1(r) / r) ** 2 ima *= 0.25 * np.pi * (x[1] - x[0]) ** 2 # print ima.sum() k_x = delta_pix / (F_x * wl * (x[1] - x[0])) k_y = delta_pix / (F_y * wl * (x[1] - x[0])) # print k_y, k_x extent = ( -(ima.shape[1] // 2) * F_x * wl * (x[1] - x[0]), (ima.shape[1] // 2) * F_x * wl * (x[1] - x[0]), -(ima.shape[0] // 2) * F_y * wl * (x[1] - x[0]), (ima.shape[0] // 2) * F_y * wl * (x[1] - x[0]), ) # normalise ima /= ima.sum() fk_x, ik_x = np.modf(k_x) fk_y, ik_y = np.modf(k_y) kernel = np.ones((int(ik_y) + 2, int(ik_x) + 2)) * fk_x.unit kernel[:, 0] *= 0.5 * fk_x kernel[:, -1] *= 0.5 * fk_x kernel[0, :] *= 0.5 * fk_y kernel[-1, :] *= 0.5 * fk_y imac = signal.convolve2d(ima, kernel, mode="same") if plot: plot_imac(imac, extent) return imac, kernel, extent
[docs] def load_pixel_psf_size(delta_pix, hdr): xmin = [-hdr["NAXIS2"] / 2.0 * delta_pix.value] * u.micron xmax = [hdr["NAXIS2"] / 2.0 * delta_pix.value] * u.micron ymin = [-hdr["NAXIS1"] / 2.0 * delta_pix.value] * u.micron ymax = [hdr["NAXIS1"] / 2.0 * delta_pix.value] * u.micron extent = (xmin, xmax, ymin, ymax) return extent
[docs] def pixel_based_psf(wl, delta_pix, filename): if filename[-5:] == ".fits": with as hdu: ima = hdu[0].data hdr = hdu[0].header extent = load_pixel_psf_size(delta_pix, hdr) elif os.path.isdir(filename): filenames = np.sort(glob.glob(filename + "*.fits")) psf_wl = [] for file in filenames: with as hdu: psf_wl.append(hdu[0].header["WAVELEN"]) idx = (np.abs(np.asarray(psf_wl) - wl.value)).argmin() with[idx])) as hdu: ima = hdu[0].data hdr = hdu[0].header extent = load_pixel_psf_size(delta_pix, hdr) return ima, np.array([1.0]), extent
[docs] def load_paos_psf(wl_group): from exorad.output.hdf5 import load scale = 1.0e6 group = load(input_group=wl_group) img_key = list(group.keys())[-1] group = group[img_key] ima = group["amplitude"] ** 2 dx = group["dx"] * u.micron * scale dy = group["dy"] * u.micron * scale extent = group["extent"] * u.micron * scale fratio = group["fratio"] return ima, dx, dy, extent, fratio
[docs] def interpolate_paos_psf(fd, wl): wavelengths = np.asarray( [key for key in list(fd.keys()) if key != "info"] ).astype(np.float64) wavelengths.sort() if len(wavelengths) == 1: logger.error("Can't interpolate, PAOS file has only one wavelength") raise ValueError( "Can't interpolate, PAOS file has only one wavelength" ) idx0 = np.argmin(np.abs(wavelengths - wl)) if np.min(wavelengths) <= wl <= np.max(wavelengths): idx1 = idx0 - 1 if wavelengths[idx0] - wl > 0 else idx0 + 1 else: logger.warning( "Wavelength is outside interpolation region, extrapolating..." ) idx1 = idx0 + 1 if wavelengths[idx0] - wl > 0 else idx0 - 1 wl0, wl1 = wavelengths[idx0], wavelengths[idx1] ima0, dx0, dy0, extent0, fratio0 = load_paos_psf(wl_group=fd[f"{wl0}"]) ima1, dx1, dy1, extent1, fratio1 = load_paos_psf(wl_group=fd[f"{wl1}"]) dx0, dy0, extent0 = dx0.value, dy0.value, extent0.value dx1, dy1, extent1 = dx1.value, dy1.value, extent1.value ima = interp1d( x=[wl0, wl1], y=[ima0, ima1], kind="linear", fill_value="extrapolate", axis=0, )(wl) dx = interp1d( x=[wl0, wl1], y=[dx0, dx1], kind="linear", fill_value="extrapolate" )(wl) dy = interp1d( x=[wl0, wl1], y=[dy0, dy1], kind="linear", fill_value="extrapolate" )(wl) extent = interp1d( x=[wl0, wl1], y=[extent0, extent1], kind="linear", fill_value="extrapolate", axis=0, )(wl) fratio = interp1d( x=[wl0, wl1], y=[fratio0, fratio1], kind="linear", fill_value="extrapolate", )(wl) return ima, dx * u.micron, dy * u.micron, extent * u.micron, fratio
[docs] def paosPSF(wl, delta_pix, filename="", plot=False): import h5py from scipy import signal logger.debug("loading PAOS psf") assert wl.unit == u.micron, print("Wavelength unit should be micron. ") try: with h5py.File(os.path.expanduser(filename), mode="r") as fd: wavelength = wl.value[0] if str(wavelength) in list(fd.keys()): ima, dx, dy, extent, fratio = load_paos_psf( wl_group=fd[f"{wavelength}"] ) else: ima, dx, dy, extent, fratio = interpolate_paos_psf( fd, wavelength ) except OSError as e: logger.error("Error loading PAOS psf file. ") raise FileNotFoundError(e.errno) k_x = delta_pix / dx k_y = delta_pix / dy fk_x, ik_x = np.modf(k_x) fk_y, ik_y = np.modf(k_y) kernel = np.ones((int(ik_y) + 2, int(ik_x) + 2)) * fk_x.unit kernel[:, 0] *= 0.5 * fk_x kernel[:, -1] *= 0.5 * fk_x kernel[0, :] *= 0.5 * fk_y kernel[-1, :] *= 0.5 * fk_y imac = signal.convolve2d(ima, kernel, mode="same") if plot: plot_imac(imac, extent) return imac, kernel, extent
[docs] def plot_imac(imac, extent, xlim=None, ylim=None): import matplotlib.pyplot as plt from mpl_toolkits.axes_grid1 import make_axes_locatable extent = np.asarray([ext.value for ext in extent]).ravel() fig, ax = plt.subplots(1, 1, figsize=(8, 6)) im = ax.imshow(imac, cmap=plt.get_cmap("viridis"), origin="lower") extent = np.asarray(extent).astype(np.float64) im.set_extent(extent=extent) if (xlim, ylim) != (None, None): plt.xlim(*xlim) plt.ylim(*ylim) plt.grid(True) cax = make_axes_locatable(ax).append_axes("right", size="5%", pad=0.05) plt.colorbar(im, cax=cax, orientation="vertical") return 0
[docs] def wl_encircled_energy(filename, eec, format, Fnum_x, Fnum_y, delta_pix): filenames = np.sort(glob.glob(filename + "*.fits")) psf_wl, enc = [], [] for file in filenames: with as hdu: psf_wl.append(hdu[0].header["WAVELEN"]) if format == "pixel_based": ima, _, _ = pixel_based_psf( hdu[0].header["WAVELEN"], delta_pix, file ) enc += [find_aperture_radius(ima, eec, 1, 1, 1)] else: ima, _, _ = binnedPSF( Fnum_x, Fnum_y, hdu[0].header["WAVELEN"], delta_pix ) enc += [ find_aperture_radius( ima, eec, Fnum_x, Fnum_y, hdu[0].header["WAVELEN"] ) ] return psf_wl, enc
[docs] def find_aperture_radius(ima, eec, Fnum_x, Fnum_y, wavelength): """ It finds the aperture radius for a given PSF such that the desired Encircled Energy is contained. Parameters ---------- ima: psf image in micron scale eec: desired encircled energy Fnum_x: f number in the spectral direction Fnum_y: f number in the spatial direction wavelength: wavelength of the sampled psf Returns --------- float """ last_enc, i = 0, 0 rs, enc = [], [] while last_enc < (eec + 0.02): i += 1 rs += [i] enc += [encircled_energy(i, ima, Fnum_x, Fnum_y, wavelength)] last_enc = enc[-1] inter = interp1d(enc, rs) r = inter(eec) return r
[docs] def encircled_energy(r, ima, Fnum_x, Fnum_y, wavelength): r_pix_x = r * Fnum_x * wavelength r_pix_y = r * Fnum_y * wavelength aper = photutils.aperture.EllipticalAperture( (ima.shape[1] // 2, ima.shape[0] // 2), b=float(r_pix_y.value), a=float(r_pix_x.value), ) phot_ = photutils.aperture.aperture_photometry(ima, aper) phot = phot_["aperture_sum"].data[0] return phot / ima.sum()
[docs] def OmegaPix(Fnum_x, Fnum_y=None): """ Calculate the solid angle subtended by an elliptical aperture on-axis. Algorithm from "John T. Conway. Nuclear Instruments and Methods in Physics Research Section A: Accelerators, Spectrometers, Detectors and Associated Equipment, 614(1):17 ? 27, 2010. Equation n. 56 Parameters ---------- Fnum_x : scalar Input F-number along dispersion direction Fnum_y : scalar Optional, input F-number along cross-dispersion direction Returns ------- Omega : scalar The solid angle subtanded by an elliptical aperture in units """ if not Fnum_y: Fnum_y = Fnum_x if Fnum_x > Fnum_y: a = 1.0 / (2 * Fnum_y) b = 1.0 / (2 * Fnum_x) else: a = 1.0 / (2 * Fnum_x) b = 1.0 / (2 * Fnum_y) h = 1.0 A = 4.0 * h * b / (a * np.sqrt(h**2 + a**2)) k = np.sqrt((a**2 - b**2) / (h**2 + a**2)) alpha = np.sqrt(1 - (b / a) ** 2) Omega = 2.0 * np.pi - A * mpmath.ellippi(alpha**2, k**2) return Omega *
if __name__ == "__main__": Omega = OmegaPix(5.0, 5.0) sigma = np.arctan(1.0 / 10) OmegaCheck = 2.0 * np.pi * (1 - np.cos(sigma)) print((Omega, OmegaCheck))