#!/usr/bin/env python
# encoding: utf-8
"""
*The base recipe class which all other recipes inherit*
Author
: David Young & Marco Landoni
Date Created
: January 22, 2020
"""
################# GLOBAL IMPORTS ####################
from soxspipe.commonutils import filenamer
from soxspipe.commonutils import detector_lookup
from soxspipe.commonutils import keyword_lookup
from soxspipe.commonutils import subtract_background
from fundamentals import tools
from builtins import object
import sys
import os
os.environ["TERM"] = "vt100"
[docs]
class base_recipe(object):
"""
The base recipe class which all other recipes inherit
**Key Arguments:**
- ``log`` -- logger
- ``settings`` -- the settings dictionary
- ``inputFrames`` -- input fits frames. Can be a directory, a set-of-files (SOF) file or a list of fits frame paths.
- ``verbose`` -- verbose. True or False. Default *False*
- ``overwrite`` -- overwrite the product file if it already exists. Default *False*
- ``recipeName`` -- name of the recipe as it appears in the settings dictionary. Default *False*
- ``command`` -- the command called to run the recipe
- ``debug`` -- debug mode. True or False. Default *False*
- ``turnOffMP`` -- turn off multiprocessing. True or False. Default *False*
**Usage**
To use this base recipe to create a new `soxspipe` recipe, have a look at the code for one of the simpler recipes (e.g. `soxs_mbias`) - copy and modify the code.
"""
def __init__(
self,
log,
settings=False,
inputFrames=False,
verbose=False,
overwrite=False,
recipeName=False,
command=False,
debug=False,
turnOffMP=False,
):
import yaml
import pandas as pd
from soxspipe.commonutils import toolkit
import sqlite3 as sql
import matplotlib
import random
log.debug("instantiating a new '__init__' object")
self.recipeName = recipeName
self.settings = settings
self.debug = debug
self.workspaceRootPath = self._absolute_path(settings["workspace-root-dir"])
self.turnOffMP = turnOffMP
if self.debug:
matplotlib.use("TkAgg")
else:
matplotlib.use("Agg")
self.darkDetrendWarningIssued1 = False
self.darkDetrendWarningIssued2 = False
if isinstance(inputFrames, str) and "_STD_" in inputFrames:
self.recipeName = self.recipeName.replace("soxs-nod", "soxs-nod-std")
self.recipeName = self.recipeName.replace("soxs-stare", "soxs-stare-std")
self.recipeName = self.recipeName.replace("soxs-offset", "soxs-offset-std")
# CHECK IF PRODUCT ALREADY EXISTS
if inputFrames and not isinstance(inputFrames, list) and inputFrames.split(".")[-1].lower() == "sof":
self.sofName = os.path.basename(inputFrames).replace(".sof", "")
self.productPath, self.startNightDate = toolkit.predict_product_path(inputFrames, self.recipeName)
errorLog = os.path.splitext(self.productPath)[0] + "_ERROR.log"
basename = os.path.basename(self.productPath)
if os.path.exists(errorLog) and not overwrite:
if verbose:
print(
f"This recipe previously failed (see `{basename}`). To rerun the recipe, run the recipe command with the overwrite flag (-x)."
)
raise FileExistsError(
f"This recipe previously failed (see `{basename}`). To rerun the recipe, run the recipe command with the overwrite flag (-x)."
)
elif os.path.exists(self.productPath) and not overwrite:
basename = os.path.basename(self.productPath)
if verbose:
print(
f"The product of this recipe already exists: `{basename}`. To overwrite this product, rerun the pipeline command with the overwrite flag (-x)."
)
raise FileExistsError(
f"The product of this recipe already exists: `{basename}`. To overwrite this product, rerun the pipeline command with the overwrite flag (-x)."
)
self.log = toolkit.add_recipe_logger(log, self.productPath)
else:
self.sofName = False
self.productPath = False
self.log = log
if command:
self.log.print(f"\nRecipe Command: {command}")
from soxspipe.commonutils.toolkit import get_calibrations_path
self.calibrationRootPath = get_calibrations_path(log=self.log, settings=self.settings)
self.verbose = verbose
# SET LATER WHEN VERIFYING FRAMES
self.arm = None
self.detectorParams = None
self.dateObs = None
self.outDir = self.workspaceRootPath + "/tmp/" + str(random.randint(100000, 999999))
# FIND THE CURRENT SESSION
from os.path import expanduser
home = expanduser("~")
from soxspipe.commonutils import data_organiser
do = data_organiser(
log=self.log, rootDir=self.settings["workspace-root-dir"].replace("~", home), dbConnect=False
)
self.currentSession, allSessions = do.session_list(silent=True)
do.close()
# INITIATE A DB CONNECTION
self.conn = None
self.status = None
if not self.turnOffMP:
if self.currentSession and self.sofName:
self.sessionDb = self.settings["workspace-root-dir"].replace("~", home) + "/soxspipe.db"
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
self.conn = sql.connect(self.sessionDb, check_same_thread=False, timeout=300, autocommit=True)
c = self.conn.cursor()
c.execute("PRAGMA busy_timeout = 100000")
c.execute("PRAGMA synchronous = OFF")
self.conn.row_factory = dict_factory
# SET RECIPE TO 'FAIL' AND SWITCH TO 'PASS' ONLY IF RECIPE COMPLETES
if self.conn:
c = self.conn.cursor()
sqlQuery = f"select status_{self.currentSession} as status from product_frames where sof = '{self.sofName}.sof'"
c.execute(sqlQuery)
try:
self.status = c.fetchone()["status"]
sqlQuery = f"update product_frames set status_{self.currentSession} = 'fail' where sof = '{self.sofName}.sof'"
c.execute(sqlQuery)
except:
self.status = None
c.close()
# COLLECT ADVANCED SETTINGS IF AVAILABLE
parentDirectory = os.path.dirname(__file__)
advs = parentDirectory + "/advanced_settings.yaml"
level = 0
exists = False
count = 1
while not exists and len(advs) and count < 10:
count += 1
level -= 1
exists = os.path.exists(advs)
if not exists:
advs = "/".join(parentDirectory.split("/")[:level]) + "/advanced_settings.yaml"
if not exists:
advs = {}
else:
with open(advs, "r") as stream:
advs = yaml.safe_load(stream)
# MERGE ADVANCED SETTINGS AND USER SETTINGS (USER SETTINGS OVERRIDE)
self.settings = {**advs, **self.settings}
# DATAFRAMES TO COLLECT QCs AND PRODUCTS
self.qc = pd.DataFrame(
{
"soxspipe_recipe": [],
"qc_name": [],
"qc_value": [],
"qc_unit": [],
"qc_order": [],
"qc_comment": [],
"obs_date_utc": [],
"reduction_date_utc": [],
"to_header": [],
}
)
self.products = pd.DataFrame(
{
"soxspipe_recipe": [],
"product_label": [],
"file_name": [],
"file_type": [],
"obs_date_utc": [],
"reduction_date_utc": [],
"file_path": [],
"label": [],
}
)
# KEYWORD LOOKUP OBJECT - LOOKUP KEYWORD FROM DICTIONARY IN RESOURCES
# FOLDER
self.kw = keyword_lookup(log=self.log, settings=self.settings).get
from soxspipe.commonutils.toolkit import utility_setup
self.qcDir, self.productDir = utility_setup(
log=self.log, settings=settings, recipeName=self.recipeName, startNightDate=self.startNightDate
)
self.generateReponseCurve = False
return None
def _prepare_single_frame(self, frame, save=False):
"""*prepare a single raw frame by converting pixel data from ADU to electrons and adding mask and uncertainty extensions*
**Key Arguments:**
- ``frame`` -- the path to the frame to prepare, of a CCDData object
**Return:**
- ``frame`` -- the prepared frame with mask and uncertainty extensions (CCDData object)
:::{todo}
- write a command-line tool for this method
:::
"""
self.log.debug("starting the ``_prepare_single_frame`` method")
from astropy.nddata import CCDData
import ccdproc
from astropy import units as u
import numpy as np
import logging
import warnings
from datetime import datetime
from soxspipe.commonutils import toolkit
warnings.filterwarnings(action="ignore")
logging.captureWarnings(True)
kw = self.kw
dp = self.detectorParams
# STORE FILEPATH FOR LATER USE
filepath = frame
# CONVERT FILEPATH TO CCDDATA OBJECT
if isinstance(frame, str):
# CONVERT RELATIVE TO ABSOLUTE PATHS
frame = self._absolute_path(frame)
# OPEN THE RAW FRAME - MASK AND UNCERT TO BE POPULATED LATER
try:
frame = CCDData.read(
frame,
hdu=0,
unit=u.adu,
hdu_uncertainty="ERRS",
hdu_mask="QUAL",
hdu_flags="FLAGS",
key_uncertainty_type="UTYPE",
)
except TypeError as e:
if "buffer is too small" in str(e):
self.log.warning(
f"Buffer is too small for frame {filepath}. The frame is likely corrupted and will not be used in the reduction.\n"
)
return None
else:
self.log.info(f"{filepath} is a FITS Binary Table")
return filepath
# CHECK THE NUMBER OF EXTENSIONS IS ONLY 1 AND "SXSPRE" DOES NOT
# EXIST. i.e. THIS IS A RAW UNTOUCHED FRAME
if len(frame.to_hdu()) > 1 or "SXSPRE" in frame.header:
return filepath
# MANIPULATE XSH DATA
frame = self.xsh2soxs(frame)
frame = self._trim_frame(frame)
# CORRECT FOR GAIN - CONVERT DATA FROM ADU TO ELECTRONS
frame = ccdproc.gain_correct(frame, dp["gain"])
toolkit.frame_to_32(frame)
# GENERATE UNCERTAINTY MAP AS EXTENSION
if frame.header[kw("DPR_TYPE")] == "BIAS":
# ERROR IS ONLY FROM READNOISE FOR BIAS FRAMES
errorMap = np.ones_like(frame.data).astype(np.float32) * dp["ron"].astype(np.float32)
# errorMap = StdDevUncertainty(errorMap)
frame.uncertainty = errorMap.astype(np.float32)
else:
# GENERATE UNCERTAINTY MAP AS EXTENSION
frame = ccdproc.create_deviation(frame, readnoise=dp["ron"], disregard_nan=True)
toolkit.frame_to_32(frame)
# FIND THE APPROPRIATE BAD-PIXEL BITMAP AND APPEND AS 'FLAG' EXTENSION
# NOTE FLAGS NOT YET SUPPORTED BY CCDPROC THIS THIS WON'T GET SAVED OUT
# AS AN EXTENSION
arm = self.arm
if arm != "NIR" and kw("WIN_BINX") in frame.header:
binx = int(frame.header[kw("WIN_BINX")])
biny = int(frame.header[kw("WIN_BINY")])
else:
binx = 1
biny = 1
bitMapPath = self.calibrationRootPath + "/" + dp["bad-pixel map"][f"{binx}x{biny}"]
if not os.path.exists(bitMapPath):
message = "the path to the bitMapPath %s does not exist on this machine" % (bitMapPath,)
if True:
# CREATE A DUMMY BAD-PIXEL MAP
import numpy as np
from astropy.nddata import CCDData
frame = CCDData(np.full_like(frame.data, 0), unit="adu")
toolkit.frame_to_32(frame)
# WRITE CCDDATA OBJECT TO FILE
HDUList = frame.to_hdu()
HDUList.verify("fix")
HDUList.writeto(bitMapPath, output_verify="exception", overwrite=True, checksum=True)
self.log.critical(message)
raise IOError(message)
bitMap = CCDData.read(bitMapPath, hdu=0, unit=u.dimensionless_unscaled)
# frame.flags = bitMap.data
# FLATTEN BAD-PIXEL BITMAP TO BOOLEAN FALSE (GOOD) OR TRUE (BAD) AND
# APPEND AS 'UNCERT' EXTENSION
boolMask = bitMap.data.astype(bool).data
try:
# FAILS IN PYTHON 2.7 AS BOOLMASK IS A BUFFER - NEED TO CONVERT TO
# 2D ARRAY
boolMask.shape
except:
arr = np.frombuffer(boolMask, dtype=np.uint8)
arr.shape = frame.data.shape
boolMask = arr
frame.mask = boolMask
if self.recipeName in ["soxs-nod-std", "soxs-stare-std", "soxs-offset-std"] and self.recipeSettings["use_flat"]:
# OBJECT/STANDARD FRAMES
if frame.meta[kw("DPR_TYPE")] == "STD,FLUX" or "STD_stare" in frame.meta[kw("OBS_NAME")]:
# ASSUMING WE HAVE ONLY STANDARD A-B CYCLES AND NOT JITTER.
self.generateReponseCurve = True
if "use_lacosmic" in self.recipeSettings and self.recipeSettings["use_lacosmic"]:
# oldCount = frame.mask.sum()
# oldMask = frame.mask.copy()
from ccdproc import cosmicray_lacosmic
frame = cosmicray_lacosmic(
frame,
sigclip=15.0,
sigfrac=0.3,
objlim=10.0,
gain_apply=False,
niter=2,
verbose=False,
cleantype="meanmask",
)
toolkit.frame_to_32(frame)
# newCount = self.skySubtractedFrame.mask.sum()
# self.laCosmicClippedCount = newCount - oldCount
# frame.mask = oldMask | frame.mask
from soxspipe.commonutils.toolkit import quicklook_image
quicklook_image(
log=self.log,
CCDObject=frame,
show=self.debug,
ext=False,
stdWindow=3,
title="L.A.Cosmic cleaned (red = masked)",
surfacePlot=False,
settings=self.settings,
skylines=False,
)
if save:
outDir = self.workspaceRootPath
else:
outDir = self.outDir
# INJECT THE PRE KEYWORD
utcnow = datetime.utcnow()
frame.header["SXSPRE"] = (utcnow.strftime("%Y-%m-%dT%H:%M:%S.%f"), "UTC timestamp")
# RECURSIVELY CREATE MISSING DIRECTORIES
if not os.path.exists(outDir):
try:
os.makedirs(outDir)
except:
pass
# CONVERT CCDData TO FITS HDU (INCLUDING HEADER) AND SAVE WITH PRE TAG
# PREPENDED TO FILENAME
basename = os.path.basename(filepath)
filenameNoExtension = os.path.splitext(basename)[0]
extension = os.path.splitext(basename)[1]
filePath = outDir + "/" + filenameNoExtension + "_pre" + extension
# SAVE TO DISK
self._write(
frame=frame,
filedir=outDir,
filename=filenameNoExtension + "_pre" + extension,
overwrite=True,
product=False,
)
## KEYWORDS FOR LATER QCs
if self.inst == "SOXS":
self.cptemp = frame.header[kw("CP_TEMP_C")]
if self.arm == "NIR":
self.detectorTemp = frame.header[kw("NIR_TEMP_K")]
elif self.arm == "VIS":
self.detectorTemp = frame.header[kw("VIS_TEMP_C")]
else:
self.detectorTemp = None
self.log.debug("completed the ``_prepare_single_frame`` method")
return filePath
def _absolute_path(self, path):
"""*convert paths from home directories to absolute paths*
**Key Arguments:**
- ``path`` -- path possibly relative to home directory
**Return:**
- ``absolutePath`` -- absolute path
**Usage**
```python
myPath = self._absolute_path(myPath)
```
"""
from os.path import expanduser
home = expanduser("~")
if path[0] == "~":
path = home + "/" + path[1:]
return path.replace("//", "/")
[docs]
def prepare_frames(self, save=False):
"""*prepare raw frames by converting pixel data from ADU to electrons and adding mask and uncertainty extensions*
**Key Arguments:**
- ``save`` -- save out the prepared frame to the intermediate products directory. Default False.
**Return:**
- ``preframes`` -- the new image collection containing the prepared frames
**Usage**
Usually called within a recipe class once the input frames have been selected and verified (see `soxs_mbias` code for example):
```python
self.inputFrames = self.prepare_frames(
save=self.settings["save-intermediate-products"])
```
"""
self.log.debug("starting the ``prepare_frames`` method")
from soxspipe.commonutils.set_of_files import set_of_files
import numpy as np
kw = self.kw
filepaths = self.inputFrames.files_filtered(include_path=True)
frameCount = len(filepaths)
if "use_lacosmic" in self.recipeSettings and self.recipeSettings["use_lacosmic"]:
laC = ", RUNNING L.A.COSMIC"
else:
laC = ""
self.log.print(
f"\n# PREPARING {frameCount} RAW FRAMES - TRIMMING OVERSCAN, CONVERTING TO ELECTRON COUNTS, GENERATING UNCERTAINTY MAPS{laC} AND APPENDING DEFAULT BAD-PIXEL MASK"
)
preframes = []
preframes[:] = [self._prepare_single_frame(frame=frame, save=save) for frame in filepaths]
preframes = [f for f in preframes if f is not None]
sof = set_of_files(
log=self.log,
settings=self.settings,
inputFrames=preframes,
recipeName=self.recipeName,
verbose=self.verbose,
)
preframes, supplementaryInput = sof.get()
preframes.sort([kw("MJDOBS")])
self.log.print("# PREPARED FRAMES - SUMMARY")
slitname = kw(f"SLIT_{self.arm}".upper())
try:
preframes.summary["SLIT"] = preframes.summary[slitname]
except:
pass
preframes.summary["LAMP"] = "------------"
columns = preframes.summary.colnames
for i in range(7):
thisLamp = kw(f"LAMP{i+1}")
# FIRST FIND THE NAME OF THE LAMP
newLamp = preframes.summary[thisLamp][np.where(preframes.summary[thisLamp].filled(999) != 999)]
if len(newLamp):
newLamp = newLamp[0]
newLamp = newLamp.replace("_Lamp", "")
newLamp = (
newLamp.replace("Argo", "Ar").replace("Neon", "Ne").replace("Merc", "Hg").replace("Xeno", "Xe")
)
updatedList = list(
preframes.summary["LAMP"][np.where(preframes.summary[thisLamp].filled(999) != 999)].data
)
updatedList[:] = [u.replace("-", "") + newLamp for u in updatedList]
preframes.summary["LAMP"][np.where(preframes.summary[thisLamp].filled(999) != 999)] = updatedList
columns.remove(thisLamp)
preframes.summary["LAMP"][np.where(preframes.summary["LAMP"] == "------------")] = "--"
try:
columns.remove(kw("SLIT_NIR"))
except:
pass
try:
columns.remove(kw("SLIT_VIS"))
except:
pass
try:
columns.remove(kw("SLIT_UVB"))
except:
pass
if "filename" in columns:
# columns.remove("file")
columns.remove("filename")
columns = ["filename"] + columns
# MAKE A COPY TO RENAME COLUMNS
this = preframes.summary.copy()
newColumns = []
for c in columns:
newC = c
if "ESO SEQ " in c:
newC = c.replace("ESO SEQ ", "")
this.rename_column(c, newC)
if "ESO DPR " in c:
newC = c.replace("ESO DPR ", "")
this.rename_column(c, newC)
newColumns.append(newC)
cleanColumns = newColumns.copy()
cleanColumns.remove("file")
self.log.print(this[cleanColumns])
self.log.print("\n")
self.rawFrames = this[newColumns]
# SORT RECIPE AND ARM SETTINGS
self.recipeSettings = self.get_recipe_settings()
self.log.debug("completed the ``prepare_frames`` method")
return preframes
def _verify_input_frames_basics(self):
"""*the basic verifications that needs done for all recipes*
**Return:**
- None
If the fits files conform to the required input for the recipe, everything will pass silently; otherwise, an exception will be raised.
"""
self.log.debug("starting the ``_verify_input_frames_basics`` method")
from astropy import units as u
from contextlib import suppress
import numpy as np
kw = self.kw
# CHECK WE ACTUALLY HAVE IMAGES
if not len(self.inputFrames.files_filtered(include_path=True)):
sys.stdout.flush()
sys.stdout.write("\x1b[1A\x1b[2K")
self.log.print("# VERIFYING INPUT FRAMES - **ERROR**\n")
raise FileNotFoundError("No image frames where passed to the recipe")
arm = self.inputFrames.values(keyword=kw("SEQ_ARM"), unique=True)
# SORT RECIPE AND ARM SETTINGS
self.recipeSettings = self.get_recipe_settings()
inst = self.inputFrames.values(kw("INSTRUME"), unique=True)
with suppress(ValueError):
inst.remove(None)
self.inst = inst[0]
# MIXED INPUT ARMS ARE BAD
if None in arm:
arm.remove(None)
if len(arm) > 1:
arms = " and ".join(arm)
sys.stdout.flush()
sys.stdout.write("\x1b[1A\x1b[2K")
self.log.print("# VERIFYING INPUT FRAMES - **ERROR**\n")
self.log.print(self.inputFrames.summary)
raise TypeError("Input frames are a mix of %(imageTypes)s" % locals())
else:
self.arm = arm[0]
# CREATE DETECTOR LOOKUP DICTIONARY - SOME VALUES CAN BE OVERWRITTEN
# WITH WHAT IS FOUND HERE IN FITS HEADERS
self.detectorParams = detector_lookup(log=self.log, settings=self.settings).get(self.arm)
# SET IMAGE ORIENTATION
if self.detectorParams["dispersion-axis"] == "x":
self.axisA = "x"
self.axisB = "y"
else:
self.axisA = "y"
self.axisB = "x"
# BE CAREFUL WHAT TO MATCH WHEN INSPECTING BINNING ... DON'T BE TO STRICT
matches = (
(self.inputFrames.summary[kw("PRO_CATG")] != f"DISP_TAB_{self.arm}".upper())
& (self.inputFrames.summary[kw("PRO_CATG")] != f"ORDER_TAB_{self.arm}".upper())
& (self.inputFrames.summary[kw("PRO_CATG")] != f"DISP_IMAGE_{self.arm}".upper())
)
binningMatch = self.inputFrames.summary[matches]
# MIXED BINNING IS BAD
if self.arm == "NIR":
# NIR ARRAY NEVER BINNED
cdelt1 = [1]
cdelt2 = [1]
else:
cdelt1 = np.unique(binningMatch[kw("CDELT1")].data)
cdelt2 = np.unique(binningMatch[kw("CDELT2")].data)
try:
cdelt1.remove(None)
cdelt2.remove(None)
except:
pass
if len(cdelt1) > 1 or len(cdelt2) > 1:
sys.stdout.flush()
sys.stdout.write("\x1b[1A\x1b[2K")
self.log.print("# VERIFYING INPUT FRAMES - **ERROR**\n")
raise TypeError("Input frames are a mix of binnings" % locals())
if cdelt1[0] and cdelt2[0]:
self.detectorParams["binning"] = [int(cdelt2[0]), int(cdelt1[0])]
# CHECK IF BINNING IS MIXED IF RESPONSE FUNCTION (FLUX CALIBRATION) IS REQUIRED
if self.arm != "NIR":
mask = self.inputFrames.summary[kw("PRO_CATG")] == f"RESP_TAB_{self.arm}"
resp_match = self.inputFrames.summary[mask]
if len(resp_match) > 0:
if int(resp_match[0][kw("CDELT1")]) == cdelt1[0] and int(resp_match[0][kw("CDELT2")]) == cdelt2[0]:
pass
else:
raise TypeError("Input response function has different binning to science frames" % locals())
# MIXED READOUT SPEEDS IS BAD
readSpeed = self.inputFrames.values(keyword=kw("DET_READ_SPEED"), unique=True)
with suppress(ValueError):
readSpeed.remove(None)
if len(readSpeed) > 1:
sys.stdout.flush()
sys.stdout.write("\x1b[1A\x1b[2K")
self.log.print("# VERIFYING INPUT FRAMES - **ERROR**\n")
self.log.print(self.inputFrames.summary)
raise TypeError(f"Input frames are a mix of readout speeds. {readSpeed}" % locals())
# MIXED GAIN SPEEDS IS BAD
# HIERARCH ESO DET OUT1 CONAD - Electrons/ADU
# CONAD IS REALLY GAIN AND HAS UNIT OF Electrons/ADU
if self.settings["instrument"] == "xsh":
gain = self.inputFrames.values(keyword=kw("CONAD"), unique=True)
else:
mask = self.inputFrames.summary[kw("PRO_CATG")] != f"RESP_TAB_{self.arm}"
checkFrames = self.inputFrames.summary[mask]
# REMOVE MASKED/EMPTY VALUES (WORKS FOR ASTROPY TABLE COLUMNS TOO)
conad = np.ma.asarray(checkFrames[kw("CONAD")])
gainVals = np.ma.asarray(checkFrames[kw("GAIN")])
valid = (~np.ma.getmaskarray(conad)) & (~np.ma.getmaskarray(gainVals))
a = conad[valid].tolist()
b = gainVals[valid].tolist()
gain = [max(x, y) for x, y in zip(a, b) if x is not None and y is not None]
gain = list(set(gain))
with suppress(ValueError):
gain.remove(None)
if len(gain) > 1:
sys.stdout.flush()
sys.stdout.write("\x1b[1A\x1b[2K")
self.log.print("# VERIFYING INPUT FRAMES - **ERROR**\n")
self.log.print(self.inputFrames.summary)
# gain = np.unique(gain)
raise TypeError(f"Input frames are a mix of gain {gain}" % locals())
if len(gain) and gain[0]:
# UVB & VIS
self.detectorParams["gain"] = gain[0] * u.electron / u.adu
else:
# NIR
self.log.print("\n\tGain is being read from the detector parameter file (not the FITS header)\n")
self.detectorParams["gain"] = self.detectorParams["gain"] * u.electron / u.adu
# CONVERT TO DATAFRAME AND FILTER TO CHECK SLIT WIDTHS
filteredDf = self.inputFrames.summary.to_pandas()
matchList = ["LAMP,FLAT", "LAMP,QFLAT", "LAMP,DFLAT", "OBJECT", "STD,FLUX", f"MASTER_FLAT_{self.arm}"]
mask = ((filteredDf[kw("DPR_TYPE")].isin(matchList)) | (filteredDf[kw("PRO_CATG")].isin(matchList))) & (
~filteredDf[kw("PRO_CATG")].isin([f"RESP_TAB_{self.arm}"])
)
filteredDf = filteredDf.loc[mask]
# MIXED SLIT-WIDTH IS BAD
slitWidth = list(filteredDf[kw(f"SLIT_{self.arm}")].unique())
with suppress(ValueError):
slitWidth.remove(None)
# ITEMS TO REMOVE
removeItems = ["pin", "blind"]
for i in slitWidth:
for j in removeItems:
if j in str(i).lower():
slitWidth.remove(i)
slitWidth = [str(s).replace("JH", "") for s in slitWidth]
slitWidth = list(set(slitWidth))
if len(slitWidth) > 1:
if (
self.inst == "SOXS"
and self.arm == "NIR"
and self.recipeName.replace("-std", "").replace("-obj", "") in ["soxs-nod", "soxs-stare", "soxs-offset"]
and "SLIT5.0" in slitWidth
and "SLIT1.5" in slitWidth
and len(slitWidth) == 2
):
pass
else:
sys.stdout.flush()
sys.stdout.write("\x1b[1A\x1b[2K")
self.log.print("# VERIFYING INPUT FRAMES - **ERROR**\n")
self.log.print(self.inputFrames.summary)
raise TypeError(f"Input frames are a mix of slit-width ({slitWidth})" % locals())
# HIERARCH ESO DET OUT1 RON - Readout noise in electrons
ron = self.inputFrames.values(keyword=kw("RON"), unique=True)
with suppress(ValueError):
ron.remove(None)
# MIXED NOISE
if len(ron) > 1:
sys.stdout.flush()
sys.stdout.write("\x1b[1A\x1b[2K")
self.log.print("# VERIFYING INPUT FRAMES - **ERROR**\n")
self.log.print(self.inputFrames.summary)
raise TypeError(f"Input frames are a mix of readnoise. {ron}" % locals())
if len(ron) and ron[0]:
# UVB & VIS
self.detectorParams["ron"] = ron[0] * u.electron
else:
# NIR
self.detectorParams["ron"] = self.detectorParams["ron"] * u.electron
imageTypes = self.inputFrames.values(keyword=kw("DPR_TYPE"), unique=True) + self.inputFrames.values(
keyword=kw("PRO_TYPE"), unique=True
)
imageTech = self.inputFrames.values(keyword=kw("DPR_TECH"), unique=True) + self.inputFrames.values(
keyword=kw("PRO_TECH"), unique=True
)
imageCat = self.inputFrames.values(keyword=kw("DPR_CATG"), unique=True) + self.inputFrames.values(
keyword=kw("PRO_CATG"), unique=True
)
def clean_list(myList):
myList = list(set(myList))
try:
myList.remove(None)
except:
pass
try:
myList.remove("REDUCED")
except:
pass
return myList
imageTypes = clean_list(imageTypes)
imageTech = clean_list(imageTech)
imageCat = clean_list(imageCat)
self.log.debug("completed the ``_verify_input_frames_basics`` method")
return imageTypes, imageTech, imageCat
[docs]
def clean_up(self, forceFail=False):
"""*update product status in DB and remove intermediate files once recipe is complete*
**Key Arguments:**
- ``forceFail`` -- force the recipe to be marked as failed in the DB
**Usage**
```python
recipe.clean_up(forceFail=True)
```
"""
self.log.debug("starting the ``clean_up`` method")
import shutil
import time
# FILTER QC TABLE ON qc_flag
mask = self.qc["qc_flag"] == "fail"
passToFail = False
failedQcs = self.qc.loc[mask]
failedQcs = failedQcs["qc_name"].tolist()
if len(failedQcs):
forceFail = True
if self.status == "pass":
passToFail = True
# SET RECIPE PRODUCTS TO 'PASS'
if self.conn:
if not passToFail and not forceFail:
c = self.conn.cursor()
sqlQuery = (
f"update product_frames set status_{self.currentSession} = 'pass' where sof = '{self.sofName}.sof'"
)
c.execute(sqlQuery)
c.close()
# PREVIOUSLY FAILED RECIPE THAT HAS NOW PASSED
if (self.status == "fail" and passToFail) or (forceFail and self.status == "pass"):
from soxspipe.commonutils import data_organiser
if passToFail or forceFail:
failure = True
else:
failure = False
do = data_organiser(log=self.log, rootDir=self.workspaceRootPath)
do.session_refresh(failure=failure)
do.close()
self.conn.close()
del self.conn
try:
shutil.rmtree(self.outDir)
except:
pass
if forceFail and isinstance(forceFail, str):
self.log.error(f"\nRecipe marked as failed in the database. {forceFail}")
elif forceFail:
self.log.error(
f"\nRecipe marked as failed in the database as the following QC values are outside of the acceptable limits: {', '.join(failedQcs)}."
)
self.log.debug("completed the ``clean_up`` method")
return None
[docs]
def xsh2soxs(self, frame):
"""*perform some massaging of the xshooter data so it more closely resembles soxs data - this function can be removed once code is production ready*
**Key Arguments:**
- ``frame`` -- the CCDDate frame to manipulate
**Return:**
- ``frame`` -- the manipulated soxspipe-ready frame
**Usage:**
```python
frame = self.xsh2soxs(frame)
```
"""
self.log.debug("starting the ``xsh2soxs`` method")
import numpy as np
kw = self.kw
dp = self.detectorParams
# NP ROTATION OF ARRAYS IS IN COUNTER-CLOCKWISE DIRECTION
rotationIndex = int(dp["clockwise-rotation"] / 90.0)
if rotationIndex > 0:
frame.data = np.rot90(frame.data, rotationIndex)
self.log.debug("completed the ``xsh2soxs`` method")
return frame
def _trim_frame(self, frame):
"""*return frame with pre-scan and overscan regions removed*
**Key Arguments:**
- ``frame`` -- the CCDData frame to be trimmed
"""
self.log.debug("starting the ``_trim_frame`` method")
import ccdproc
kw = self.kw
arm = self.arm
dp = self.detectorParams
rs, re, cs, ce = (
dp["science-pixels"]["rows"]["start"],
dp["science-pixels"]["rows"]["end"],
dp["science-pixels"]["columns"]["start"],
dp["science-pixels"]["columns"]["end"],
)
binning = dp["binning"]
if binning[0] > 1:
rs = int(rs / binning[0])
re = int(re / binning[0])
if binning[1] > 1:
cs = int(cs / binning[1])
ce = int(ce / binning[1])
trimmed_frame = ccdproc.trim_image(frame[rs:re, cs:ce])
self.log.debug("completed the ``_trim_frame`` method")
return trimmed_frame
def _write(self, frame, filedir, filename=False, overwrite=True, product=True, maskToZero=False):
"""*write frame to disk at the specified location*
**Key Arguments:**
- ``frame`` -- the frame to save to disk (CCDData object)
- ``filedir`` -- the location to save the frame
- ``filename`` -- the filename to save the file as. Default: **False** (standardised filename generated in code)
- ``overwrite`` -- if a file exists at the filepath then choose to overwrite the file. Default: True
- ``product`` -- is this a recipe product?
- ``maskToZero`` -- set masked pixels to zero before writing to file?
**Usage:**
Use within a recipe like so:
```python
self._write(frame, filePath)
```
"""
self.log.debug("starting the ``write`` method")
kw = self.kw
# WRITE QCs TO HEADERS
for n, v, c, h in zip(
self.qc["qc_name"].values,
self.qc["qc_value"].values,
self.qc["qc_comment"].values,
self.qc["to_header"].values,
):
if h:
frame.header[f"ESO QC {n}".upper()] = (v, c)
# NEATLY SORT KEYWORDS
keywords = [k for k in frame.header if len(k)]
values = [frame.header[k] for k in frame.header if len(k)]
comments = [frame.header.comments[k] for k in frame.header if len(k)]
keywords, values, comments = zip(*sorted(zip(keywords, values, comments)))
if "COMMENT" not in keywords and "HISTORY" not in keywords:
frame.header.clear()
for k, v, c in zip(keywords, values, comments):
if k == "COMMENT":
frame.header[k] = v
else:
frame.header[k] = (v, c)
if not filename and self.sofName:
filename = self.sofName + ".fits"
if not filename:
filename = filenamer(log=self.log, frame=frame, settings=self.settings)
if product:
removeKw = ["DPR_TECH", "DPR_CATG", "DPR_TYPE"]
for k in removeKw:
try:
frame.header.pop(kw(k))
except:
pass
filedir += f"/reduced/{self.startNightDate}/{self.recipeName}/"
filedir = filedir.replace("//", "/")
# Recursively create missing directories
if not os.path.exists(filedir):
try:
os.makedirs(filedir)
except:
pass
filepath = filedir + "/" + filename
# SET BAD-PIXELS TO 0 IN DATA FRAME
if maskToZero:
self.log.print(f"\nSetting {frame.mask.sum()} bad-pixels to a value of 0 while saving '{filename}'.")
frame.data[frame.mask] = 1
if "NAXIS" in frame.header and frame.header["NAXIS"] != 0 and "INHERIT" in frame.header:
del frame.header["INHERIT"]
HDUList = frame.to_hdu(hdu_mask="QUAL", hdu_uncertainty="ERRS", hdu_flags=None)
HDUList[0].name = "FLUX"
HDUList.verify("fix")
if product:
HDUList.writeto(filepath, output_verify="fix+warn", overwrite=overwrite, checksum=True)
else:
HDUList.writeto(filepath, overwrite=overwrite, checksum=False)
filepath = os.path.abspath(filepath)
self.log.debug("completed the ``write`` method")
return filepath
[docs]
def clip_and_stack(self, frames, recipe, ignore_input_masks=False, post_stack_clipping=True):
"""*mean combine input frames after sigma-clipping outlying pixels using a median value with median absolute deviation (mad) as the deviation function*
**Key Arguments:**
- ``frames`` -- an ImageFileCollection of the frames to stack or a list of CCDData objects
- ``recipe`` -- the name of recipe needed to read the correct settings from the yaml files
- ``ignore_input_masks`` -- ignore the input masks during clip and stacking?
- ``post_stack_clipping`` -- allow cross-plane clipping on combined frame. Clipping settings in setting file. Default *True*.
**Return:**
- ``combined_frame`` -- the combined master frame (with updated bad-pixel and uncertainty maps)
**Usage:**
This snippet can be used within the recipe code to combine individual (using bias frames as an example):
```python
combined_bias_mean = self.clip_and_stack(
frames=self.inputFrames, recipe="soxs_mbias", ignore_input_masks=False, post_stack_clipping=True)
```
"""
self.log.debug("starting the ``clip_and_stack`` method")
from astropy.stats import sigma_clip, mad_std
from soxspipe.commonutils.combiner import Combiner
from astropy import units as u
import numpy as np
from soxspipe.commonutils import toolkit
if len(frames) == 1:
self.log.info(
"Only 1 frame was sent to the clip and stack method. Returning the frame with no further processing."
)
return frames[0]
elif len(frames) == 0:
self.log.critical("No frames were sent to the clip and stack method. Cannot proceed.")
raise ValueError("No frames were sent to the clip and stack method.")
arm = self.arm
kw = self.kw
dp = self.detectorParams
imageType = self.imageType
# ALLOW FOR UNDERSCORE AND HYPHENS
recipe = recipe.replace("soxs_", "soxs-")
# UNPACK SETTINGS
stacked_clipping_sigma = self.recipeSettings["stacked-clipping-sigma"]
stacked_clipping_iterations = self.recipeSettings["stacked-clipping-iterations"]
# LIST OF CCDDATA OBJECTS NEEDED BY COMBINER OBJECT
if not isinstance(frames, list):
ccds = [
toolkit.frame_to_32(c)
# c
for c in frames.ccds(
ccd_kwargs={
"hdu_uncertainty": "ERRS",
"hdu_mask": "QUAL",
"hdu_flags": "FLAGS",
"key_uncertainty_type": "UTYPE",
"unit": u.electron,
}
)
]
else:
ccds = [toolkit.frame_to_32(c) for c in frames]
imageType = ccds[0].header[kw("DPR_TYPE")].replace(",", "-")
imageTech = ccds[0].header[kw("DPR_TECH")].replace(",", "-")
imageCat = ccds[0].header[kw("DPR_CATG")].replace(",", "-")
self.log.print(f"\n# MEAN COMBINING {len(ccds)} {arm} {imageCat} {imageTech} {imageType} FRAMES")
# COMBINE MASKS AND THEN RESET
combinedMask = ccds[0].mask
for c in ccds:
combinedMask = c.mask | combinedMask
if ignore_input_masks:
c.mask[:, :] = False
# COMBINER OBJECT WILL FIRST GENERATE MASKS FOR INDIVIDUAL IMAGES VIA
# CLIPPING AND THEN COMBINE THE IMAGES WITH THE METHOD SELECTED. PIXEL
# MASKED IN ALL INDIVIDUAL IMAGES ARE MASK IN THE FINAL COMBINED IMAGE
combiner = Combiner(ccds, dtype=np.float32)
# self.log.print(f"\n# SIGMA-CLIPPING PIXEL WITH OUTLYING VALUES IN INDIVIDUAL {imageType} FRAMES")
# PRINT SOME INFO FOR USER
badCount = combinedMask.sum()
totalPixels = np.size(combinedMask)
percent = (float(badCount) / float(totalPixels)) * 100.0
if imageType != "BIAS":
self.log.print(
f"\tThe basic bad-pixel mask for the {arm} detector {imageType} frames contains {badCount} pixels ({percent:0.2}% of all pixels)"
)
# GENERATE A MASK FOR EACH OF THE INDIVIDUAL INPUT FRAMES - USING
# MEDIAN WITH MEDIAN ABSOLUTE DEVIATION (MAD) AS THE DEVIATION FUNCTION
old_n_masked = -1
# THIS IS THE SUM OF BAD-PIXELS IN ALL INDIVIDUAL FRAME MASKS
new_n_masked = combiner.data_arr.mask.sum()
# SIGMA CLIPPING OVERWRITES ORIGINAL MASKS - COPY HERE TO READD
# preclipped_masks = np.copy(combiner.data_arr.mask)
totalPixels = np.size(combinedMask)
## Reduce memory by avoiding an extra full-array copy during clipping
combiner.data_arr.mask = sigma_clip(
np.asarray(combiner.data_arr.data, dtype=np.float32),
sigma_lower=stacked_clipping_sigma,
sigma_upper=stacked_clipping_sigma,
axis=0,
copy=False,
maxiters=stacked_clipping_iterations,
cenfunc="median",
stdfunc="mad_std",
masked=True,
).mask
old_n_masked = new_n_masked
# RECOUNT BAD-PIXELS NOW CLIPPING HAS RUN
new_n_masked = combiner.data_arr.mask.sum()
diff = new_n_masked - old_n_masked
if self.verbose:
percent = 100 * combiner.data_arr.mask[0].sum() / totalPixels
self.log.print(
f"\tClipping found {diff} more rogue pixels in the set of all input frames (~{percent:0.2}% per-frame)"
)
# GENERATE THE COMBINED MEAN
# self.log.print("\n# MEAN COMBINING FRAMES - WITH UPDATED BAD-PIXEL MASKS")
combined_frame = combiner.average_combine()
# RECOMBINE THE COMBINED MASK FROM ABOVE
combined_frame.mask = combined_frame.mask | combinedMask
# INDIVIDUAL UPDATED MASKS (POST CLIPPING)
new_individual_masks = combiner.data_arr.mask
masked_values = new_individual_masks.sum(axis=0)
# A HACK TO THE COMBINER OBJECT TO COMBINE ERROR MAPS EXACTLY AS DATA WAS COMBINED
for i, ccd in enumerate(ccds):
combiner.data_arr.data[i] = ccd.uncertainty.array
combined_uncertainty = combiner.average_combine()
combined_frame.uncertainty = combined_uncertainty.data / (np.sqrt(len(new_individual_masks) - masked_values))
toolkit.frame_to_32(combined_frame)
# MASSIVE FUDGE - NEED TO CORRECTLY WRITE THE HEADER FOR COMBINED
# IMAGES
combined_frame.header = ccds[0].header
try:
combined_frame.wcs = ccds[0].wcs
except:
pass
# CALCULATE NEW PIXELS ADDED TO MASK
if imageType != "BIAS":
newBadCount = combined_frame.mask.sum()
diff = newBadCount - badCount
totalPixels = np.size(combinedMask)
percent = (float(newBadCount) / float(totalPixels)) * 100.0
self.log.print(
f"\t{diff} new pixels made it into the combined bad-pixel map (bad pixels now account for {percent:0.2f}% of all pixels)"
)
self.log.debug("completed the ``clip_and_stack`` method")
return combined_frame
[docs]
def detrend(self, inputFrame, master_bias=False, dark=False, master_flat=False, order_table=False):
"""*subtract calibration frames from an input frame*
**Key Arguments:**
- ``inputFrame`` -- the input frame to have calibrations subtracted. CCDData object.
- ``master_bias`` -- the master bias frame to be subtracted. CCDData object. Default *False*.
- ``dark`` -- a dark frame to be subtracted. CCDData object. Default *False*.
- ``master_flat`` -- divided input frame by this master flat frame. CCDData object. Default *False*.
- ``order_table`` -- order table with order edges defined. Used to subtract scattered light background from frames. Default *False*.
**Return:**
- ``calibration_subtracted_frame`` -- the input frame with the calibration frame(s) subtracted. CCDData object.
**Usage:**
Within a soxspipe recipe use `detrend` like so:
```python
myCalibratedFrame = self.detrend(
inputFrame=inputFrameCCDObject, master_bias=masterBiasCCDObject, dark=darkCCDObject)
```
"""
self.log.debug("starting the ``detrend`` method")
import ccdproc
from astropy import units as u
import copy
from astropy.io import fits
import pandas as pd
from datetime import datetime
from os.path import expanduser
from soxspipe.commonutils import toolkit
arm = self.arm
kw = self.kw
dp = self.detectorParams
if master_bias == None:
master_bias = False
if dark == None:
dark = False
# VERIFY DATA IS IN ORDER
if master_bias == False and dark == False and master_flat == False:
raise TypeError("detrend method needs at least a master-bias frame, a dark frame or a master flat frame")
if master_bias == False and dark != False and dark.header[kw("EXPTIME")] != inputFrame.header[kw("EXPTIME")]:
if not self.darkDetrendWarningIssued1:
self.log.warning("Dark and science/calibration frame have differing exposure-times.")
self.darkDetrendWarningIssued1 = True
processedFrame = inputFrame
if master_bias != False:
processedFrame = ccdproc.subtract_bias(processedFrame, master_bias)
toolkit.frame_to_32(processedFrame)
# DARK WITH MATCHING EXPOSURE TIME
tolerence = 0.5
if (
dark != False
and (int(dark.header[kw("EXPTIME")]) < int(processedFrame.header[kw("EXPTIME")]) + tolerence)
and (int(dark.header[kw("EXPTIME")]) > int(processedFrame.header[kw("EXPTIME")]) - tolerence)
):
processedFrame = ccdproc.subtract_dark(
processedFrame, dark, exposure_time=kw("EXPTIME"), exposure_unit=u.second
)
elif dark != False:
if self.inst == "SOXS":
if not self.darkDetrendWarningIssued2:
self.log.warning(
f"Dark and science/calibration frame have differing exposure-times. SOXS dark noise does not scale linearly with time. Skipping dark subtraction."
)
self.darkDetrendWarningIssued2 = True
else:
if not self.darkDetrendWarningIssued2:
self.log.warning(
f"Dark and science/calibration frame have differing exposure-times. Scaling dark to match science/calibration frame."
)
self.darkDetrendWarningIssued2 = True
self.log.print(f"Scaling the dark to the exposure time of {inputFrame.header[kw('EXPTIME')]}s")
processedFrame = ccdproc.subtract_dark(
processedFrame, dark, exposure_time=kw("EXPTIME"), exposure_unit=u.second, scale=True
)
toolkit.frame_to_32(processedFrame)
doSubtraction = True
if "subtract_background" in self.recipeSettings and not self.recipeSettings["subtract_background"]:
doSubtraction = False
if order_table != False and doSubtraction:
background = subtract_background(
log=self.log,
frame=processedFrame,
sofName=self.sofName,
recipeName=self.recipeName,
orderTable=order_table,
settings=self.settings,
productsTable=self.products,
qcTable=self.qc,
startNightDate=self.startNightDate,
)
backgroundFrame, processedFrame, self.products = background.subtract()
from soxspipe.commonutils.toolkit import quicklook_image
quicklook_image(
log=self.log,
CCDObject=backgroundFrame,
show=False,
ext="data",
stdWindow=3,
title="Background Light",
surfacePlot=True,
)
utcnow = datetime.utcnow()
utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S")
# WRITE FITS FRAME OF BACKGROUND IMAGE ... PDF BEING GENERATED INSTEAD
if False:
# DETERMINE WHERE TO WRITE THE FILE
home = expanduser("~")
if self.currentSession:
outDir = (
self.settings["workspace-root-dir"].replace("~", home)
+ f"/sessions/{self.currentSession}/qc/{self.startNightDate}/{self.recipeName}"
)
else:
outDir = (
self.settings["workspace-root-dir"].replace("~", home)
+ f"/qc/{self.startNightDate}/{self.recipeName}"
)
outDir = outDir.replace("//", "/")
# RECURSIVELY CREATE MISSING DIRECTORIES
if not os.path.exists(outDir):
try:
os.makedirs(outDir)
except:
pass
# GET THE EXTENSION (WITH DOT PREFIX)
filename = self.sofName + "_BKGROUND.fits"
filepath = f"{outDir}/{filename}"
header = copy.deepcopy(inputFrame.header)
primary_hdu = fits.PrimaryHDU(backgroundFrame.data, header=header)
hdul = fits.HDUList([primary_hdu])
hdul.verify("fix")
hdul.writeto(filepath, output_verify="exception", overwrite=True, checksum=True)
self.products = pd.concat(
[
self.products,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"product_label": "BKGROUND",
"file_name": filename,
"file_type": "FITS",
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"product_desc": f"Fitted intra-order image background",
"file_path": filepath,
"label": "QC",
}
)
.to_frame()
.T,
],
ignore_index=True,
)
if master_flat != False:
processedFrame = ccdproc.flat_correct(processedFrame, master_flat)
toolkit.frame_to_32(processedFrame)
self.log.debug("completed the ``detrend`` method")
return processedFrame
[docs]
def report_output(self, rformat="stdout"):
"""*a method to report QC values alongside intermediate and final products*
**Key Arguments:**
- ``rformat`` -- the format to outout reports as. Default *stdout*. [stdout|....]
**Usage:**
```python
self.report_output(rformat="stdout")
```
"""
self.log.debug("starting the ``report_output`` method")
from tabulate import tabulate
if not self.verbose:
# REMOVE COLUMN FROM DATA FRAME
self.products.drop(columns=["file_path"], inplace=True)
self.flag_poor_data()
# REMOVE DUPLICATE ENTRIES IN COLUMN 'qc_name' AND KEEP THE LAST ENTRY
self.qc = self.qc.drop_duplicates(subset=["qc_name", "qc_order"], keep="last")
# SEND TO DATABASE
self.qc["sof_name"] = self.sofName + ".sof"
self.qc["obs_date_utc"] = self.dateObs
# SORT BY COLUMN NAME
self.qc.sort_values(["qc_name"], inplace=True)
columns = list(self.qc.columns)
columns.remove("to_header")
columns.remove("obs_date_utc")
columns.remove("qc_order")
columns.remove("reduction_date_utc")
columns.remove("soxspipe_recipe")
columns.remove("sof_name")
try:
columns.remove("qc_value_min")
columns.remove("qc_value_max")
except:
pass
dbColumns = list(self.qc.columns)
dbColumns.remove("to_header")
# SORT BY COLUMN NAME
self.products.sort_values(["label"], ascending=[True], inplace=True)
self.products.drop_duplicates(inplace=True)
columns2 = list(self.products.columns)
columns2.remove("reduction_date_utc")
columns2.remove("soxspipe_recipe")
try:
soxspipe_recipe = self.qc["soxspipe_recipe"].values[0].upper()
except:
soxspipe_recipe = self.recipeName.upper()
if rformat == "stdout":
self.log.print(f"\n# {soxspipe_recipe} QC METRICS")
mask = self.qc["qc_order"] == "-1"
# Format float values to 3 decimal places
qc_display = self.qc.loc[mask][columns].copy()
for col in qc_display.columns:
qc_display[col] = qc_display[col].apply(
lambda x: (
f"{float(x):.3f}"
if isinstance(x, (int, float))
or (isinstance(x, str) and x.replace(".", "", 1).replace("-", "", 1).isdigit())
else x
)
)
self.log.print(tabulate(qc_display, headers="keys", tablefmt="psql", showindex=False, stralign="right"))
self.log.print(f"\n# {soxspipe_recipe} RECIPE PRODUCTS & QC OUTPUTS")
self.log.print(
tabulate(self.products[columns2], headers="keys", tablefmt="psql", showindex=False, stralign="right")
)
if self.conn:
sofNames = self.qc[dbColumns]["sof_name"].values.tolist()
sqlQuery = f"delete from quality_control where sof_name in ({', '.join(['?']*len(sofNames))})"
c = self.conn.cursor()
c.execute(sqlQuery, sofNames)
c.close()
self._dataframe_to_sqlite(self.qc[dbColumns], "quality_control", replace=False)
self.log.debug("completed the ``report_output`` method")
return self.qc[dbColumns]
[docs]
def flag_poor_data(self):
"""*a method to flag data as 'poor' quality based on QC values exceeding thresholds defined in the settings file*
**Usage:**
```python
self.flag_poor_data()
```
"""
self.log.debug("starting the ``flag_poor_data`` method")
from datetime import datetime
import pandas as pd
utcnow = datetime.utcnow()
utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S")
if self.inst.upper() == "SOXS":
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": "DETECTOR TEMP",
"qc_value": self.detectorTemp,
"qc_comment": f"[K] temp of detector",
"qc_unit": "kelvin",
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": False,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": "CPATH TEMP",
"qc_value": self.cptemp,
"qc_comment": f"[C] temp of common path",
"qc_unit": "celsius",
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": False,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
# FILTER DATA FRAME
mask = self.qc["qc_order"].isna()
self.qc.loc[mask, "qc_order"] = "-1"
self.qc["qc_flag"] = "pass"
if "qc-acceptable-ranges" not in self.recipeSettings:
self.log.debug("No acceptable ranges defined in settings file. Skipping the ``flag_poor_data`` method.")
return None
for k, v in self.recipeSettings["qc-acceptable-ranges"].items():
matchName = k.lower().replace("-", " ").replace("_", " ")
mask = (
(self.qc["qc_name"].str.lower() == matchName)
& (self.qc["qc_order"] == "-1")
& ((self.qc["qc_value"].astype(float) <= v[0]) | (self.qc["qc_value"].astype(float) >= v[1]))
)
self.qc.loc[mask, "qc_flag"] = "fail"
# ADD GUARDRAIL VALUES TO QC TABLE FOR REPORTING PURPOSES
self.qc.loc[(self.qc["qc_name"].str.lower() == matchName), "qc_value_min"] = v[0]
self.qc.loc[(self.qc["qc_name"].str.lower() == matchName), "qc_value_max"] = v[1]
self.log.debug("completed the ``flag_poor_data`` method")
return None
[docs]
def qc_ron(self, frameType=False, frameName=False, masterFrame=False, rawRon=False, masterRon=False):
"""*calculate the read-out-noise from bias/dark frames*
**Key Arguments:**
- ``frameType`` -- the type of the frame for reporting QC values. Default *False*
- ``frameName`` -- the name of the frame in human readable words. Default *False*
- ``masterFrame`` -- the master frame (only makes sense to measure RON on master bias). Default *False*
- ``rawRon`` -- if serendipitously calculated elsewhere don't recalculate. Default *False*
- ``masterRon`` -- if serendipitously calculated elsewhere don't recalculate. Default *False*
**Return:**
- ``rawRon`` -- raw read-out-noise in electrons
- ``masterRon`` -- combined read-out-noise in mbias
**Usage:**
```python
rawRon, mbiasRon = self.qc_ron(
frameType="MBIAS",
frameName="master bias",
masterFrame=masterFrame
)
```
"""
self.log.debug("starting the ``qc_bias_ron`` method")
from astropy.stats import sigma_clip
import numpy as np
import pandas as pd
import math
from datetime import datetime
from soxspipe.commonutils import toolkit
utcnow = datetime.utcnow()
utcnow = utcnow.strftime("%Y-%m-%dT%H:%M:%S")
if not rawRon and len(self.inputFrames.files) > 1:
# LIST OF RAW CCDDATA OBJECTS
ccds = [
c
for c in self.inputFrames.ccds(
ccd_kwargs={
"hdu_uncertainty": "ERRS",
"hdu_mask": "QUAL",
"hdu_flags": "FLAGS",
"key_uncertainty_type": "UTYPE",
}
)
]
# SINGLE FRAME RON
raw_one = ccds[0]
raw_two = ccds[1]
raw_diff = raw_one.subtract(raw_two)
toolkit.frame_to_32(raw_diff)
# SIGMA-CLIP THE DATA (AT HIGH LEVEL)
masked_diff = sigma_clip(
raw_diff.data.astype(np.float32),
sigma_lower=10,
sigma_upper=10,
maxiters=2,
cenfunc="median",
stdfunc="mad_std",
)
combinedMask = raw_diff.mask | masked_diff.mask
# FORCE CONVERSION OF CCDData OBJECT TO NUMPY ARRAY
raw_diff = np.ma.array(raw_diff.data, mask=combinedMask)
def imstats(dat):
return (dat.min(), dat.max(), dat.mean(), dat.std())
dmin, dmax, dmean, dstd = imstats(raw_diff)
if dstd == 0:
message = (
"The calculated raw frame readout noise has a value of zero. Please check the raw input frames."
)
raise ValueError(message)
# ACCOUNT FOR EXTRA NOISE ADDED FROM SUBTRACTING FRAMES
rawRon = dstd / math.sqrt(2)
if rawRon:
singleFrameType = frameType
if frameType[0] == "M":
singleFrameType = frameType[1:]
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": "RAW RON",
"qc_value": rawRon,
"qc_comment": f"[e-] RON in single {singleFrameType}",
"qc_unit": "electrons",
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
if masterFrame and not masterRon:
# PREDICTED MASTER NOISE
# predictedMasterRon = rawRon / math.sqrt(len(ccds))
# FORCE CONVERSION OF CCDData OBJECT TO NUMPY ARRAY
tmp = np.ma.array(masterFrame.data, mask=combinedMask)
dmin, dmax, dmean, dstd = imstats(tmp)
masterRon = float(dstd)
elif masterRon:
self.qc = pd.concat(
[
self.qc,
pd.Series(
{
"soxspipe_recipe": self.recipeName,
"qc_name": "MASTER RON",
"qc_value": float(masterRon),
"qc_comment": f"[e-] Combined RON in {frameType}",
"qc_unit": "electrons",
"obs_date_utc": self.dateObs,
"reduction_date_utc": utcnow,
"to_header": True,
}
)
.to_frame()
.T,
],
ignore_index=True,
)
else:
masterRon = None
self.log.debug("completed the ``qc_bias_ron`` method")
return rawRon, masterRon
[docs]
def subtract_mean_flux_level(self, rawFrame):
"""*iteratively median sigma-clip raw bias data frames before calculating and removing the mean bias level*
**Key Arguments:**
- ``rawFrame`` -- the raw bias frame
**Return:**
- `meanFluxLevel` -- the frame mean bias level
- `fluxStd` -- the standard deviation of the flux distribution (RON)
- `noiseFrame` -- the raw bias frame with mean bias level removed
**Usage:**
```python
meanFluxLevel, fluxStd, noiseFrame = self.subtract_mean_flux_level(rawFrame)
```
"""
self.log.debug("starting the ``subtract_mean_flux_level`` method")
from astropy.stats import sigma_clip
import numpy as np
from soxspipe.commonutils import toolkit
# UNPACK SETTINGS
clipping_lower_sigma = self.recipeSettings["frame-clipping-sigma"]
clipping_upper_sigma = clipping_lower_sigma
clipping_iteration_count = self.recipeSettings["frame-clipping-iterations"]
maskedFrame = sigma_clip(
rawFrame.data.astype(np.float32),
sigma=clipping_lower_sigma,
maxiters=clipping_iteration_count,
cenfunc="median",
stdfunc="mad_std",
)
# DETERMINE MEDIAN BIAS LEVEL
maskedDataArray = np.ma.array(maskedFrame.data, mask=maskedFrame.mask).astype(np.float32)
meanFluxLevel = np.ma.mean(maskedDataArray)
fluxStd = np.ma.std(maskedDataArray)
rawFrame.data -= meanFluxLevel
toolkit.frame_to_32(rawFrame)
self.log.debug("completed the ``subtract_mean_flux_level`` method")
return (meanFluxLevel, fluxStd, rawFrame)
[docs]
def update_fits_keywords(self, frame, rawFrames=False):
"""*update fits keywords to comply with ESO Phase 3 standards*
**Key Arguments:**
- ``frame`` -- the frame to update
- ``rawFrames`` -- limit the raw frames to be listed in the fits header to only these frames (list)
**Return:**
- None
**Usage:**
```python
usage code
```
:::{todo}
- add usage info
- create a sublime snippet for usage
- write a command-line tool for this method
- update package tutorial with command-line tool info if needed
:::
"""
self.log.debug("starting the ``update_fits_keywords`` method")
import math
from astropy.utils.data import compute_hash
import soxspipe.__version__ as version
arm = self.arm
kw = self.kw
dp = self.detectorParams
imageType = self.imageType
if "FLAT" in imageType:
imageType = "FLAT"
frame.header[kw("SEQ_ARM").upper()] = arm
frame.header[kw("PRO_TYPE").upper()] = "REDUCED"
# PROD CATG
if imageType in ["BIAS", "DARK", "FLAT"]:
frame.header[kw("PRO_CATG")] = f"MASTER_{imageType}_{arm}".replace("QLAMP", "LAMP").replace("DLAMP", "LAMP")
frame.header[kw("PRO_TECH")] = "IMAGE"
# SPEC FORMAT TO PANDAS DATAFRAME
tableData = self.rawFrames.to_pandas()
tableData["filename"] = tableData["filename"].str.replace("_pre", "")
tableData["tag"] = tableData["TYPE"] + "_" + tableData["ARM"]
iterator = 1
for f, t, z in zip(tableData["filename"].values, tableData["tag"].values, tableData[kw("PRO_TYPE")].values):
if rawFrames and f not in rawFrames:
continue
if isinstance(z, float) and math.isnan(z):
valueLen = 80 - len(f"ESO PRO REC1 RAW{iterator} NAME" + "HIERARCH = '")
if len(f) > valueLen:
self.log.warning(f"The filename {f} has been trucated to {f[:valueLen]} in the FITS header")
frame.header[f"ESO PRO REC1 RAW{iterator} NAME"] = f[:valueLen]
frame.header[f"ESO PRO REC1 RAW{iterator} CATG"] = t
iterator += 1
iterator = 1
for f, c, z, p in zip(
tableData["filename"].values,
tableData[kw("PRO_CATG")].values,
tableData[kw("PRO_TYPE")].values,
tableData["file"].values,
):
if not isinstance(z, float) or not math.isnan(z):
valueLen = 80 - len(f"ESO PRO REC1 CAL{iterator} NAME" + "HIERARCH = '")
# if len(f) > valueLen:
# self.log.warning(f"The filename {f} has been trucated to {f[:valueLen]} in the FITS header")
frame.header[f"ESO PRO REC1 CAL{iterator} NAME"] = f[:valueLen]
frame.header[f"ESO PRO REC1 CAL{iterator} CATG"] = c
frame.header[f"ESO PRO REC1 CAL{iterator} DATAMD5"] = compute_hash(p)
iterator += 1
iterator = 1
recipeSettings = self.get_recipe_settings()
for k, v in recipeSettings.items():
if k.lower() in ["uvb", "vis", "nir", "qc-acceptable-ranges"]:
continue
if not isinstance(v, dict):
if isinstance(v, list):
v = ", ".join(map(str, v)).strip()
frame.header[f"ESO PRO REC1 PARAM{iterator} NAME"] = k[:40]
frame.header[f"ESO PRO REC1 PARAM{iterator} VALUE"] = v
iterator += 1
else:
for k2, v2 in v.items():
frame.header[f"ESO PRO REC1 PARAM{iterator} NAME"] = k2[:40]
frame.header[f"ESO PRO REC1 PARAM{iterator} VALUE"] = v2
iterator += 1
# SOXSPIPE VERSION
frame.header[f"ESO PRO REC1 PIPE ID"] = f"soxspipe/v{version}"
# RECIPE
if self.recipeName:
frame.header[f"ESO PRO REC1 ID"] = self.recipeName
# from tabulate import tabulate
# print(tabulate(tableData, headers='keys', tablefmt='psql'))
self.log.debug("completed the ``update_fits_keywords`` method")
return None
[docs]
def get_recipe_settings(self):
"""*get the recipe and arm specific settings*
**Return:**
- ``recipeSettings`` -- the recipe specific settings
**Usage:**
```python
usage code
```
"""
self.log.debug("starting the ``get_recipe_settings`` method")
recipeSettings = False
if self.recipeName:
recipeSettings = self.settings[self.recipeName]
if recipeSettings and self.arm and self.arm.lower() in recipeSettings:
for k, v in recipeSettings[self.arm.lower()].items():
recipeSettings[k] = v
self.log.debug("completed the ``get_recipe_settings`` method")
return recipeSettings
def _dataframe_to_sqlite(self, dataframe, table_name, replace=False):
"""
Retry inserting into the database with a maximum of keepTryingMax attempts.
**Key Arguments:**
- `dataframe` -- DataFrame containing rows to insert.
- `table_name` -- Name of the database table to insert into.
- `replace` -- If True, replace existing entries; otherwise, append.
**Raises:**
- Exception if the insertion fails after 7 attempts.
"""
import time
if replace:
c = self.conn.cursor()
sqlQuery = f"delete from {table_name};"
try:
c.execute(sqlQuery)
except:
pass
c.close()
keepTrying = 0
keepTryingMax = 7
while keepTrying < keepTryingMax:
try:
dataframe.replace(["--"], None).to_sql(
table_name, con=self.conn, index=False, if_exists="append", method="multi"
)
keepTrying = keepTryingMax
except Exception as e:
if keepTrying > keepTryingMax - 1:
raise Exception(e)
time.sleep(1)
keepTrying += 1
# use the tab-trigger below for new method
# xt-class-method