Source code for soxspipe.commonutils.response_function

#!/usr/bin/env python
# encoding: utf-8
"""
*Given a standard star extracted spectrum, generate the instrument response function needed to flux calibrate science spectra*

:Author:
    Marco Landoni & David Young

:Date Created:
    July 28, 2023
"""

import sys
import os
from builtins import object

from soxspipe.commonutils.toolkit import extinction_correction_factor

os.environ["TERM"] = "vt100"


[docs] class response_function(object): """ *Given a standard star extracted spectrum, generate the instrument response function needed to flux calibrate science spectra* **Key Arguments:** - ``log`` -- logger - ``settings`` -- the settings dictionary - ``stdExtractionPath`` -- fits binary table containing the extracted standard spectrum - ``recipeName`` -- name of the recipe as it appears in the settings dictionary - ``settings`` -- the pipeline settings - ``sofName`` -- name of the originating SOF file - ``qcTable`` -- the data frame to collect measured QC metrics - ``productsTable`` -- the data frame to collect output products - ``startNightDate`` -- YYYY-MM-DD date of the observation night. Default "" - ``stdNotFlatExtractionPath`` -- fits binary table containing the extracted standard spectrum without flat correction. Default "". - ``orderJoins`` -- a list of tuples indicating the orders to be joined together in the response function fitting. Default None. **Usage:** To setup your logger, settings and database connections, please use the ``fundamentals`` package (`see tutorial here <http://fundamentals.readthedocs.io/en/latest/#tutorial>`_). To initiate a response_function object, use the following: ```python from soxspipe.commonutils import response_function response = response_function( log=log, settings=settings, recipeName=recipeName, sofName=sofName, stdExtractionPath=stdExtractionPath qcTable=qcTable, productsTable=productsTable, startNightDate=startNightDate stdNotFlatExtractionPath=stdNotFlatExtractionPath, orderJoins=orderJoins, ) qcTable, productsTable = response.get() ``` """ def __init__( self, log, stdExtractionPath, recipeName, sofName, settings=False, qcTable=False, productsTable=False, startNightDate="", stdNotFlatExtractionPath="", orderJoins=None, ): self.log = log log.debug("instansiating a new 'response_function' object") self.settings = settings self.stdExtractionPath = stdExtractionPath self.recipeSettings = settings["soxs-response"] self.instrument = self.settings["instrument"].lower() self.qc = qcTable self.products = productsTable self.recipeName = recipeName self.sofName = sofName self.orderJoins = orderJoins from soxspipe.commonutils.toolkit import get_calibrations_path from astropy.table import Table from astropy.io import fits # CONVERTING EXTRACTION (BACK) TO DATAFRAME self.stdExtractionDF = Table.read(self.stdExtractionPath, format="fits") self.stdExtractionDF = self.stdExtractionDF.to_pandas() # SORT BY COLUMN NAME self.stdExtractionDF.sort_values(["WAVE"], inplace=True) self.calibrationRootPath = get_calibrations_path(log=self.log, settings=self.settings) from soxspipe.commonutils import keyword_lookup # KEYWORD LOOKUP OBJECT - LOOKUP KEYWORD FROM DICTIONARY IN RESOURCES # FOLDER self.kw = keyword_lookup(log=self.log, settings=self.settings).get kw = self.kw stdNames = ["LTT7987", "EG274", "LTT3218", "EG21"] stdAkas = ["CD-3017706", "CD-3810980", "CD-325613", "CPD-69177"] hdul = fits.open(self.stdExtractionPath) self.header = hdul[0].header self.dateObs = self.header[kw("DATE_OBS")] self.texp = float(self.header[kw("EXPTIME")]) if self.instrument == "xsh": # Name is in the format 'EG 274' self.std_objName = self.header[kw("OBS_TARG_NAME")].strip().upper() else: # Name is in the format 'EG 274' self.std_objName = self.header[kw("OBJECT")].strip().upper() if "STD," in self.std_objName: try: self.std_objName = self.header[kw("OBS_TARG_NAME")].strip().upper() except: pass self.std_objName = self.std_objName.split(" V")[0].replace(" ", "") # Hack to reduce xsh data # REMOVE SPACES IN NAME self.std_objName = self.std_objName.replace(" ", "") self.std_objName = self.std_objName.replace("_NOD", "") if stdNotFlatExtractionPath and len(stdNotFlatExtractionPath) > 1: # STD STAR GIVEN, READING THE NON FLAT FIELDED SPECTRUM self.stdExtractionNotFlatDF = Table.read(stdNotFlatExtractionPath, format="fits") self.stdExtractionNotFlatDF = self.stdExtractionNotFlatDF.to_pandas() if self.std_objName in stdAkas: for s, a in zip(stdNames, stdAkas): if self.std_objName == a: self.std_objName = s self.log.print(f"STANDARD-STAR: {self.std_objName}") # USING THE AVERAGE AIR MASS if self.instrument == "soxs": # NEED TO UPDATE THE LOOKUP TABLE EVENTUALLY airmass_start = float(self.header["HIERARCH ESO TEL AIRM START"]) airmass_end = float(self.header["HIERARCH ESO TEL AIRM END"]) else: airmass_start = float(self.header[kw("AIRM_START")]) airmass_end = float(self.header[kw("AIRM_END")]) self.airmass = (airmass_start + airmass_end) / 2 self.arm = self.header[kw("SEQ_ARM")].strip().upper() # KW lookup # DETECTOR PARAMETERS LOOKUP OBJECT from soxspipe.commonutils import detector_lookup self.detectorParams = detector_lookup(log=log, settings=settings).get(self.arm) from soxspipe.commonutils.toolkit import utility_setup self.qcDir, self.productDir = utility_setup( log=self.log, settings=settings, recipeName=recipeName, startNightDate=startNightDate ) return None
[docs] def get(self): """ *get the response_function object* **Return:** - ``response_function`` -- a set of polynomial coefficients """ self.log.debug("starting the ``get`` method") import pandas as pd from scipy.interpolate import interp1d import numpy as np from scipy.signal import savgol_filter from datetime import datetime from astropy.table import Table from matplotlib import pyplot as plt response_function = None # GET THE EXTRACTED STANDARD STAR'S WAVELENGTH AND FLUX stdExtWave = self.stdExtractionDF["WAVE"].values stdExtFlux = self.stdExtractionDF["FLUX_COUNTS"].values stdExtWaveNotFlat = self.stdExtractionNotFlatDF["WAVE"].values stdExtFluxNotFlat = self.stdExtractionNotFlatDF["FLUX_DENSITY_COUNTS"].values # GET THE ABSOLUTE STANDARD STAR FLUXES, ASSUMING TO HAVE 1-1 MAPPING BETWEEN OBJECT NAME IN THE FITS HEADER AND DATABASE stdAbsFluxDF = Table.read(self.calibrationRootPath + "/" + self.detectorParams["flux-standards"], format="fits") stdAbsFluxDF = stdAbsFluxDF.to_pandas() # MAKE ALL COLUMNS UPPERCASE stdAbsFluxDF.columns = [d.upper() for d in stdAbsFluxDF.columns] # SELECTING ROWS IN THE INTERESTED WAVELENGTH RANGE ADDING A MARGIN TO THE RANGE stdAbsFluxDF = stdAbsFluxDF[ (stdAbsFluxDF["WAVE"] > np.min(stdExtWaveNotFlat) - 10) & (stdAbsFluxDF["WAVE"] < 10 + np.max(stdExtWaveNotFlat)) ] # FLUX IS CONVERTED IN ERG / CM2 / S / ANG try: self.std_wavelength_to_abs_flux = interp1d( np.array(stdAbsFluxDF["WAVE"]), np.array(stdAbsFluxDF[self.std_objName]) * 10**17, kind="next", fill_value="extrapolate", ) # print("YESSTD") except Exception as e: # print("NOSTD") self.log.warning( f"Standard star {self.std_objName} not found in the static calibration database. The available STDs are {', '.join(stdAbsFluxDF.columns[1:])}" ) forceFailure = True return ( self.qc, self.products, f"Standard star {self.std_objName} not found in the static calibration database. The available STDs are {', '.join(stdAbsFluxDF.columns[1:])}", ) # STRONG SKY ABS REGION TO BE EXCLUDED if self.arm == "NIR": excludeRegions = [(770, 850), (930, 970), (1080, 1200), (1280, 1520), (1785, 1950)] else: excludeRegions = [] # INTEGRATING THE EXTRACTED STANDARD IN xx nm BIN-WIDE FILTERS (CONVERTING BACK IN A/PX) stdExtFluxNotFlat = stdExtFluxNotFlat / 10.0 # NOW DIVIDING FOR THE EXPOSURE TIME stdExtFlux = stdExtFlux / self.texp stdExtFluxNotFlat = stdExtFluxNotFlat / self.texp if False: plt.plot(stdExtWave, stdExtFlux) plt.xlabel("Wavelength (nm)") plt.ylabel("Extracted Flux (counts/s)") plt.title("Extracted Standard Star Spectrum") plt.grid() plt.show() plt.plot(stdAbsFluxDF["WAVE"], stdAbsFluxDF[self.std_objName] * 10**17) plt.xlabel("Wavelength (nm)") plt.ylabel("Absolute Flux (erg/s/cm²/Å)") plt.title("Absolute Standard Star Spectrum") plt.grid() plt.show() # GETTING EFFICIENCY stdEfficiencyEstimate = None # USING THE STD STAR SPECTRUM THAT IS NOT CORRECTED BY FLAT try: # CONSTANTS FOR CALCULATIONS c = 3 * 10**10 # SPEED OF LIGHT IN CM/S h = 6.63 * 10**-27 # PLANCK'S CONSTANT IN ERG·S # TELESCOPE AREA BASED ON THE INSTRUMENT if self.instrument == "xsh": # VLT TELESCOPE AREA (IN CM²) area = 400 * 400 * 3.14 else: # NTT TELESCOPE AREA (IN CM²) area = 89000 # EXTRACT WAVELENGTH VALUES AND COMPUTE ABSOLUTE FLUX wavelength = stdExtWaveNotFlat abs_flux = self.std_wavelength_to_abs_flux(wavelength) * 10**-17 # CONVERT WAVELENGTH FROM ÅNGSTRÖM TO CM wavelength_in_cm = wavelength * 10**-7 # COMPUTE THE ABSOLUTE PHOTON FLUX OF THE STANDARD STAR hc = h * c # PRECOMPUTE PLANCK'S CONSTANT * SPEED OF LIGHT stdAbsPhotonFlux = (abs_flux * wavelength_in_cm / hc) * area # COMPUTE THE EFFICIENCY ESTIMATE stdEfficiencyEstimate = stdExtFluxNotFlat / stdAbsPhotonFlux # Change backend to MacOSX if False: import matplotlib matplotlib.use("MacOSX") from matplotlib import pyplot as plt print(self.texp) plt.plot(wavelength, stdExtFluxNotFlat, label="Extracted Flux (not flat corrected)") plt.plot(wavelength, stdAbsPhotonFlux, label="Absolute Photon Flux") plt.plot(wavelength, stdEfficiencyEstimate, label="Efficiency Estimate") plt.xlabel("Wavelength (nm)") plt.ylabel("Efficiency") plt.title("Efficiency Estimate") plt.grid() plt.legend() plt.show() # APPLY SAVITZKY-GOLAY FILTER TO SMOOTH THE EFFICIENCY ESTIMATE stdEfficiencyEstimate = savgol_filter(stdEfficiencyEstimate, 21, 2) except Exception as e: print(e) # LANDING HERE NO STD STAR SPECTRUM WITHOUT FLAT CORRECTION IS PROVIDED stdEfficiencyEstimate = None pass # APPLYING EXTINCTION CORRECTION if self.arm == "UVB" or self.arm == "VIS": extCorrectionFactor = extinction_correction_factor( stdExtWave, self.calibrationRootPath + "/" + self.detectorParams["extinction"], self.airmass ) if False: from matplotlib import pyplot as plt plt.plot(stdExtWave, extCorrectionFactor) plt.xlabel("Wavelength (nm)") plt.ylabel("Extinction Correction Factor") plt.title("Extinction Correction Factor vs Wavelength") plt.grid() plt.show() stdExtFlux = stdExtFlux * extCorrectionFactor self.response_function_raw = self.std_wavelength_to_abs_flux(stdExtWave) / stdExtFlux wavelength_response = self.stdExtractionDF["WAVE"].values polyOrder = int(self.recipeSettings[self.arm.lower()]["poly_order"]) numIter = 0 deletedPoints = 1 # REMOVE EXCLUDED REGION FROM WAVELENGTH AND RESPONSE FUNCTION for er in excludeRegions: elementsToDelete = np.where((wavelength_response >= er[0]) & (wavelength_response <= er[1]))[0] wavelength_response = np.delete(wavelength_response, elementsToDelete) self.response_function_raw = np.delete(self.response_function_raw, elementsToDelete) # SMOOTHING DATA IF NIR if self.arm == "NIR": from scipy.ndimage import gaussian_filter1d self.response_function_raw = gaussian_filter1d(self.response_function_raw, sigma=5) # FITTING ITERATIVELY THE DATA WITH A POLYNOMIAL while (numIter < int(self.recipeSettings[self.arm.lower()]["max_iteration"])) and (deletedPoints > 0): try: # FITTING THE DATA elementsToDelete = [] wavelength_response_tmp = np.array( [ np.median(wavelength_response[max(0, i - 5) : i + 6]) for i in range(0, len(wavelength_response), 100) ] ) response_function_raw_tmp = np.array( [ np.median(self.response_function_raw[max(0, i - 5) : i + 6]) for i in range(0, len(self.response_function_raw), 100) ] ) responseFuncCoeffs = np.polyfit(wavelength_response_tmp, response_function_raw_tmp, deg=polyOrder) for index, (z, zf) in enumerate( zip( self.response_function_raw, np.polyval(responseFuncCoeffs, wavelength_response), ) ): # if np.abs(np.abs(z)-np.abs(zf)) > 0.05: # ff np.abs(np.abs(z) - np.abs(zf)) / np.abs(z) > 100 or z < 0: if z < 0 or np.abs(np.abs(z) - np.abs(zf)) / np.abs(z) > 0.2: elementsToDelete.append(index) wavelength_response = np.delete(wavelength_response, elementsToDelete) self.response_function_raw = np.delete(self.response_function_raw, elementsToDelete) deletedPoints = len(elementsToDelete) numIter = numIter + 1 except Exception as e: self.log.print("fail") raise Exception("The fitting of response function did not converge!") from e if False: plt.plot(wavelength_response, self.response_function_raw) plt.show() # WRITE RESPONSE FUNCTION TO FITS BINARY TABLE self.write_response_function_to_file(responseFuncCoeffs=responseFuncCoeffs, polyOrder=polyOrder) if not isinstance(stdEfficiencyEstimate, bool): # CREATE A DATAFRAME FOR EFFICIENCY ESTIMATE stdEfficiencyEstimateDF = pd.DataFrame({"WAVE": stdExtWaveNotFlat, "EFFICIENCY": stdEfficiencyEstimate}) # WRITE THE EFFICIENCY ESTIMATE TO FITS BINARY TABLE from astropy.table import Table import copy from astropy.io import fits from soxspipe.commonutils.toolkit import add_snr_efficiency_qcs t = Table.from_pandas(stdEfficiencyEstimateDF) filename = f"{self.sofName}_EFFICIENCY.fits" filepath = f"{self.productDir}/{filename}" header = copy.deepcopy(self.header) header[self.kw("SEQ_ARM").upper()] = self.arm header[self.kw("PRO_TYPE").upper()] = "REDUCED" header[self.kw("PRO_CATG").upper()] = f"EFFICIENCY_TAB_{self.arm}".upper() BinTableHDU = fits.table_to_hdu(t) self.qc = add_snr_efficiency_qcs( log=self.log, spectrumDF=stdEfficiencyEstimateDF, qcTable=self.qc, orderJoins=self.orderJoins, recipeName=self.recipeName, dateObs=self.dateObs, ) # ADD QC METRICS TO HEADER for n, v, c, h in zip( self.qc["qc_name"].values, self.qc["qc_value"].values, self.qc["qc_comment"].values, self.qc["to_header"].values, ): if h and v is not np.nan: header[f"ESO QC {n}".upper()] = (v, c) priHDU = fits.PrimaryHDU(header=header) hduList = fits.HDUList([priHDU, BinTableHDU]) hduList.verify("fix") hduList.writeto(filepath, checksum=True, overwrite=True) utcnow = datetime.utcnow() utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S") self.products = pd.concat( [ self.products, pd.Series( { "soxspipe_recipe": self.recipeName, "product_label": "EFFICIENCY", "file_name": filename, "file_type": "FITS", "obs_date_utc": self.dateObs, "reduction_date_utc": utcnow, "product_desc": f"SOXS efficiency estimate", "file_path": filepath, "label": "QC", } ) .to_frame() .T, ], ignore_index=True, ) self.plot_response_curve( stdExtWave=stdExtWave, stdExtWaveNotFlat=stdExtWaveNotFlat, stdExtFlux=stdExtFlux, binCentreWave=wavelength_response, binCentreWaveOriginal=wavelength_response, binIntegratedFlux=self.response_function_raw, absToExtFluxRatio=None, responseFuncCoeffs=responseFuncCoeffs, stdEfficiencyEstimate=stdEfficiencyEstimate, ) return self.qc, self.products, False
[docs] def plot_response_curve( self, stdExtWave, stdExtWaveNotFlat, stdExtFlux, binCentreWave, binCentreWaveOriginal, binIntegratedFlux, absToExtFluxRatio, responseFuncCoeffs, stdEfficiencyEstimate, ): """*generate a QC plot for the response curve* **Key Arguments:** - ``stdExtWave`` -- the extracted standard star wavelength - ``stdExtFlux`` -- the extracted standard star flux - ``stdExtWaveNotFlat`` -- the extracted standard star wavelength (not flattened) - ``binCentreWave`` -- binned wavelengths after clipping (during fitting) - ``binCentreWaveOriginal`` -- binned wavelengths - ``binIntegratedFlux`` -- binned flux - ``absToExtFluxRatio`` -- the ratio of the absolute flux vs the extraction flux - ``responseFuncCoeffs`` -- the response function coefficients - ``stdEfficiencyEstimate`` -- the estimated instrument efficiency **Return:** - `plotFilePath` -- the path to the QC plot PDF """ self.log.debug("starting the ``plot_response_curve`` method") import matplotlib.pyplot as plt import numpy as np from datetime import datetime import pandas as pd # WRITE THE QC PLOT TO PDF fig = plt.figure(figsize=(6, 22), constrained_layout=True, dpi=120) if stdEfficiencyEstimate is not None: gs = fig.add_gridspec(30, 4) else: gs = fig.add_gridspec(25, 4) onerow = fig.add_subplot(gs[0:4, :]) tworow = fig.add_subplot(gs[5:9, :]) threerow = fig.add_subplot(gs[10:14, :]) fourrow = fig.add_subplot(gs[15:19, :]) fiverow = fig.add_subplot(gs[20:24, :]) if stdEfficiencyEstimate is not None: sixrow = fig.add_subplot(gs[25:29, :]) onerow.plot(stdExtWave, self.std_wavelength_to_abs_flux(stdExtWave) * 10**-17, linewidth=0.2) onerow.set_title(f"{self.std_objName} absolute flux spectrum", fontsize=12) onerow.set_xlabel(f"wavelength (nm)", fontsize=9) onerow.set_ylabel("flux ($\\mathrm{erg/cm^{2}/s/angstom}$)", fontsize=9) onerow.tick_params(axis="both", which="major", labelsize=9) # Set y-limits based on the absolute flux spectrum abs_flux = self.std_wavelength_to_abs_flux(stdExtWave) min_flux, max_flux = np.min(abs_flux), np.max(abs_flux) flux_margin = 0.2 * (max_flux - min_flux) # Add 10% margin # onerow.set_ylim(min_flux - flux_margin, max_flux + flux_margin) tworow.scatter(binCentreWaveOriginal, binIntegratedFlux, marker="o", s=10, alpha=0.5) tworow.set_title("Raw ratio", fontsize=12) tworow.set_xlabel(f"wavelength (nm)", fontsize=9) tworow.set_ylabel("Ratio $\\frac{F_{\\lambda}}{F_c}$", fontsize=9) tworow.tick_params(axis="both", which="major", labelsize=9) # threerow.plot(binCentreWave, absToExtFluxRatio, linewidth=0.5) threerow.set_title("Fitted Response Function", fontsize=12) threerow.plot( stdExtWave, np.polyval(responseFuncCoeffs, stdExtWave), c="red", label="response curve", linewidth=2.5, alpha=0.7, ) threerow.scatter(binCentreWaveOriginal, binIntegratedFlux, marker="o", s=10, alpha=0.2) # threerow.set_xlim(min(binCentreWave), max(binCentreWave)) # threerow.set_ylim(min(absToExtFluxRatio), max(absToExtFluxRatio)) threerow.set_xlabel(f"wavelength (nm)", fontsize=9) threerow.set_ylabel("absolute-extracted flux ratio", fontsize=9) threerow.tick_params(axis="both", which="major", labelsize=9) # TEST THE X-CALIB # FLUX IS ALREADY DIVIDED BY DISPERSION AND CORRECTED FOR THE EXTINCTION !! # OTHERWISE, COPY LINES ABOVE flux_calib = stdExtFlux * np.polyval(responseFuncCoeffs, stdExtWave) flux_calib = flux_calib * 10**-17 # CONVERTING BACK TO PHYS UNITS fourrow.plot(stdExtWave, flux_calib, linewidth=0.2) fourrow.set_title("Self calibration of std star", fontsize=12) fourrow.set_xlabel(f"wavelength (nm)", fontsize=9) fourrow.set_ylabel("flux ($\\mathrm{erg/cm^{2}/s/angstom}$)", fontsize=9) fourrow.tick_params(axis="both", which="major", labelsize=9) # fourrow.set_ylim(min_flux - flux_margin, max_flux + flux_margin) fiverow.plot( stdExtWave, (flux_calib * 10**17 - self.std_wavelength_to_abs_flux(stdExtWave)) / self.std_wavelength_to_abs_flux(stdExtWave), linewidth=0.2, ) fiverow.set_ylim(-5, 5) # plt.plot(np.array(stdAbsFluxDF[0]),np.array(stdAbsFluxDF[4])*10**17,c='red') plt.subplots_adjust(hspace=1.0) fiverow.set_title("Relative residuals", fontsize=12) fiverow.set_xlabel(f"wavelength (nm)", fontsize=9) fiverow.set_ylabel("residual", fontsize=9) fiverow.tick_params(axis="both", which="major", labelsize=9) if stdEfficiencyEstimate is not None: sixrow.plot(stdExtWaveNotFlat, stdEfficiencyEstimate, linewidth=0.2) sixrow.set_ylim(0, np.min([np.nanmax(stdEfficiencyEstimate), 1.0])) # plt.plot(np.array(stdAbsFluxDF[0]),np.array(stdAbsFluxDF[4])*10**17,c='red') plt.subplots_adjust(hspace=1.0) sixrow.set_title("Efficiency (end-to-end)", fontsize=12) sixrow.set_xlabel(f"wavelength (nm)", fontsize=9) sixrow.set_ylabel("Efficiency", fontsize=9) sixrow.tick_params(axis="both", which="major", labelsize=9) plotFilename = self.sofName + "_RESPONSE.pdf" plotFilePath = f"{self.qcDir}/{plotFilename}" plt.savefig(plotFilePath, dpi=120, format="pdf", bbox_inches="tight") plt.close("all") utcnow = datetime.utcnow() utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S") self.products = pd.concat( [ self.products, pd.Series( { "soxspipe_recipe": self.recipeName, "product_label": "RESPONSE_QC_PLOT", "file_name": plotFilename, "file_type": "PDF", "obs_date_utc": self.dateObs, "reduction_date_utc": utcnow, "product_desc": f"Response curve QC plot.", "file_path": plotFilePath, "label": "QC", } ) .to_frame() .T, ], ignore_index=True, ) if False: plt.show() self.log.debug("completed the ``plot_response_curve`` method") return plotFilePath
[docs] def write_response_function_to_file(self, responseFuncCoeffs, polyOrder): """*write out the fitted polynomial solution coefficients to file* **Key Arguments:** - ``responseFuncCoeffs`` -- the response curve coefficients - ``polyOrder`` -- the order of polynomial used to fit the curve **Return:** - ``responseCurvePath`` -- path to the saved file """ self.log.debug("starting the ``write_response_function_to_file`` method") import pandas as pd from astropy.table import Table from astropy.io import fits from datetime import datetime import copy arm = self.arm kw = self.kw coeffDict = {} coeffDict["polyOrder"] = polyOrder n_coeff = 0 for i in range(0, polyOrder + 1): coeffDict[f"c{i}"] = responseFuncCoeffs[n_coeff] n_coeff += 1 filename = self.sofName + "_RESP.fits" header = copy.deepcopy(self.header) filePath = f"{self.productDir}/{filename}" df = pd.DataFrame([coeffDict]) t = Table.from_pandas(df) BinTableHDU = fits.table_to_hdu(t) header[kw("SEQ_ARM").upper()] = arm header[kw("PRO_TYPE").upper()] = "REDUCED" header[kw("PRO_CATG").upper()] = f"RESP_TAB_{arm}".upper() # WRITE QCs TO HEADERS for n, v, c, h in zip( self.qc["qc_name"].values, self.qc["qc_value"].values, self.qc["qc_comment"].values, self.qc["to_header"].values, ): if h: header[f"ESO QC {n}".upper()] = (v, c) priHDU = fits.PrimaryHDU(header=header) hduList = fits.HDUList([priHDU, BinTableHDU]) hduList.verify("fix") hduList.writeto(filePath, checksum=True, overwrite=True) utcnow = datetime.utcnow() utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S") self.products = pd.concat( [ self.products, pd.Series( { "soxspipe_recipe": self.recipeName, "product_label": "RESPONSE_FUNC", "file_name": filename, "file_type": "FITS", "obs_date_utc": self.dateObs, "reduction_date_utc": utcnow, "product_desc": f"Response function coeffs.", "file_path": filePath, "label": "PROD", } ) .to_frame() .T, ], ignore_index=True, ) self.log.debug("completed the ``write_response_function_to_file`` method") return filePath