#!/usr/bin/env python
# encoding: utf-8
"""
*Reduce SOXS/Xshooter data taken in stare mode*
Author
: David Young & Marco Landoni
Date Created
: February 28, 2022
"""
################# GLOBAL IMPORTS ####################
from soxspipe.commonutils import detector_lookup, keyword_lookup
from .base_recipe import base_recipe
from soxspipe.commonutils import subtract_sky
from soxspipe.commonutils.toolkit import (
generic_quality_checks,
get_calibrations_path,
spectroscopic_image_quality_checks,
)
from fundamentals import tools
from builtins import object
import sys
import os
os.environ["TERM"] = "vt100"
################# RECIPE CLASS ####################
class soxs_stare(base_recipe):
    """
    *Reduce SOXS/Xshooter data taken in stare mode*

    **Key Arguments**

    - ``log`` -- logger
    - ``settings`` -- the settings dictionary
    - ``inputFrames`` -- input fits frames. Can be a directory, a set-of-files (SOF) file or a list of fits frame paths. Default *None* (treated as an empty list).
    - ``verbose`` -- verbose. True or False. Default *False*
    - ``overwrite`` -- overwrite the product file if it already exists. Default *False*
    - ``command`` -- the command called to run the recipe
    - ``debug`` -- show debug plots. Default *False*
    - ``turnOffMP`` -- turn off multiprocessing. True or False. Default *False*. If True, multiprocessing will be turned off and the recipe will run in serial. This is useful for debugging.

    See `produce_product` method for usage.
    """

    # Initialisation
    def __init__(
        self,
        log,
        settings=False,
        inputFrames=None,
        verbose=False,
        overwrite=False,
        command=False,
        debug=False,
        turnOffMP=False,
    ):
        # A FRESH LIST PER CALL - AVOIDS SHARING ONE MUTABLE DEFAULT BETWEEN INSTANCES
        if inputFrames is None:
            inputFrames = []

        # INHERIT INITIALISATION FROM base_recipe
        super(soxs_stare, self).__init__(
            log=log,
            settings=settings,
            inputFrames=inputFrames,
            overwrite=overwrite,
            recipeName="soxs-stare",
            command=command,
            debug=debug,
            verbose=verbose,
            turnOffMP=turnOffMP,
        )
        self.log = log
        log.debug("instantiating a new 'soxs_stare' object")
        self.settings = settings
        self.inputFrames = inputFrames
        self.verbose = verbose
        self.recipeSettings = self.get_recipe_settings()

        # INITIAL ACTIONS
        # CONVERT INPUT FILES TO A CCDPROC IMAGE COLLECTION (inputFrames >
        # imagefilecollection)
        from soxspipe.commonutils.set_of_files import set_of_files

        sof = set_of_files(log=self.log, settings=self.settings, inputFrames=self.inputFrames)
        self.inputFrames, self.supplementaryInput = sof.get()

        # VERIFY THE FRAMES ARE THE ONES EXPECTED BY SOXS_stare - NO MORE, NO LESS.
        # PRINT SUMMARY OF FILES.
        self.log.print("# VERIFYING INPUT FRAMES")
        self.verify_input_frames()
        sys.stdout.flush()
        # ANSI: MOVE CURSOR UP ONE LINE AND ERASE IT SO THE NEXT PRINT OVERWRITES
        # THE "VERIFYING" MESSAGE
        sys.stdout.write("\x1b[1A\x1b[2K")
        self.log.print("# VERIFYING INPUT FRAMES - ALL GOOD")

        # SORT IMAGE COLLECTION
        self.inputFrames.sort(["MJD-OBS"])
        if self.verbose:
            self.log.print("# RAW INPUT FRAMES - SUMMARY")
            self.log.print(self.inputFrames.summary)

        # PREPARE THE FRAMES - CONVERT TO ELECTRONS, ADD UNCERTAINTY AND MASK
        # EXTENSIONS
        self.inputFrames = self.prepare_frames(save=self.settings["save-intermediate-products"])

        # GET A TEMPLATE FILENAME USED TO NAME PRODUCTS
        if self.sofName:
            self.filenameTemplate = self.sofName + ".fits"
        else:
            # `filenamer` WAS NEVER IMPORTED AT MODULE LEVEL, SO THIS BRANCH
            # PREVIOUSLY RAISED A NameError
            from soxspipe.commonutils.filenamer import filenamer

            # NOTE(review): `self.objectFrame` is not assigned anywhere in this
            # class - confirm the base class (or a caller) provides it
            self.filenameTemplate = filenamer(log=self.log, frame=self.objectFrame, settings=self.settings)

        # SWITCHED ON IN `produce_product` WHEN FLUX-STANDARD FRAMES ARE REDUCED
        self.generateReponseCurve = False

    def _stare_read_frame(self, path):
        """*read a single prepared frame from disk as a CCDData object (electron units, with ERRS/QUAL/FLAGS extensions)*

        **Key Arguments:**

        - ``path`` -- path to the FITS frame

        **Return:**

        - ``frame`` -- the CCDData object
        """
        from astropy.nddata import CCDData
        from astropy import units as u

        return CCDData.read(
            path,
            hdu=0,
            unit=u.electron,
            hdu_uncertainty="ERRS",
            hdu_mask="QUAL",
            hdu_flags="FLAGS",
            key_uncertainty_type="UTYPE",
        )

    def _stare_read_matching_frames(self, filters):
        """*read every input frame matching the header-keyword filters*

        **Key Arguments:**

        - ``filters`` -- dictionary of header keyword/value pairs passed to ``files_filtered``

        **Return:**

        - ``frames`` -- list of CCDData objects (empty if nothing matches)
        """
        matchedPaths = self.inputFrames.files_filtered(include_path=True, **filters)
        return [self._stare_read_frame(p) for p in matchedPaths]

    def _stare_read_last_matching_frame(self, filters, default=False):
        """*read the frames matching the filters and return the last one, or ``default`` if none match*

        **Key Arguments:**

        - ``filters`` -- dictionary of header keyword/value pairs passed to ``files_filtered``
        - ``default`` -- value returned when no frame matches. Default *False*

        **Return:**

        - ``frame`` -- the last matching CCDData object, or ``default``
        """
        frame = default
        # EVERY MATCH IS READ AND THE LAST ONE WINS (AS BEFORE)
        for path in self.inputFrames.files_filtered(include_path=True, **filters):
            frame = self._stare_read_frame(path)
        return frame

    def _stare_register_product(self, label, description, filePath, reductionDate):
        """*append one row describing a FITS product to the ``self.products`` table*

        **Key Arguments:**

        - ``label`` -- the product label (``product_label`` column)
        - ``description`` -- human readable product description
        - ``filePath`` -- path to the product on disk
        - ``reductionDate`` -- UTC reduction timestamp string
        """
        import pandas as pd

        productRow = (
            pd.Series(
                {
                    "soxspipe_recipe": "soxs-stare",
                    "product_label": label,
                    "file_name": os.path.basename(filePath),
                    "file_type": "FITS",
                    "obs_date_utc": self.dateObs,
                    "reduction_date_utc": reductionDate,
                    "product_desc": description,
                    "file_path": filePath,
                    "label": "PROD",
                }
            )
            .to_frame()
            .T
        )
        self.products = pd.concat([self.products, productRow], ignore_index=True)

    def produce_product(self):
        """*The code to generate the product of the soxs_stare recipe*

        **Return:**

        - ``productPath`` -- the path to the last sky-subtraction product written to disk (the sky-subtraction residuals frame), or *None* if sky-subtraction was not performed
        - ``qcTable`` -- the value returned by ``report_output``

        **Usage**

        ```python
        from soxspipe.recipes import soxs_stare
        recipe = soxs_stare(
            log=log,
            settings=settings,
            inputFrames=fileList
        )
        productPath, qcTable = recipe.produce_product()
        ```
        """
        self.log.debug("starting the ``produce_product`` method")

        import pandas as pd
        from datetime import datetime, timezone

        arm = self.arm
        kw = self.kw
        dprType, dprTech, proCatg = kw("DPR_TYPE"), kw("DPR_TECH"), kw("PRO_CATG")

        productPath = None
        paeMode = "PAE" in self.settings and self.settings["PAE"]
        self.subtractSky = self.recipeSettings["sky-subtraction"]["subtract_sky"]

        # OBJECT FRAMES
        allObjectFrames = []
        for objectType in ("OBJECT", "OBJECT,ASYNC"):
            allObjectFrames += self._stare_read_matching_frames(
                {dprType: objectType, dprTech: "ECHELLE,SLIT,STARE"}
            )

        # FLUX STD FRAMES
        if not len(allObjectFrames):
            allObjectFrames = self._stare_read_matching_frames(
                {dprType: "STD,FLUX", dprTech: "ECHELLE,SLIT,STARE"}
            )
            # NOTE(review): nesting reconstructed from a source with lost
            # indentation - confirm the response curve should only be requested
            # when flux-standard stare frames are actually found
            if len(allObjectFrames):
                self.generateReponseCurve = True

        # TELLURIC STD FRAMES
        if not len(allObjectFrames):
            allObjectFrames = self._stare_read_matching_frames(
                {dprType: "STD,TELLURIC", dprTech: "ECHELLE,SLIT,STARE"}
            )

        # PAE MODE: FALL BACK TO PINHOLE LAMP-FLAT FRAMES
        if not len(allObjectFrames) and paeMode:
            allObjectFrames = self._stare_read_matching_frames(
                {dprType: "LAMP,FLAT", dprTech: "ECHELLE,PINHOLE"}
            )
            if len(allObjectFrames):
                self.log.warning("Processing a ORDER-TRACE frame with the stare-mode recipe")

        # SKY-SUBTRACTION IS NEVER ATTEMPTED IN PAE MODE
        if paeMode:
            self.subtractSky = False

        # LAST RESORT: FLUX STANDARD TAKEN IN NODDING MODE
        if not len(allObjectFrames):
            allObjectFrames = self._stare_read_matching_frames(
                {dprType: "STD,FLUX", dprTech: "ECHELLE,SLIT,NODDING"}
            )
            if len(allObjectFrames):
                self.log.warning("Processing a NODDING frame with the stare-mode recipe")

        combined_object_notflattened = self.clip_and_stack(
            frames=allObjectFrames, recipe="soxs_stare", ignore_input_masks=True, post_stack_clipping=False
        )
        self.dateObs = combined_object_notflattened.header[kw("DATE_OBS")]

        # MASTER BIAS
        master_bias = self._stare_read_last_matching_frame({proCatg: "MASTER_BIAS_" + arm})

        # MASTER DARK
        dark = self._stare_read_last_matching_frame({proCatg: "MASTER_DARK_" + arm})
        if not dark:
            # NIR DARK
            dark = self._stare_read_last_matching_frame({dprType: "OBJECT", dprTech: "IMAGE"}, default=dark)
            # NOTE(review): nesting reconstructed - confirm the PAE lamp image
            # is only a fallback when no master dark is supplied
            if paeMode:
                dark = self._stare_read_last_matching_frame(
                    {dprType: "FLAT,LAMP", dprTech: "IMAGE"}, default=dark
                )

        # UVB/VIS/NIR FLAT
        master_flat = self._stare_read_last_matching_frame({proCatg: "MASTER_FLAT_" + arm})

        def first_product_path(category):
            # PATH OF THE FIRST INPUT FILE WHOSE PRO CATG IS `<category>_<arm>`
            # (IndexError IF NONE IS PRESENT)
            matches = self.inputFrames.filter(**{proCatg: f"{category}_{arm}"})
            return matches.files_filtered(include_path=True)[0]

        # FIND THE ORDER TABLE
        orderTablePath = first_product_path("ORDER_TAB")
        # FIND THE 2D MAP TABLE
        dispMap = first_product_path("DISP_TAB")
        # FIND THE 2D MAP IMAGE
        twoDMap = first_product_path("DISP_IMAGE")

        # FIND THE RESPONSE FUNCTION, IF PRESENT. BEST-EFFORT LOOKUP: ANY
        # ORDINARY FAILURE MEANS "NO RESPONSE FUNCTION", BUT KeyboardInterrupt /
        # SystemExit ARE NO LONGER SWALLOWED
        try:
            responseFunctionPath = first_product_path("RESP_TAB")
        except Exception:
            responseFunctionPath = False

        # FLAT-FIELDING IS OPT-IN: A MISSING `use_flat` SETTING DISABLES IT
        try:
            if not self.recipeSettings["use_flat"]:
                master_flat = False
        except (KeyError, TypeError):
            master_flat = False

        combined_object = self.detrend(
            inputFrame=combined_object_notflattened,
            master_bias=master_bias,
            dark=dark,
            master_flat=master_flat,
            order_table=orderTablePath,
        )

        # INJECT KEYWORDS INTO HEADER
        self.update_fits_keywords(frame=combined_object)

        from soxspipe.commonutils.toolkit import quicklook_image

        quicklook_image(
            log=self.log,
            CCDObject=combined_object,
            show=False,
            ext=False,
            stdWindow=3,
            title=False,
            surfacePlot=False,
            dispMap=dispMap,
            dispMapImage=twoDMap,
            settings=self.settings,
            skylines=False,
        )

        if self.subtractSky:
            skymodel = subtract_sky(
                log=self.log,
                settings=self.settings,
                recipeSettings=self.recipeSettings,
                objectFrame=combined_object,
                twoDMap=twoDMap,
                qcTable=self.qc,
                productsTable=self.products,
                dispMap=dispMap,
                sofName=self.sofName,
                recipeName=self.recipeName,
                startNightDate=self.startNightDate,
                debug=self.debug,
            )
            skymodelCCDData, skySubtractedCCDData, skySubtractedResidualsCCDData, self.qc, self.products = (
                skymodel.subtract()
            )

            if skymodelCCDData is None:
                # NO SKY MODEL RETURNED - CARRY ON WITHOUT SKY-SUBTRACTION
                self.subtractSky = False
                self.recipeSettings["sky-subtraction"]["subtract_sky"] = False
                skymodelCCDData = False
                skySubtractedCCDData = combined_object
                self.recipeSettings["horne-extraction-slit-length"] = 30.0
            else:
                # TIMEZONE-AWARE REPLACEMENT FOR THE DEPRECATED datetime.utcnow()
                # - THE FORMATTED STRING IS UNCHANGED
                utcnow = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")

                # WRITE SKY-SUBTRACTON TO DISK
                productPath = self._write(
                    frame=skySubtractedCCDData,
                    filedir=self.workspaceRootPath,
                    filename=self.filenameTemplate.replace(".fits", "_SKYSUB.fits"),
                    overwrite=True,
                    maskToZero=True,
                )
                self._stare_register_product(
                    "SKY_SUBTRACTED_OBJECT", "The sky-subtracted object", productPath, utcnow
                )

                # WRITE SKY-MODEL TO DISK
                productPath = self._write(
                    frame=skymodelCCDData,
                    filedir=self.workspaceRootPath,
                    filename=self.filenameTemplate.replace(".fits", "_SKYMODEL.fits"),
                    overwrite=True,
                )
                self._stare_register_product("SKY_MODEL", "The sky background model", productPath, utcnow)

                # WRITE SKY-SUBTRACTION RESIDUALS TO DISK
                productPath = self._write(
                    frame=skySubtractedResidualsCCDData,
                    filedir=self.workspaceRootPath,
                    filename=self.filenameTemplate.replace(".fits", "_SKYSUB_RESIDUALS.fits"),
                    overwrite=True,
                )
                self._stare_register_product(
                    "SKY_SUB_RESIDUALS", "The sky subtraction residuals", productPath, utcnow
                )

                # ADD QUALITY CHECKS
                # NOTE(review): nesting reconstructed - confirm the QC checks
                # belong to the successful sky-subtraction branch only
                self.qc = generic_quality_checks(
                    log=self.log,
                    frame=skySubtractedCCDData,
                    settings=self.settings,
                    recipeName=self.recipeName,
                    qcTable=self.qc,
                )
                self.qc = spectroscopic_image_quality_checks(
                    log=self.log,
                    frame=skySubtractedCCDData,
                    settings=self.settings,
                    recipeName=self.recipeName,
                    qcTable=self.qc,
                    orderTablePath=orderTablePath,
                )
        else:
            skymodelCCDData = False
            skySubtractedCCDData = combined_object

        # REPEAT THE SKY-SUBTRACTION ON THE STACKED FRAME THAT HAS NOT BEEN
        # THROUGH `detrend` (SKIPPED IF THE FIRST ATTEMPT WAS DISABLED OR FAILED)
        if self.subtractSky:
            skymodel = subtract_sky(
                log=self.log,
                settings=self.settings,
                recipeSettings=self.recipeSettings,
                objectFrame=combined_object_notflattened,
                twoDMap=twoDMap,
                qcTable=self.qc,
                productsTable=self.products,
                dispMap=dispMap,
                sofName=self.sofName,
                recipeName=self.recipeName,
                startNightDate=self.startNightDate,
                debug=self.debug,
            )
            (
                unflattenedSkymodelCCDData,
                unflattenedSkySubtractedCCDData,
                _,
                self.qc,
                self.products,
            ) = skymodel.subtract()
        else:
            unflattenedSkySubtractedCCDData = combined_object_notflattened
            unflattenedSkymodelCCDData = False

        from soxspipe.commonutils import horne_extraction

        optimalExtractor = horne_extraction(
            log=self.log,
            skyModelFrame=skymodelCCDData,
            skySubtractedFrame=skySubtractedCCDData,
            unflattenedFrame=unflattenedSkySubtractedCCDData,
            twoDMapPath=twoDMap,
            settings=self.settings,
            recipeSettings=self.recipeSettings,
            recipeName=self.recipeName,
            qcTable=self.qc,
            productsTable=self.products,
            dispersionMap=dispMap,
            sofName=self.sofName,
            startNightDate=self.startNightDate,
            debug=self.debug,
            turnOffMP=self.turnOffMP,
        )
        self.qc, self.products, mergedSpectumDF, orderJoins, extractionPath = optimalExtractor.extract()

        # CHECK IF FLUX CALIBRATION IS NEEDED
        filePath_fluxcal = None
        forceFailure = False
        if responseFunctionPath:
            from soxspipe.commonutils import flux_calibration

            calibrationRootPath = get_calibrations_path(log=self.log, settings=self.settings)
            detectorParams = detector_lookup(log=self.log, settings=self.settings).get(self.arm)
            self.log.print(f"# FLUX CALIBRATING THE SPECTRUM\n")
            fluxCalibrator = flux_calibration(
                log=self.log,
                responseFunction=responseFunctionPath,
                extractedSpectrum=mergedSpectumDF,
                settings=self.settings,
                airmass=combined_object.header.get("HIERARCH ESO TEL AIRM END"),
                exptime=combined_object.header.get("EXPTIME"),
                extinctionPath=calibrationRootPath + "/" + detectorParams["extinction"],
                arm=self.arm,
                header=combined_object.header,
                recipeName=self.recipeName,
                startNightDate=self.startNightDate,
                sofName=self.sofName,
                debug=self.debug,
            )
            filePath_fluxcal, prod = fluxCalibrator.calibrate()
            self.products = pd.concat([self.products, prod], ignore_index=True)
            self.log.print(f"# FLUX CALIBRATION COMPLETED\n")
        elif self.generateReponseCurve:
            # SECOND EXTRACTION, THIS TIME FROM THE UNFLATTENED FRAMES, TO FEED
            # THE RESPONSE-FUNCTION CALCULATION
            optimalExtractor = horne_extraction(
                log=self.log,
                skyModelFrame=unflattenedSkymodelCCDData,
                skySubtractedFrame=unflattenedSkySubtractedCCDData,
                unflattenedFrame=unflattenedSkySubtractedCCDData,
                twoDMapPath=twoDMap,
                settings=self.settings,
                recipeSettings=self.recipeSettings,
                recipeName=self.recipeName,
                qcTable=self.qc,
                productsTable=self.products,
                dispersionMap=dispMap,
                sofName=self.sofName,
                startNightDate=self.startNightDate,
                debug=self.debug,
                notFlattened=True,
                turnOffMP=self.turnOffMP,
            )
            self.qc, self.products, _, _, extractionPath_notflat = optimalExtractor.extract()

            # GETTING THE RESPONSE
            from soxspipe.commonutils import response_function

            self.log.print(f"# CALCULATING RESPONSE FUNCTION\n")
            response = response_function(
                log=self.log,
                settings=self.settings,
                recipeName=self.recipeName,
                sofName=self.sofName,
                stdExtractionPath=extractionPath,
                qcTable=self.qc,
                productsTable=self.products,
                startNightDate=self.startNightDate,
                stdNotFlatExtractionPath=extractionPath_notflat,
                orderJoins=orderJoins,
            )
            self.qc, self.products, forceFailure = response.get()

        from soxspipe.commonutils.toolkit import plot_merged_spectrum_qc

        # QC PLOT OF THE MERGED (NOT FLUX-CALIBRATED) SPECTRUM
        self.products, _ = plot_merged_spectrum_qc(
            merged_orders=mergedSpectumDF,
            products=self.products,
            log=self.log,
            qcDir=self.qcDir,
            filenameTemplate=self.filenameTemplate,
            noddingSequence=None,
            dateObs=self.dateObs,
            arm=self.arm,
            recipeName=self.recipeName,
            orderJoins=orderJoins,
            debug=self.debug,
            fluxCalibrated=False,
            qcTable=self.qc,
            settings=self.settings,
        )

        if filePath_fluxcal:
            from astropy.table import Table
            from astropy import units as u

            # QC PLOT OF THE FLUX-CALIBRATED SPECTRUM
            fluxcal_spec = Table.read(filePath_fluxcal, format="fits")
            fluxcal_spec["WAVE"] = fluxcal_spec["WAVE"] * u.nm
            fluxcal_spec["FLUX_COUNTS"] = fluxcal_spec["FLUX_CALIBRATED"]  # BACK COMPATIBILITY WITH THE CODE
            fluxcal_spec["SNR"] = mergedSpectumDF["SNR"]
            fluxcal_spec["SKY_COUNTS"] = mergedSpectumDF["SKY_COUNTS"]
            self.products, _ = plot_merged_spectrum_qc(
                merged_orders=fluxcal_spec,
                products=self.products,
                log=self.log,
                qcDir=self.qcDir,
                filenameTemplate=self.filenameTemplate,
                noddingSequence=None,
                dateObs=self.dateObs,
                arm=self.arm,
                recipeName=self.recipeName,
                orderJoins=orderJoins,
                debug=self.debug,
                fluxCalibrated=True,
                qcTable=self.qc,
                settings=self.settings,
            )

        qcTable = self.report_output()
        self.clean_up(forceFail=forceFailure)

        self.log.debug("completed the ``produce_product`` method")
        return productPath, qcTable
# use the tab-trigger below for new method
# xt-class-method
# Override Method Attributes
# method-override-tmpx