#!/usr/bin/env python
# encoding: utf-8
"""
*Reduce SOXS/Xshooter data taken in nodding mode*
Author
: David Young & Marco Landoni
Date Created
: February 27, 2024
"""
################# GLOBAL IMPORTS ####################
from soxspipe.commonutils import keyword_lookup
from .base_recipe import base_recipe
from soxspipe.commonutils.toolkit import (
add_snr_efficiency_qcs,
generic_quality_checks,
spectroscopic_image_quality_checks,
get_calibrations_path,
)
from fundamentals import tools
from builtins import object
import sys
import os
from soxspipe.commonutils.filenamer import filenamer
from os.path import expanduser
os.environ["TERM"] = "vt100"
# TODO: When combining spectra at the end, we use a simple sum. If we use sigma-clipping followed by a mean combine, we can remove CRHs for data sets with more than 1 AB cycle.
[docs]
class soxs_nod(base_recipe):
"""
*Reduce SOXS/Xshooter data taken in nodding mode*
**Key Arguments**
- ``log`` -- logger
- ``settings`` -- the settings dictionary
- ``inputFrames`` -- input fits frames. Can be a directory, a set-of-files (SOF) file or a list of fits frame paths.
- ``verbose`` -- verbose. True or False. Default *False*
- ``overwrite`` -- overwrite the product file if it already exists. Default *False*
- ``command`` -- the command called to run the recipe
- ``debug`` -- generate debug plots. Default *False*
- ``turnOffMP`` -- turn off multiprocessing. True or False. Default *False*. If True, multiprocessing will be turned off and the recipe will run in serial. This is useful for debugging.
- ``recipeName`` -- the name of the recipe. Default "soxs-nod". This is used to retrieve the recipe settings from the settings dictionary and to name the product file (soxs_offset inherits soxs_nod)
**Usage**
```python
from soxspipe.recipes import soxs_nod
recipe = soxs_nod(
log=log,
settings=settings,
inputFrames=fileList
).produce_product()
```
"""
def __init__(
self,
log,
settings=False,
inputFrames=[],
verbose=False,
overwrite=False,
command=False,
debug=False,
turnOffMP=False,
recipeName="soxs-nod",
):
# INHERIT INITIALISATION FROM base_recipe
super(soxs_nod, self).__init__(
log=log,
settings=settings,
inputFrames=inputFrames,
overwrite=overwrite,
recipeName=recipeName,
command=command,
debug=debug,
verbose=verbose,
turnOffMP=turnOffMP,
)
self.log = log
log.debug("instansiating a new 'soxs_nod' object")
self.settings = settings
self.inputFrames = inputFrames
self.verbose = verbose
# INITIAL ACTIONS
# CONVERT INPUT FILES TO A CCDPROC IMAGE COLLECTION (inputFrames >
# imagefilecollection)
from soxspipe.commonutils.set_of_files import set_of_files
sof = set_of_files(
log=self.log,
settings=self.settings,
inputFrames=self.inputFrames,
recipeName=self.recipeName,
ext=self.settings["data-extension"],
)
self.inputFrames, self.supplementaryInput = sof.get()
# VERIFY THE FRAMES ARE THE ONES EXPECTED BY SOXS_nod - NO MORE, NO LESS.
# PRINT SUMMARY OF FILES.
self.log.print("# VERIFYING INPUT FRAMES")
self.verify_input_frames()
sys.stdout.flush()
sys.stdout.write("\x1b[1A\x1b[2K")
self.log.print("# VERIFYING INPUT FRAMES - ALL GOOD")
# SORT IMAGE COLLECTION
self.inputFrames.sort(["MJD-OBS"])
if self.verbose:
self.log.print("# RAW INPUT FRAMES - SUMMARY")
self.log.print(self.inputFrames.summary)
self.log.print("\n")
# PREPARE THE FRAMES - CONVERT TO ELECTRONS, ADD UNCERTAINTY AND MASK
# EXTENSIONS
self.inputFrames = self.prepare_frames(save=self.settings["save-intermediate-products"])
return None
[docs]
def produce_product(self):
"""*The code to generate the product of the soxs_nod recipe*
**Return:**
- ``productPath`` -- the path to the final product
**Usage**
```python
from soxspipe.recipes import soxs_nod
recipe = soxs_nod(
log=log,
settings=settings,
inputFrames=fileList
)
nodFrame = recipe.produce_product()
```
"""
self.log.debug("starting the ``produce_product`` method")
from astropy.nddata import CCDData
from astropy import units as u
import pandas as pd
from datetime import datetime
from soxspipe.commonutils.toolkit import quicklook_image, plot_merged_spectrum_qc
arm = self.arm
kw = self.kw
dp = self.detectorParams
productPath = None
master_bias = False
master_flat = False
dark = False
# OBJECT/STANDARD FRAMES
types = ["OBJECT", "STD,FLUX", "STD,TELLURIC"]
allObjectFrames, allFilenames = [], []
self.masterHeaderFrame = False
for t in types:
add_filters = {kw("DPR_TYPE"): t, kw("DPR_TECH"): "ECHELLE,SLIT,NODDING"}
for i in self.inputFrames.files_filtered(include_path=True, **add_filters):
if t == "STD,FLUX" and "-std" not in self.recipeName:
self.recipeName += "-std"
self.recipeSettings = self.get_recipe_settings()
self.productDir = self.productDir.replace("soxs-nod", "soxs-nod-std")
singleFrame = CCDData.read(
i,
hdu=0,
unit=u.electron,
hdu_uncertainty="ERRS",
hdu_mask="QUAL",
hdu_flags="FLAGS",
key_uncertainty_type="UTYPE",
)
allObjectFrames.append(singleFrame)
allFilenames.append(os.path.basename(i).replace("_pre.", "."))
if not self.masterHeaderFrame:
self.masterHeaderFrame = singleFrame.copy()
if len(allObjectFrames):
break
# UVB/VIS/NIR FLAT
add_filters = {kw("PRO_CATG"): "MASTER_FLAT_" + arm}
for i in self.inputFrames.files_filtered(include_path=True, **add_filters):
master_flat = CCDData.read(
i,
hdu=0,
unit=u.electron,
hdu_uncertainty="ERRS",
hdu_mask="QUAL",
hdu_flags="FLAGS",
key_uncertainty_type="UTYPE",
)
# FIND THE ORDER TABLE
filterDict = {kw("PRO_CATG"): f"ORDER_TAB_{arm}"}
orderTablePath = self.inputFrames.filter(**filterDict).files_filtered(include_path=True)[0]
# FIND THE 2D MAP TABLE
filterDict = {kw("PRO_CATG"): f"DISP_TAB_{arm}"}
self.dispMap = self.inputFrames.filter(**filterDict).files_filtered(include_path=True)[0]
# FIND THE 2D MAP IMAGE
filterDict = {kw("PRO_CATG"): f"DISP_IMAGE_{arm}"}
self.twoDMap = self.inputFrames.filter(**filterDict).files_filtered(include_path=True)[0]
# CHECK IF FLUX CALIBRATION IS REQUESTED
try:
filterDict = {kw("PRO_CATG"): f"RESP_TAB_{arm}"}
responseFunctionPath = self.inputFrames.filter(**filterDict).files_filtered(include_path=True)[0]
except:
responseFunctionPath = False
quicklook_image(
log=self.log,
CCDObject=allObjectFrames[0],
show=False,
ext=False,
stdWindow=3,
title=False,
surfacePlot=True,
saveToPath=False,
)
# DIVIDING IN A AND B SEQUENCES
allFrameA, allFrameB, allFrameAOffsets, allFrameBOffsets, allFrameANames, allFrameBNames = (
[],
[],
[],
[],
[],
[],
)
# CUMOFF Y IS THE OFFSET IN THE Y DIRECTION OF THE NODDING SEQUENCE. POSITIVE A, NEGATIVE B
for frame, filename in zip(allObjectFrames, allFilenames):
# offset = frame.header[kw(f"NOD_CUMULATIVE_OFFSET{self.axisA.upper()}")]
offset = frame.header[kw(f"NOD_CUMULATIVE_OFFSETY")]
if offset == 0:
pass
if offset > 0:
allFrameAOffsets.append(offset)
allFrameA.append(frame)
allFrameANames.append(filename)
else:
allFrameBOffsets.append(offset)
allFrameB.append(frame)
allFrameBNames.append(filename)
uniqueOffsets = list(set(allFrameAOffsets))
if len(allFrameAOffsets) != len(allFrameBOffsets):
error = f"Found {len(allFrameAOffsets)} A frames and {len(allFrameBOffsets)} B frames. The number of A and B frames must be the same for nodding reductions."
self.log.error(
f"Found {len(allFrameAOffsets)} A frames and {len(allFrameBOffsets)} B frames. The number of A and B frames must be the same for nodding reductions."
)
raise Exception(error)
if len(uniqueOffsets) == 0:
error = f"Did not find frames with a positive offset. Please check the `NOD_CUMULATIVE_OFFSETY` header keyword in the providing nodding frames."
self.log.error(
f"Did not find frames with a positive offset. Please check the `NOD_CUMULATIVE_OFFSETY` header keyword in the providing nodding frames."
)
raise Exception(error)
elif len(uniqueOffsets) > 1:
s = "S"
else:
s = ""
self.log.print(
f"# PROCESSING {len(allFrameAOffsets)} AB NODDING CYCLES WITH {len(uniqueOffsets)} UNIQUE PAIR{s} OF OFFSET LOCATIONS"
)
forceFailure = False
if len(allFrameAOffsets) > 1 and len(uniqueOffsets) > 1:
allSpectrumA = []
allSpectrumB = []
sequenceCount = 1
# SORT frameA and frameB looping at their MJDOBS keyword in the header in order to the closest A and B frames in time
allFrameA.sort(key=lambda x: x.header["MJD-OBS"])
allFrameB.sort(key=lambda x: x.header["MJD-OBS"])
for frameA, frameB, frameAName, frameBName in zip(allFrameA, allFrameB, allFrameANames, allFrameBNames):
self.log.print(f"Processing AB Nodding Sequence {sequenceCount}")
if False:
quicklook_image(
log=self.log,
CCDObject=frameA,
show=True,
ext=False,
stdWindow=1,
title=False,
surfacePlot=False,
saveToPath=False,
)
quicklook_image(
log=self.log,
CCDObject=frameB,
show=False,
ext=False,
stdWindow=1,
title=False,
surfacePlot=False,
saveToPath=False,
)
# Save frameA and frameB to disk in temporary file
home = expanduser("~")
filenameA = self.sofName + f"_A_{sequenceCount}.fits"
filenameB = self.sofName + f"_B_{sequenceCount}.fits"
filePathA = f"{self.productDir}/{filenameA}"
filePathB = f"{self.productDir}/{filenameB}"
frameA.write(filePathA, overwrite=True, checksum=True)
frameB.write(filePathB, overwrite=True, checksum=True)
rawFrames = []
if "ARCFILE" in frameA.header:
rawFrames.append(frameA.header["ARCFILE"])
rawFrames.append(frameB.header["ARCFILE"])
elif "ORIGFILE" in frameA.header:
rawFrames.append(frameA.header["ORIGFILE"])
rawFrames.append(frameB.header["ORIGFILE"])
else:
rawFrames.append(frameAName)
rawFrames.append(frameBName)
# INJECT KEYWORDS INTO HEADER
self.update_fits_keywords(frame=frameA, rawFrames=rawFrames)
self.update_fits_keywords(frame=frameB, rawFrames=rawFrames)
if self.recipeSettings["use_flat"] and master_flat:
masterFlat = master_flat
else:
masterFlat = False
# PROCESSING SINGLE SEQUENCE
mergedSpectrumDF_A, mergedSpectrumDF_B, orderJoins = self.process_single_ab_nodding_cycle(
aFrame=frameA,
bFrame=frameB,
locationSetIndex=sequenceCount,
orderTablePath=orderTablePath,
masterFlat=masterFlat,
)
if sequenceCount == 1:
allSpectrumA = mergedSpectrumDF_A
allSpectrumB = mergedSpectrumDF_B
else:
allSpectrumA = pd.concat([allSpectrumA, mergedSpectrumDF_A])
allSpectrumB = pd.concat([allSpectrumB, mergedSpectrumDF_B])
sequenceCount += 1
stackedSpectrum, extractionPath = self.stack_extractions(
[allSpectrumA, allSpectrumB], orderJoins=orderJoins
)
if self.generateReponseCurve:
forceFailure = True
else:
# STACKING A AND B SEQUENCES - ONLY IF JITTER IS NOT PRESENT
aFrame = self.clip_and_stack(
frames=allFrameA, recipe="soxs_nod", ignore_input_masks=False, post_stack_clipping=True
)
bFrame = self.clip_and_stack(
frames=allFrameB, recipe="soxs_nod", ignore_input_masks=False, post_stack_clipping=True
)
# INJECT KEYWORDS INTO HEADER
self.update_fits_keywords(frame=aFrame)
self.update_fits_keywords(frame=bFrame)
if self.recipeSettings["use_flat"] and master_flat:
masterFlat = master_flat
else:
masterFlat = False
mergedSpectrumDF_A, mergedSpectrumDF_B, orderJoins = self.process_single_ab_nodding_cycle(
aFrame=aFrame, bFrame=bFrame, locationSetIndex=1, orderTablePath=orderTablePath, masterFlat=masterFlat
)
stackedSpectrum, extractionPath = self.stack_extractions(
[mergedSpectrumDF_A, mergedSpectrumDF_B], orderJoins=orderJoins
)
if self.generateReponseCurve:
from soxspipe.commonutils import response_function
mergedSpectrumDF_A, mergedSpectrumDF_B, orderJoins = self.process_single_ab_nodding_cycle(
aFrame=aFrame,
bFrame=bFrame,
locationSetIndex=1,
orderTablePath=orderTablePath,
notFlattened=True,
masterFlat=masterFlat,
)
stackedSpectrum_notflat, extractionPath_notflat = self.stack_extractions(
[mergedSpectrumDF_A, mergedSpectrumDF_B], notFlattened=True, orderJoins=orderJoins
)
# GETTING THE RESPONSE
self.log.print(f"# CALCULATING RESPONSE FUNCTION\n")
response = response_function(
log=self.log,
settings=self.settings,
recipeName=self.recipeName,
sofName=self.sofName,
stdExtractionPath=extractionPath,
qcTable=self.qc,
productsTable=self.products,
startNightDate=self.startNightDate,
stdNotFlatExtractionPath=extractionPath_notflat,
orderJoins=orderJoins,
)
self.qc, self.products, forceFailure = response.get()
# CHECK IF FLUX CALIBRATION IS REQUESTED
filePath_fluxcal = None
if responseFunctionPath:
calibrationRootPath = get_calibrations_path(log=self.log, settings=self.settings)
from soxspipe.commonutils.flux_calibration import flux_calibration
self.log.print(f"# PERFORMING FLUX CALIBRATION\n")
# TODO CHECK IF TAKING THE HEADER OF ONE FRAME IS OK
fluxCalibrator = flux_calibration(
log=self.log,
responseFunction=responseFunctionPath,
extractedSpectrum=stackedSpectrum,
settings=self.settings,
airmass=allFrameA[0].header.get("HIERARCH ESO TEL AIRM END"),
exptime=allFrameA[0].header.get("EXPTIME"),
extinctionPath=calibrationRootPath + "/" + self.detectorParams["extinction"],
arm=self.arm,
header=allFrameA[0].header,
recipeName=self.recipeName,
startNightDate=self.startNightDate,
sofName=self.sofName,
debug=self.debug,
)
filePath_fluxcal, products = fluxCalibrator.calibrate()
self.products = pd.concat([self.products, products], ignore_index=True)
self.log.print(f"# FLUX CALIBRATION COMPLETED\n")
self.products, filePath = plot_merged_spectrum_qc(
merged_orders=stackedSpectrum,
products=self.products,
log=self.log,
qcDir=self.qcDir,
filenameTemplate=self.filenameTemplate,
noddingSequence=False,
dateObs=self.dateObs,
arm=self.arm,
recipeName=self.recipeName,
orderJoins=orderJoins,
debug=self.debug,
fluxCalibrated=False,
qcTable=self.qc,
settings=self.settings,
)
if filePath_fluxcal:
from astropy.table import Table
from astropy.io import fits
from astropy import units as u
fluxcal_spec = Table.read(filePath_fluxcal, format="fits")
fluxcal_spec["WAVE"] = fluxcal_spec["WAVE"] * u.nm
fluxcal_spec["FLUX_COUNTS"] = fluxcal_spec["FLUX_CALIBRATED"] # BACK COMPATIBILITY WITH THE CODE
# ADD THE SNR COLUMN AND COPY VALUES FROM stackedSpectrum
fluxcal_spec["SNR"] = stackedSpectrum["SNR"]
self.products, filePath = plot_merged_spectrum_qc(
merged_orders=fluxcal_spec,
products=self.products,
log=self.log,
qcDir=self.qcDir,
filenameTemplate=self.filenameTemplate,
noddingSequence=False,
dateObs=self.dateObs,
arm=self.arm,
recipeName=self.recipeName,
orderJoins=orderJoins,
debug=self.debug,
fluxCalibrated=True,
qcTable=self.qc,
settings=self.settings,
)
qcTable = self.report_output()
self.clean_up(forceFail=forceFailure)
self.log.debug("completed the ``produce_product`` method")
return productPath, qcTable
[docs]
def process_single_ab_nodding_cycle(
self, aFrame, bFrame, locationSetIndex, orderTablePath, notFlattened=False, masterFlat=False
):
"""*process a single AB nodding cycle*
**Key Arguments:**
- ``aFrame`` -- the frame taken at the A location. CCDData object.
- ``bFrame`` -- the frame taken at the B location. CCDDate object.
- ``locationSetIndex`` -- the index of the AB cycle
- ``orderTablePath`` -- path to the order table
- ``notFlattened`` -- if True, the extraction is performed on non-flattened data. Default *False*
- ``masterFlat`` -- path to the master flat frame. Default *False*
**Return:**
- ``mergedSpectrumDF_A`` -- the order merged spectrum of nodding location A (dataframe)
- ``mergedSpectrumDF_B`` -- the order merged spectrum of nodding location B (dataframe)
**Usage:**
```python
mergedSpectrumDF_A, mergedSpectrumDF_B = soxs_nod.process_single_ab_nodding_cycle(
aFrame=aFrame, bFrame=bFrame, locationSetIndex=1, orderTablePath=orderTablePath, masterFlat=masterFlat)
```
"""
self.log.debug("starting the ``process_single_ab_nodding_cycle`` method")
from soxspipe.commonutils import horne_extraction
import pandas as pd
# SUBTRACTING A FROM B
A_minus_B_notflattened = aFrame.subtract(bFrame)
if "nod" in self.recipeName:
B_minus_A_notflattened = bFrame.subtract(aFrame)
# REAPPLYING HEADERS
hdr_A = aFrame.header
hdr_B = bFrame.header
A_minus_B_notflattened.header = hdr_A
if "nod" in self.recipeName:
B_minus_A_notflattened.header = hdr_B
# WRITE IN A FITS FILE THE A-B AND B-A FRAMES
if notFlattened:
extraText = " (not flattened this time - needed to calculate efficiency)"
else:
extraText = ""
if "nod" in self.recipeName:
self.log.print(f"\n# PROCESSING AB NODDING CYCLE {locationSetIndex} {extraText}")
else:
self.log.print(f"\n# PROCESSING ON-OFF OFFSET CYCLE {locationSetIndex} {extraText}")
home = expanduser("~")
if "nod" in self.recipeName:
filename = self.sofName + f"_AB_{locationSetIndex}.fits"
filePath = f"{self.productDir}/{filename}"
A_minus_B_notflattened.write(filePath, overwrite=True, checksum=True)
filename = self.sofName + f"_BA_{locationSetIndex}.fits"
filePath = f"{self.productDir}/{filename}"
B_minus_A_notflattened.write(filePath, overwrite=True, checksum=True)
else:
filename = self.sofName + f"_ONOFF_{locationSetIndex}.fits"
filePath = f"{self.productDir}/{filename}"
A_minus_B_notflattened.write(filePath, overwrite=True, checksum=True)
if False:
from soxspipe.commonutils.toolkit import quicklook_image
quicklook_image(
log=self.log,
CCDObject=A_minus_B_notflattened,
show=True,
ext="data",
stdWindow=3,
title=False,
surfacePlot=True,
saveToPath=False,
)
if "nod" in self.recipeName:
quicklook_image(
log=self.log,
CCDObject=B_minus_A_notflattened,
show=True,
ext="data",
stdWindow=3,
title=False,
surfacePlot=True,
saveToPath=False,
)
# TODO: ADD THESE CHECKS .... LIKELY FOR EACH AB CYCLE INDEX
if True:
self.qc = generic_quality_checks(
log=self.log,
frame=A_minus_B_notflattened,
settings=self.settings,
recipeName=self.recipeName,
qcTable=self.qc,
)
self.qc = spectroscopic_image_quality_checks(
log=self.log,
frame=A_minus_B_notflattened,
settings=self.settings,
recipeName=self.recipeName,
qcTable=self.qc,
orderTablePath=orderTablePath,
)
if self.recipeSettings["save_single_frame_extractions"] == False:
theseProducts = False
else:
theseProducts = self.products
if not isinstance(masterFlat, bool) and notFlattened == False:
A_minus_B = self.detrend(
inputFrame=A_minus_B_notflattened,
master_bias=False,
dark=False,
master_flat=masterFlat,
order_table=orderTablePath,
)
if "nod" in self.recipeName:
B_minus_A = self.detrend(
inputFrame=B_minus_A_notflattened,
master_bias=False,
dark=False,
master_flat=masterFlat,
order_table=orderTablePath,
)
else:
A_minus_B = A_minus_B_notflattened
if "nod" in self.recipeName:
B_minus_A = B_minus_A_notflattened
# EXTRACT THE A MINUS B FRAME
optimalExtractor = horne_extraction(
log=self.log,
skyModelFrame=False,
skySubtractedFrame=A_minus_B,
unflattenedFrame=A_minus_B_notflattened,
twoDMapPath=self.twoDMap,
settings=self.settings,
recipeName=self.recipeName,
recipeSettings=self.recipeSettings,
qcTable=self.qc,
productsTable=theseProducts,
dispersionMap=self.dispMap,
sofName=self.sofName,
locationSetIndex=locationSetIndex,
startNightDate=self.startNightDate,
debug=self.debug,
notFlattened=notFlattened,
turnOffMP=self.turnOffMP,
)
self.qc, theseProducts, mergedSpectrumDF_A, orderJoins, extractionFITSPathA = optimalExtractor.extract()
# EXTRACT THE B MINUS A FRAME
if "nod" in self.recipeName:
optimalExtractor = horne_extraction(
log=self.log,
skyModelFrame=False,
skySubtractedFrame=B_minus_A,
unflattenedFrame=B_minus_A_notflattened,
twoDMapPath=self.twoDMap,
settings=self.settings,
recipeName=self.recipeName,
recipeSettings=self.recipeSettings,
qcTable=self.qc,
productsTable=theseProducts,
dispersionMap=self.dispMap,
sofName=self.sofName,
locationSetIndex=locationSetIndex,
startNightDate=self.startNightDate,
debug=self.debug,
notFlattened=notFlattened,
turnOffMP=self.turnOffMP,
)
self.qc, theseProducts, mergedSpectrumDF_B, orderJoins, extractionFITSPathB = optimalExtractor.extract()
else:
mergedSpectrumDF_B = False
if self.recipeSettings["save_single_frame_extractions"] == True:
self.products = theseProducts
self.log.debug("completed the ``process_single_ab_nodding_cycle`` method")
return mergedSpectrumDF_A, mergedSpectrumDF_B, orderJoins